script/utils/merge_tweets.py
author Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
Mon, 21 Oct 2013 12:38:23 +0200
changeset 957 e4d0094f097b
parent 888 6fc6637d8403
child 1496 184372ec27e2
permissions -rw-r--r--
upgrade virtualenv + script

#from models import setup_database
from iri_tweet.models import setup_database, TweetSource, Tweet, TweetLog
from iri_tweet.processor import TwitterProcessorStatus
from iri_tweet.utils import get_oauth_token, show_progress
import anyjson
import argparse
import codecs
import logging
import re
import sys

# Module-level logger; the main block hands it to TwitterProcessorStatus.
logger = logging.getLogger(__name__)

def get_option():
    """Parse the command line of the tweet database merge script.

    Reads ``sys.argv`` and returns the resulting ``argparse.Namespace``.
    Attributes set on the namespace:

    * ``logfile`` -- value of ``-l/--log`` (default ``"stderr"``)
    * ``verbose`` / ``quiet`` -- occurrence counts of ``-v`` / ``-q``
    * ``consumer_key`` / ``consumer_secret`` -- ``-k/--key`` and
      ``-s/--secret`` (default ``None``)
    * ``query_user`` -- ``True`` when ``--query-user`` is given
    * ``token_filename`` -- value of ``-t`` (default ``".oauth_token"``)
    * ``source`` / ``target`` -- one-element lists (``nargs=1``) holding the
      two positional arguments
    """
    arg_parser = argparse.ArgumentParser(description='Merge tweets databases')

    # (flags, keyword arguments) pairs, registered in the order in which they
    # are listed by --help.
    option_specs = (
        (("-l", "--log"),
         dict(dest="logfile", help="log to file", metavar="LOG", default="stderr")),
        (("-v",),
         dict(dest="verbose", action="count", help="verbose", default=0)),
        (("-k", "--key"),
         dict(dest="consumer_key", help="Twitter consumer key", metavar="CONSUMER_KEY")),
        (("-s", "--secret"),
         dict(dest="consumer_secret", help="Twitter consumer secret", metavar="CONSUMER_SECRET")),
        (("-q",),
         dict(dest="quiet", action="count", help="quiet", default=0)),
        (("--query-user",),
         dict(dest="query_user", action="store_true",
              help="Query twitter for user information", default=False)),
        (("-t",),
         dict(dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token",
              help="Token file name")),
    )
    for flags, settings in option_specs:
        arg_parser.add_argument(*flags, **settings)

    # Both positionals share the same shape; only the name differs.
    for positional in ("source", "target"):
        arg_parser.add_argument(positional, action="store", nargs=1, type=str,
                                metavar=positional.upper())

    return arg_parser.parse_args()

def _normalize_conn_str(conn_str):
    """Return *conn_str* as a database URL.

    Surrounding whitespace is stripped. A value that already has the form
    ``scheme://...`` is returned as is; anything else is treated as a path
    and prefixed with ``sqlite:///``.
    """
    conn_str = conn_str.strip()
    if re.match(r"^\w+://.+", conn_str):
        return conn_str
    return 'sqlite:///' + conn_str


if __name__ == "__main__":
    # Script entry point: copy every tweet of the SOURCE database that is not
    # yet present (same Tweet.id) in the TARGET database, by re-processing the
    # original JSON stored with the source tweet against the target session.

    writer = None
    options = get_option()

    # NOTE(review): options.logfile is parsed but never used here, and no
    # logging handler is configured in this file -- confirm whether the
    # logger given to TwitterProcessorStatus is meant to be set up elsewhere.

    # The OAuth token is only requested when --query-user is given.
    access_token = None
    if options.query_user:
        access_token = get_oauth_token(options.consumer_key, options.consumer_secret, options.token_filename)

    # The positionals are declared with nargs=1, hence the [0].
    src_conn_str = _normalize_conn_str(options.source[0])
    tgt_conn_str = _normalize_conn_str(options.target[0])

    # The echo flag is on only when -v was given more often than -q.
    echo_sql = (options.verbose - options.quiet) > 0

    engine_src, metadata_src, Session_src = setup_database(src_conn_str, echo=echo_sql, create_all=False)
    engine_tgt, metadata_tgt, Session_tgt = setup_database(tgt_conn_str, echo=echo_sql, create_all=False)

    session_src = session_tgt = None

    try:
        session_src = Session_src()
        session_tgt = Session_tgt()

        # Total number of tweets in the source; used as the progress total.
        count_tw_query = Tweet.__table__.count()  # @UndefinedVariable
        count_tw = engine_src.scalar(count_tw_query)

        if count_tw == 0:
            # The finally clause below still closes both sessions.
            print("No tweet to process : exit")
            sys.exit()

        query_src = session_src.query(Tweet).join(TweetSource).yield_per(100)
        added = 0

        for i, tweet in enumerate(query_src):

            # A tweet is merged only if its id is absent from the target.
            tweet_count = session_tgt.query(Tweet).filter(Tweet.id == tweet.id).count()

            progress_text = u"Process: "
            if tweet_count == 0:
                added += 1
                progress_text = u"Adding : "
                tweet_source = tweet.tweet_source.original_json

                tweet_obj = anyjson.deserialize(tweet_source)
                if 'text' not in tweet_obj:
                    # Payload without a 'text' key: only log it as NOT_TWEET.
                    tweet_log = TweetLog(tweet_source_id=tweet.tweet_source.id, status=TweetLog.TWEET_STATUS['NOT_TWEET'])
                    session_tgt.add(tweet_log)
                else:
                    tp = TwitterProcessorStatus(None, tweet_source, None, session_tgt, access_token, options.token_filename, user_query_twitter=options.query_user, logger=logger)
                    tp.process()

                session_tgt.flush()

            ptext = progress_text + tweet.text
            writer = show_progress(i + 1, count_tw, ptext.replace("\n", ""), 70, writer)

        # Single commit once every source tweet has been handled.
        session_tgt.commit()
        print(u"%d new tweet added" % added)

    finally:
        if session_tgt is not None:
            session_tgt.close()
        if session_src is not None:
            session_src.close()