script/utils/merge_tweets.py
changeset 1497 14a9bed2e3cd
parent 1496 184372ec27e2
equal deleted inserted replaced
1496:184372ec27e2 1497:14a9bed2e3cd
     3 import codecs
     3 import codecs
     4 import json
     4 import json
     5 import logging
     5 import logging
     6 import re
     6 import re
     7 import sys
     7 import sys
       
     8 import twitter
     8 
     9 
     9 from iri_tweet.models import Tweet, TweetLog, TweetSource, setup_database
    10 from iri_tweet.models import Tweet, TweetLog, TweetSource, setup_database
    10 from iri_tweet.processor import TwitterProcessorStatus
    11 from iri_tweet.processor import TwitterProcessorStatus
    11 from iri_tweet.utils import get_oauth_token, show_progress
    12 from iri_tweet.utils import get_oauth_token, show_progress
    12 
    13 
    13 logger = logging.getLogger(__name__)
    14 logger = logging.getLogger(__name__)
    14 
    15 
    15 def get_option():
    16 def get_option():
    16     
    17 
    17     parser = argparse.ArgumentParser(description='Merge tweets databases')
    18     parser = argparse.ArgumentParser(description='Merge tweets databases')
    18 
    19 
    19     parser.add_argument("-l", "--log", dest="logfile",
    20     parser.add_argument("-l", "--log", dest="logfile",
    20                         help="log to file", metavar="LOG", default="stderr")
    21                         help="log to file", metavar="LOG", default="stderr")
    21     parser.add_argument("-v", dest="verbose", action="count",
    22     parser.add_argument("-v", dest="verbose", action="count",
    29     parser.add_argument("--query-user", dest="query_user", action="store_true",
    30     parser.add_argument("--query-user", dest="query_user", action="store_true",
    30                         help="Query twitter for user information",  default=False)
    31                         help="Query twitter for user information",  default=False)
    31     parser.add_argument("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token",
    32     parser.add_argument("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token",
    32                       help="Token file name")
    33                       help="Token file name")
    33 
    34 
    34     
    35 
    35     parser.add_argument("source", action="store", nargs=1, type=str, metavar="SOURCE")
    36     parser.add_argument("source", action="store", nargs=1, type=str, metavar="SOURCE")
    36     parser.add_argument("target", action="store", nargs=1, type=str, metavar="TARGET")
    37     parser.add_argument("target", action="store", nargs=1, type=str, metavar="TARGET")
    37     
    38 
    38 
    39 
    39     return parser.parse_args()
    40     return parser.parse_args()
    40 
    41 
    41 if __name__ == "__main__":
    42 if __name__ == "__main__":
    42     
    43 
    43     #sys.stdout = codecs.getwriter(sys.stdout.encoding)(sys.stdout)
    44     #sys.stdout = codecs.getwriter(sys.stdout.encoding)(sys.stdout)
    44     writer = None
    45     writer = None
    45     options = get_option()
    46     options = get_option()
    46     
    47 
    47     access_token = None
    48     twitter_auth = None
    48     if options.query_user:
    49     if options.query_user:
    49         access_token = get_oauth_token(options.consumer_key, options.consumer_secret, options.token_filename)
    50         acess_token_key, access_token_secret = get_oauth_token(consumer_key=options.consumer_key, consumer_secret=options.consumer_secret, token_file_path=options.token_filename)
    50     
    51         twitter_auth = twitter.OAuth(acess_token_key, access_token_secret, options.consumer_key, options.consumer_secret)
       
    52 
    51     #open source
    53     #open source
    52     src_conn_str = options.source[0].strip()
    54     src_conn_str = options.source[0].strip()
    53     if not re.match(r"^\w+://.+", src_conn_str):
    55     if not re.match(r"^\w+://.+", src_conn_str):
    54         src_conn_str = 'sqlite:///' + src_conn_str
    56         src_conn_str = 'sqlite:///' + src_conn_str
    55     tgt_conn_str = options.target[0].strip()
    57     tgt_conn_str = options.target[0].strip()
    56     if not re.match(r"^\w+://.+", tgt_conn_str):
    58     if not re.match(r"^\w+://.+", tgt_conn_str):
    57         tgt_conn_str = 'sqlite:///' + tgt_conn_str
    59         tgt_conn_str = 'sqlite:///' + tgt_conn_str
    58 
    60 
    59 
    61 
    60     engine_src, metadata_src, Session_src = setup_database(src_conn_str, echo=((options.verbose-options.quiet)>0), create_all = False)
    62     engine_src, metadata_src, Session_src = setup_database(src_conn_str, echo=((options.verbose-options.quiet)>0), create_all = False)
    61     engine_tgt, metadata_tgt, Session_tgt = setup_database(tgt_conn_str, echo=((options.verbose-options.quiet)>0), create_all = False)        
    63     engine_tgt, metadata_tgt, Session_tgt = setup_database(tgt_conn_str, echo=((options.verbose-options.quiet)>0), create_all = False)
    62 
    64 
    63     conn_src = conn_tgt = session_src = session_tgt = None
    65     conn_src = conn_tgt = session_src = session_tgt = None
    64     
    66 
    65     try:
    67     try:
    66         #conn_src = engine_src.connect()
    68         #conn_src = engine_src.connect()
    67         #conn_tgt = engine_tgt.connect()
    69         #conn_tgt = engine_tgt.connect()
    68         session_src = Session_src()
    70         session_src = Session_src()
    69         session_tgt = Session_tgt()
    71         session_tgt = Session_tgt()
    70                 
    72 
    71         count_tw = session_src.query(Tweet).count()
    73         count_tw = session_src.query(Tweet).count()
    72         
    74 
    73         if count_tw == 0:
    75         if count_tw == 0:
    74             print("No tweet to process : exit")
    76             print("No tweet to process : exit")
    75             sys.exit()
    77             sys.exit()
    76             
    78 
    77         query_src = session_src.query(Tweet).join(TweetSource).yield_per(100)
    79         query_src = session_src.query(Tweet).join(TweetSource).yield_per(100)
    78         added = 0
    80         added = 0
    79         
    81 
    80         for i,tweet in enumerate(query_src):
    82         for i,tweet in enumerate(query_src):
    81             
    83 
    82             tweet_count = session_tgt.query(Tweet).filter(Tweet.id == tweet.id).count()
    84             tweet_count = session_tgt.query(Tweet).filter(Tweet.id == tweet.id).count()
    83             
    85 
    84             progress_text = u"Process: "
    86             progress_text = u"Process: "
    85             if tweet_count == 0:
    87             if tweet_count == 0:
    86                 added += 1
    88                 added += 1
    87                 progress_text = u"Adding : "
    89                 progress_text = u"Adding : "
    88                 tweet_source = tweet.tweet_source.original_json
    90                 tweet_source = tweet.tweet_source.original_json
    89                                 
    91 
    90                 tweet_obj = json.loads(tweet_source)
    92                 tweet_obj = json.loads(tweet_source)
    91                 if 'text' not in tweet_obj:
    93                 if 'text' not in tweet_obj:
    92                     tweet_log = TweetLog(tweet_source_id=tweet.tweet_source.id, status=TweetLog.TWEET_STATUS['NOT_TWEET'])
    94                     tweet_log = TweetLog(tweet_source_id=tweet.tweet_source.id, status=TweetLog.TWEET_STATUS['NOT_TWEET'])
    93                     session_tgt.add(tweet_log)
    95                     session_tgt.add(tweet_log)
    94                 else:                
    96                 else:
    95                     tp = TwitterProcessorStatus(None, tweet_source, None, session_tgt, access_token, options.token_filename, user_query_twitter=options.query_user, logger=logger)
    97                     tp = TwitterProcessorStatus(None, tweet_source, None, session_tgt, twitter_auth=twitter_auth, user_query_twitter=options.query_user, logger=logger)
    96                     tp.process()
    98                     tp.process()
    97                 
    99 
    98                 session_tgt.flush()
   100                 session_tgt.flush()
    99                 
   101 
   100             ptext = progress_text + tweet.text
   102             ptext = progress_text + tweet.text
   101             writer = show_progress(i+1, count_tw, ptext.replace("\n",""), 70, writer)
   103             writer = show_progress(i+1, count_tw, ptext.replace("\n",""), 70, writer)
   102                             
   104 
   103         session_tgt.commit()
   105         session_tgt.commit()
   104         print(u"%d new tweet added" % (added,))
   106         print(u"%d new tweet added" % (added,))
   105         
   107 
   106     finally:
   108     finally:
   107         if session_tgt is not None:
   109         if session_tgt is not None:
   108             session_tgt.close()
   110             session_tgt.close()
   109         if session_src is not None:
   111         if session_src is not None:
   110             session_src.close()
   112             session_src.close()