script/stream/recorder_tweetstream.py
changeset 255 500cd0405c7a
parent 254 2209e66bb50b
child 256 2f335337ff64
equal deleted inserted replaced
254:2209e66bb50b 255:500cd0405c7a
     1 from getpass import getpass
     1 from getpass import getpass
     2 from iri_tweet import models, utils
     2 from iri_tweet import models, utils
     3 from iri_tweet.models import TweetSource, TweetLog
     3 from iri_tweet.models import TweetSource, TweetLog
     4 from multiprocessing import Queue, JoinableQueue, Process, Event, get_logger
     4 from multiprocessing import (Queue as mQueue, JoinableQueue, Process, Event, 
       
     5     get_logger)
     5 from optparse import OptionParser
     6 from optparse import OptionParser
     6 from sqlalchemy.exc import OperationalError
     7 from sqlalchemy.exc import OperationalError
     7 from sqlalchemy.orm import scoped_session, sessionmaker
     8 from sqlalchemy.orm import scoped_session, sessionmaker
       
     9 import Queue
     8 import StringIO
    10 import StringIO
     9 import anyjson
    11 import anyjson
    10 import datetime
    12 import datetime
    11 import logging
    13 import logging
    12 import os
    14 import os
    14 import shutil
    16 import shutil
    15 import signal
    17 import signal
    16 import socket
    18 import socket
    17 import sqlalchemy.schema
    19 import sqlalchemy.schema
    18 import sys
    20 import sys
       
    21 import threading
    19 import time
    22 import time
    20 import traceback
    23 import traceback
    21 import tweepy.auth
    24 import tweepy.auth
    22 import tweetstream
    25 import tweetstream
    23 import urllib2
    26 import urllib2
    32 columns_user = [u'follow_request_sent', u'profile_use_background_image', u'id', u'verified', u'profile_sidebar_fill_color', u'profile_text_color', u'followers_count', u'protected', u'location', u'profile_background_color', u'id_str', u'utc_offset', u'statuses_count', u'description', u'friends_count', u'profile_link_color', u'profile_image_url', u'notifications', u'show_all_inline_media', u'geo_enabled', u'profile_background_image_url', u'name', u'lang', u'following', u'profile_background_tile', u'favourites_count', u'screen_name', u'url', u'created_at', u'contributors_enabled', u'time_zone', u'profile_sidebar_border_color', u'is_translator', u'listed_count']
    35 columns_user = [u'follow_request_sent', u'profile_use_background_image', u'id', u'verified', u'profile_sidebar_fill_color', u'profile_text_color', u'followers_count', u'protected', u'location', u'profile_background_color', u'id_str', u'utc_offset', u'statuses_count', u'description', u'friends_count', u'profile_link_color', u'profile_image_url', u'notifications', u'show_all_inline_media', u'geo_enabled', u'profile_background_image_url', u'name', u'lang', u'following', u'profile_background_tile', u'favourites_count', u'screen_name', u'url', u'created_at', u'contributors_enabled', u'time_zone', u'profile_sidebar_border_color', u'is_translator', u'listed_count']
    33 #just put it in a sqlite3 table
    36 #just put it in a sqlite3 table
    34 
    37 
    35 
    38 
    36 def set_logging(options):
    39 def set_logging(options):
    37     utils.set_logging(options, logging.getLogger('iri_tweet'))
    40     utils.set_logging(options, logging.getLogger('iri.tweet'))
    38     utils.set_logging(options, logging.getLogger('multiprocessing'))
    41     utils.set_logging(options, logging.getLogger('multiprocessing'))
    39     if options.debug >= 2:
    42     if options.debug >= 2:
    40         utils.set_logging(options, logging.getLogger('sqlalchemy.engine'))
    43         utils.set_logging(options, logging.getLogger('sqlalchemy.engine'))
    41     #utils.set_logging(options, logging.getLogger('sqlalchemy.dialects'))
    44     #utils.set_logging(options, logging.getLogger('sqlalchemy.dialects'))
    42     #utils.set_logging(options, logging.getLogger('sqlalchemy.pool'))
    45     #utils.set_logging(options, logging.getLogger('sqlalchemy.pool'))
    43     #utils.set_logging(options, logging.getLogger('sqlalchemy.orm'))
    46     #utils.set_logging(options, logging.getLogger('sqlalchemy.orm'))
       
    47 
       
    48 def set_logging_process(options, queue):
       
    49     qlogger = utils.set_logging(options, logging.getLogger('iri.tweet.p'), queue)
       
    50     qlogger.propagate = 0
       
    51     return qlogger
    44 
    52 
    45 def get_auth(options, access_token):
    53 def get_auth(options, access_token):
    46     if options.username and options.password:
    54     if options.username and options.password:
    47         auth = tweepy.auth.BasicAuthHandler(options.username, options.password)        
    55         auth = tweepy.auth.BasicAuthHandler(options.username, options.password)        
    48     else:
    56     else:
   105 
   113 
   106 
   114 
   107 
   115 
   108 class SourceProcess(Process):
   116 class SourceProcess(Process):
   109     
   117     
   110     def __init__(self, session_maker, queue, options, access_token, stop_event):
   118     def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue):
   111         self.session_maker = session_maker
   119         self.session_maker = session_maker
   112         self.queue = queue
   120         self.queue = queue
   113         self.track = options.track
   121         self.track = options.track
   114         self.reconnects = options.reconnects
   122         self.reconnects = options.reconnects
   115         self.token_filename = options.token_filename
   123         self.token_filename = options.token_filename
   116         self.stop_event = stop_event
   124         self.stop_event = stop_event
   117         self.options = options
   125         self.options = options
   118         self.access_token = access_token
   126         self.access_token = access_token
       
   127         self.logger_queue = logger_queue
   119         super(SourceProcess, self).__init__()
   128         super(SourceProcess, self).__init__()
   120     
   129     
   121     def run(self):
   130     def run(self):
       
   131         
   122         #import pydevd
   132         #import pydevd
   123         #pydevd.settrace(suspend=False)
   133         #pydevd.settrace(suspend=False)
   124 
   134 
   125         set_logging(self.options)
   135         self.logger = set_logging_process(self.options, self.logger_queue)
   126         self.auth = get_auth(self.options, self.access_token) 
   136         self.auth = get_auth(self.options, self.access_token) 
   127         
   137         
   128         utils.get_logger().debug("SourceProcess : run")
   138         self.logger.debug("SourceProcess : run")
   129         track_list = self.track # or raw_input('Keywords to track (comma seperated): ').strip()
   139         track_list = self.track # or raw_input('Keywords to track (comma seperated): ').strip()
   130         track_list = [k for k in track_list.split(',')]
   140         track_list = [k for k in track_list.split(',')]
   131 
   141 
   132         utils.get_logger().debug("SourceProcess : before connecting to stream " + repr(track_list))                        
   142         self.logger.debug("SourceProcess : before connecting to stream " + repr(track_list))                        
   133         stream = ReconnectingTweetStream(self.auth, track_list, reconnects=self.reconnects, as_text=True)
   143         stream = ReconnectingTweetStream(self.auth, track_list, reconnects=self.reconnects, as_text=True)
   134         utils.get_logger().debug("SourceProcess : after connecting to stream")
   144         self.logger.debug("SourceProcess : after connecting to stream")
   135         stream.muststop = lambda: self.stop_event.is_set()
   145         stream.muststop = lambda: self.stop_event.is_set()
   136         
   146         
   137         session = self.session_maker()
   147         session = self.session_maker()
   138         
   148         
   139         try:
   149         try:
   140             for tweet in stream:
   150             for tweet in stream:
   141                 utils.get_logger().debug("tweet " + repr(tweet))
   151                 self.logger.debug("tweet " + repr(tweet))
   142                 source = TweetSource(original_json=tweet)
   152                 source = TweetSource(original_json=tweet)
   143                 utils.get_logger().debug("source created")
   153                 self.logger.debug("source created")
   144                 add_retries = 0
   154                 add_retries = 0
   145                 while add_retries < 10:
   155                 while add_retries < 10:
   146                     try:
   156                     try:
   147                         add_retries += 1
   157                         add_retries += 1
   148                         session.add(source)
   158                         session.add(source)
   149                         session.flush()
   159                         session.flush()
   150                         break
   160                         break
   151                     except OperationalError as e:
   161                     except OperationalError as e:
   152                         session.rollback()
   162                         session.rollback()
   153                         utils.get_logger().debug("Operational Error %s nb %d" % (repr(e), add_retries))
   163                         self.logger.debug("Operational Error %s nb %d" % (repr(e), add_retries))
   154                         if add_retries == 10:
   164                         if add_retries == 10:
   155                             raise e
   165                             raise e
   156                      
   166                      
   157                 source_id = source.id
   167                 source_id = source.id
   158                 utils.get_logger().debug("before queue + source id " + repr(source_id))
   168                 self.logger.debug("before queue + source id " + repr(source_id))
   159                 utils.get_logger().info("Tweet count: %d - current rate : %.2f - running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime)))
   169                 self.logger.info("Tweet count: %d - current rate : %.2f - running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime)))
   160                 session.commit()
   170                 session.commit()
   161                 self.queue.put((source_id, tweet), False)
   171                 self.queue.put((source_id, tweet), False)
   162 
   172 
   163         except Exception as e:
   173         except Exception as e:
   164             utils.get_logger().error("Error when processing tweet " + repr(e))
   174             self.logger.error("Error when processing tweet " + repr(e))
   165         finally:
   175         finally:
   166             session.rollback()
   176             session.rollback()
   167             stream.close()
   177             stream.close()
   168             session.close()
   178             session.close()
   169             self.queue.close()
   179             self.queue.close()
   170             self.stop_event.set()
   180             self.stop_event.set()
   171 
   181 
   172 
   182 
   173 def process_tweet(tweet, source_id, session, access_token):
   183 def process_tweet(tweet, source_id, session, access_token, logger):
   174     try:
   184     try:
   175         tweet_obj = anyjson.deserialize(tweet)
   185         tweet_obj = anyjson.deserialize(tweet)
       
   186         if 'text' not in tweet_obj:
       
   187             tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET'])
       
   188             session.add(tweet_log)
       
   189             return
   176         screen_name = ""
   190         screen_name = ""
   177         if 'user' in tweet_obj and 'screen_name' in tweet_obj['user']:
   191         if 'user' in tweet_obj and 'screen_name' in tweet_obj['user']:
   178             screen_name = tweet_obj['user']['screen_name']
   192             screen_name = tweet_obj['user']['screen_name']
   179         utils.get_logger().debug(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text']))
   193         logger.info(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text']))
   180         utils.get_logger().debug(u"Process_tweet :" + repr(tweet))
   194         logger.debug(u"Process_tweet :" + repr(tweet))
   181         processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, access_token, None)
   195         processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, access_token, None)
   182         processor.process()
   196         processor.process()
   183     except Exception as e:
   197     except Exception as e:
   184         message = u"Error %s processing tweet %s" % (repr(e), tweet)
   198         message = u"Error %s processing tweet %s" % (repr(e), tweet)
   185         utils.get_logger().error(message)
   199         logger.error(message)
   186         output = StringIO.StringIO()
   200         output = StringIO.StringIO()
   187         traceback.print_exception(Exception, e, None, None, output)
   201         traceback.print_exception(Exception, e, None, None, output)
   188         error_stack = output.getvalue()
   202         error_stack = output.getvalue()
   189         output.close()
   203         output.close()
   190         session.rollback()
   204         session.rollback()
   194 
   208 
   195     
   209     
   196         
   210         
   197 class TweetProcess(Process):
   211 class TweetProcess(Process):
   198     
   212     
   199     def __init__(self, session_maker, queue, options, access_token, stop_event):
   213     def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue):
   200         self.session_maker = session_maker
   214         self.session_maker = session_maker
   201         self.queue = queue
   215         self.queue = queue
   202         self.stop_event = stop_event
   216         self.stop_event = stop_event
   203         self.options = options
   217         self.options = options
   204         self.access_token = access_token
   218         self.access_token = access_token
       
   219         self.logger_queue = logger_queue
   205         super(TweetProcess, self).__init__()
   220         super(TweetProcess, self).__init__()
   206 
   221 
   207 
   222 
   208     def run(self):
   223     def run(self):
   209         
   224         
   210         set_logging(self.options)
   225         self.logger = set_logging_process(self.options, self.logger_queue)
   211         session = self.session_maker()
   226         session = self.session_maker()
   212         try:
   227         try:
   213             while not self.stop_event.is_set():
   228             while not self.stop_event.is_set():
   214                 try:
   229                 try:
   215                     source_id, tweet_txt = queue.get(True, 3)
   230                     source_id, tweet_txt = queue.get(True, 3)
   216                     utils.get_logger().debug("Processing source id " + repr(source_id))
   231                     self.logger.debug("Processing source id " + repr(source_id))
   217                 except Exception as e:
   232                 except Exception as e:
   218                     utils.get_logger().debug('Process tweet exception in loop : ' + repr(e))
   233                     self.logger.debug('Process tweet exception in loop : ' + repr(e))
   219                     continue
   234                     continue
   220                 process_tweet(tweet_txt, source_id, session, self.access_token)
   235                 process_tweet(tweet_txt, source_id, session, self.access_token, self.logger)
   221                 session.commit()
   236                 session.commit()
   222         finally:
   237         finally:
   223             session.rollback()
   238             session.rollback()
   224             self.stop_event.set()
   239             self.stop_event.set()
   225             session.close()
   240             session.close()
   229     engine, metadata = models.setup_database(conn_str, echo=False, create_all=False)
   244     engine, metadata = models.setup_database(conn_str, echo=False, create_all=False)
   230     Session = scoped_session(sessionmaker(bind=engine, autocommit=False))
   245     Session = scoped_session(sessionmaker(bind=engine, autocommit=False))
   231     return Session, engine, metadata
   246     return Session, engine, metadata
   232 
   247 
   233             
   248             
   234 def process_leftovers(session, access_token):
   249 def process_leftovers(session, access_token, logger):
   235     
   250     
   236     sources = session.query(TweetSource).outerjoin(TweetLog).filter(TweetLog.id == None)
   251     sources = session.query(TweetSource).outerjoin(TweetLog).filter(TweetLog.id == None)
   237     
   252     
   238     for src in sources:
   253     for src in sources:
   239         tweet_txt = src.original_json
   254         tweet_txt = src.original_json
   240         process_tweet(tweet_txt, src.id, session, access_token)
   255         process_tweet(tweet_txt, src.id, session, access_token, logger)
   241         session.commit()
   256         session.commit()
   242 
   257 
   243         
   258         
   244     
   259     
   245     #get tweet source that do not match any message
   260     #get tweet source that do not match any message
   246     #select * from tweet_tweet_source ts left join tweet_tweet_log tl on ts.id = tl.tweet_source_id where tl.id isnull;
   261     #select * from tweet_tweet_source ts left join tweet_tweet_log tl on ts.id = tl.tweet_source_id where tl.id isnull;
       
   262 def process_log(logger_queues, stop_event):
       
   263     while not stop_event.is_set():
       
   264         for lqueue in logger_queues:
       
   265             try:
       
   266                 record = lqueue.get_nowait()
       
   267                 logging.getLogger(record.name).handle(record)
       
   268             except Queue.Empty:
       
   269                 continue
       
   270         time.sleep(0.1)
   247 
   271 
   248         
   272         
   249 def get_options():
   273 def get_options():
   250     parser = OptionParser()
   274     parser = OptionParser()
   251     parser.add_option("-f", "--file", dest="conn_str",
   275     parser.add_option("-f", "--file", dest="conn_str",
   273 
   297 
   274     return parser.parse_args()
   298     return parser.parse_args()
   275     
   299     
   276 
   300 
   277 if __name__ == '__main__':
   301 if __name__ == '__main__':
   278     
   302 
   279     (options, args) = get_options()
   303     (options, args) = get_options()
   280     
   304     
   281     set_logging(options)
   305     set_logging(options)
   282         
   306         
   283     if options.debug:
   307     if options.debug:
   285         print repr(options)
   309         print repr(options)
   286     
   310     
   287     
   311     
   288     conn_str = options.conn_str.strip()
   312     conn_str = options.conn_str.strip()
   289     if not re.match("^\w+://.+", conn_str):
   313     if not re.match("^\w+://.+", conn_str):
   290         conn_str = 'sqlite://' + options.conn_str
   314         conn_str = 'sqlite:///' + options.conn_str
   291         
   315         
   292     if conn_str.startswith("sqlite") and options.new:
   316     if conn_str.startswith("sqlite") and options.new:
   293         filepath = conn_str[conn_str.find("://"):]
   317         filepath = conn_str[conn_str.find(":///")+4:]
   294         if os.path.exists(filepath):
   318         if os.path.exists(filepath):
   295             i = 1
   319             i = 1
   296             basename, extension = os.path.splitext(filepath)
   320             basename, extension = os.path.splitext(filepath)
   297             new_path = '%s.%d%s' % (basename, i, extension)
   321             new_path = '%s.%d%s' % (basename, i, extension)
   298             while i < 1000000 and os.path.exists(new_path):
   322             while i < 1000000 and os.path.exists(new_path):
   318     if not options.username or not options.password:
   342     if not options.username or not options.password:
   319         access_token = utils.get_oauth_token(options.token_filename)
   343         access_token = utils.get_oauth_token(options.token_filename)
   320     
   344     
   321     session = Session()
   345     session = Session()
   322     try:
   346     try:
   323         process_leftovers(session, access_token)
   347         process_leftovers(session, access_token, utils.get_logger())
   324         session.commit()
   348         session.commit()
   325     finally:
   349     finally:
   326         session.rollback()
   350         session.rollback()
   327         session.close()
   351         session.close()
   328     
   352     
   329     if options.process_nb <= 0:
   353     if options.process_nb <= 0:
   330         utils.get_logger().debug("Leftovers processed. Exiting.")
   354         utils.get_logger().debug("Leftovers processed. Exiting.")
   331         sys.exit()
   355         sys.exit()
   332 
   356 
   333     queue = JoinableQueue()
   357     queue = mQueue()
   334     stop_event = Event()
   358     stop_event = Event()
   335     
   359     
   336     #workaround for bug on using urllib2 and multiprocessing
   360     #workaround for bug on using urllib2 and multiprocessing
   337     req = urllib2.Request('http://localhost')
   361     req = urllib2.Request('http://localhost')
   338     conn = None
   362     conn = None
   342         pass
   366         pass
   343         #donothing
   367         #donothing
   344     finally:
   368     finally:
   345         if conn is not None:
   369         if conn is not None:
   346             conn.close()
   370             conn.close()
   347         
   371     
   348     
   372     process_engines = []
   349     sprocess = SourceProcess(Session, queue, options, access_token, stop_event)    
   373     logger_queues = []
       
   374     
       
   375     SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str)
       
   376     process_engines.append(engine_process)
       
   377     lqueue = mQueue(1)
       
   378     logger_queues.append(lqueue)
       
   379     sprocess = SourceProcess(SessionProcess, queue, options, access_token, stop_event, lqueue)    
   350     
   380     
   351     tweet_processes = []
   381     tweet_processes = []
   352     
   382     
   353     for i in range(options.process_nb - 1):
   383     for i in range(options.process_nb - 1):
   354         Session, engine, metadata = get_sessionmaker(conn_str)
   384         SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str)
   355         cprocess = TweetProcess(Session, queue, options, access_token, stop_event)
   385         process_engines.append(engine_process)
       
   386         lqueue = mQueue(1)
       
   387         logger_queues.append(lqueue)
       
   388         cprocess = TweetProcess(SessionProcess, queue, options, access_token, stop_event, lqueue)
   356         tweet_processes.append(cprocess)
   389         tweet_processes.append(cprocess)
   357 
   390 
   358     def interupt_handler(signum, frame):
   391     def interupt_handler(signum, frame):
   359         stop_event.set()
   392         stop_event.set()
   360         
   393         
   361     signal.signal(signal.SIGINT, interupt_handler)
   394     signal.signal(signal.SIGINT, interupt_handler)
       
   395 
       
   396     log_thread = threading.Thread(target=process_log, name="loggingThread", args=(logger_queues,stop_event,))
       
   397     log_thread.daemon = True
   362 
   398 
   363     sprocess.start()
   399     sprocess.start()
   364     for cprocess in tweet_processes:
   400     for cprocess in tweet_processes:
   365         cprocess.start()
   401         cprocess.start()
   366 
   402 
       
   403     log_thread.start()
       
   404 
   367     if options.duration >= 0:
   405     if options.duration >= 0:
   368         end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration)
   406         end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration)    
       
   407     
   369 
   408 
   370     while not stop_event.is_set():
   409     while not stop_event.is_set():
   371         if options.duration >= 0 and  datetime.datetime.utcnow() >= end_ts:
   410         if options.duration >= 0 and  datetime.datetime.utcnow() >= end_ts:
   372             stop_event.set()
   411             stop_event.set()
   373             break
   412             break
   374         if sprocess.is_alive():
   413         if sprocess.is_alive():            
   375             time.sleep(1)
   414             time.sleep(1)
   376         else:
   415         else:
   377             stop_event.set()
   416             stop_event.set()
   378             break
   417             break
   379     
       
   380     utils.get_logger().debug("Joining Source Process")
   418     utils.get_logger().debug("Joining Source Process")
   381     try:
   419     try:
   382         sprocess.join(10)
   420         sprocess.join(10)
   383     except:
   421     except:
   384         utils.get_logger().debug("Pb joining Source Process - terminating")
   422         utils.get_logger().debug("Pb joining Source Process - terminating")
   385         sprocess.terminate()
   423         sprocess.terminate()
   386         
   424         
   387     utils.get_logger().debug("Joining Queue")
       
   388     #queue.join()
       
   389     for i, cprocess in enumerate(tweet_processes):
   425     for i, cprocess in enumerate(tweet_processes):
   390         utils.get_logger().debug("Joining consumer process Nb %d" % (i + 1))
   426         utils.get_logger().debug("Joining consumer process Nb %d" % (i + 1))
   391         try:
   427         try:
   392             cprocess.join(3)
   428             cprocess.join(3)
   393         except:
   429         except:
   394             utils.get_logger().debug("Pb joining consumer process Nb %d - terminating" % (i + 1))
   430             utils.get_logger().debug("Pb joining consumer process Nb %d - terminating" % (i + 1))
   395             cprocess.terminate()
   431             cprocess.terminate()
   396     
   432 
   397     utils.get_logger().debug("Processing leftovers")
   433     
   398     session = Session()
   434     utils.get_logger().debug("Close queues")
   399     try:
   435     try:
   400         process_leftovers(session, access_token)
   436         queue.close()
   401         session.commit()
   437         for lqueue in logger_queues:
   402     finally:
   438             lqueue.close()
   403         session.rollback()
   439     except exception as e:
   404         session.close()
   440         utils.get_logger().error("error when closing queues %s", repr(e))
   405 
   441         #do nothing
       
   442         
       
   443     
       
   444     if options.process_nb > 1:
       
   445         utils.get_logger().debug("Processing leftovers")
       
   446         session = Session()
       
   447         try:
       
   448             process_leftovers(session, access_token, utils.get_logger())
       
   449             session.commit()
       
   450         finally:
       
   451             session.rollback()
       
   452             session.close()
       
   453 
       
   454     for pengine in process_engines:
       
   455         pengine.dispose()
       
   456         
   406     utils.get_logger().debug("Done. Exiting.")
   457     utils.get_logger().debug("Done. Exiting.")
   407         
   458