script/stream/recorder_tweetstream.py
changeset 261 d84c4aa2a9eb
parent 260 b97a72ab59a2
child 263 6671e9a4c9c5
equal deleted inserted replaced
260:b97a72ab59a2 261:d84c4aa2a9eb
     1 from getpass import getpass
     1 from getpass import getpass
     2 from iri_tweet import models, utils
     2 from iri_tweet import models, utils
     3 from iri_tweet.models import TweetSource, TweetLog
     3 from iri_tweet.models import TweetSource, TweetLog, ProcessEvent
     4 from multiprocessing import (Queue as mQueue, JoinableQueue, Process, Event, 
     4 from multiprocessing import (Queue as mQueue, JoinableQueue, Process, Event, 
     5     get_logger)
     5     get_logger)
     6 from optparse import OptionParser
     6 from optparse import OptionParser
     7 from sqlalchemy.exc import OperationalError
     7 from sqlalchemy.exc import OperationalError
     8 from sqlalchemy.orm import scoped_session, sessionmaker
     8 from sqlalchemy.orm import scoped_session, sessionmaker
     9 import Queue
     9 import Queue
    10 import StringIO
    10 import StringIO
    11 import anyjson
    11 import anyjson
    12 import datetime
    12 import datetime
       
    13 import inspect
    13 import logging
    14 import logging
    14 import os
    15 import os
    15 import re
    16 import re
    16 import shutil
    17 import shutil
    17 import signal
    18 import signal
   109                 time.sleep(self.retry_wait)
   110                 time.sleep(self.retry_wait)
   110         # Don't listen to auth error, since we can't reasonably reconnect
   111         # Don't listen to auth error, since we can't reasonably reconnect
   111         # when we get one.
   112         # when we get one.
   112 
   113 
   113 
   114 
   114 
   115 class BaseProcess(Process):
   115 
   116 
   116 class SourceProcess(Process):
   117     def __init__(self, parent_pid):
   117     
   118         self.parent_pid = parent_pid
   118     def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue):
   119         super(BaseProcess, self).__init__()
       
   120 
       
   121     #
       
   122     # from http://stackoverflow.com/questions/2542610/python-daemon-doesnt-kill-its-kids
       
   123     #
       
   124     def parent_is_alive(self):
       
   125         try:
       
   126             # try to call Parent
       
   127             os.kill(self.parent_pid, 0)
       
   128         except OSError:
       
   129             # *beeep* oh no! The phone's disconnected!
       
   130             return False
       
   131         else:
       
   132             # *ring* Hi mom!
       
   133             return True
       
   134 
       
   135 
       
   136 class SourceProcess(BaseProcess):
       
   137     
       
   138     def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
   119         self.session_maker = session_maker
   139         self.session_maker = session_maker
   120         self.queue = queue
   140         self.queue = queue
   121         self.track = options.track
   141         self.track = options.track
   122         self.reconnects = options.reconnects
   142         self.reconnects = options.reconnects
   123         self.token_filename = options.token_filename
   143         self.token_filename = options.token_filename
   124         self.stop_event = stop_event
   144         self.stop_event = stop_event
   125         self.options = options
   145         self.options = options
   126         self.access_token = access_token
   146         self.access_token = access_token
   127         self.logger_queue = logger_queue
   147         self.logger_queue = logger_queue
   128         super(SourceProcess, self).__init__()
   148         super(SourceProcess, self).__init__(parent_pid)
   129     
   149 
   130     def run(self):
   150     def run(self):
   131         
   151         
   132         #import pydevd
   152         #import pydevd
   133         #pydevd.settrace(suspend=False)
   153         #pydevd.settrace(suspend=False)
   134 
   154 
   146         
   166         
   147         session = self.session_maker()
   167         session = self.session_maker()
   148         
   168         
   149         try:
   169         try:
   150             for tweet in stream:
   170             for tweet in stream:
   151                 self.logger.debug("tweet " + repr(tweet))
   171                 if not self.parent_is_alive():
       
   172                     sys.exit()
       
   173                 self.logger.debug("SourceProcess : tweet " + repr(tweet))
   152                 source = TweetSource(original_json=tweet)
   174                 source = TweetSource(original_json=tweet)
   153                 self.logger.debug("source created")
   175                 self.logger.debug("SourceProcess : source created")
   154                 add_retries = 0
   176                 add_retries = 0
   155                 while add_retries < 10:
   177                 while add_retries < 10:
   156                     try:
   178                     try:
   157                         add_retries += 1
   179                         add_retries += 1
   158                         session.add(source)
   180                         session.add(source)
   159                         session.flush()
   181                         session.flush()
   160                         break
   182                         break
   161                     except OperationalError as e:
   183                     except OperationalError as e:
   162                         session.rollback()
   184                         session.rollback()
   163                         self.logger.debug("Operational Error %s nb %d" % (repr(e), add_retries))
   185                         self.logger.debug("SourceProcess : Operational Error %s nb %d" % (repr(e), add_retries))
   164                         if add_retries == 10:
   186                         if add_retries == 10:
   165                             raise e
   187                             raise e
   166                      
   188                      
   167                 source_id = source.id
   189                 source_id = source.id
   168                 self.logger.debug("before queue + source id " + repr(source_id))
   190                 self.logger.debug("SourceProcess : before queue + source id " + repr(source_id))
   169                 self.logger.info("Tweet count: %d - current rate : %.2f - running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime)))
   191                 self.logger.info("SourceProcess : Tweet count: %d - current rate : %.2f - running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime)))
   170                 session.commit()
   192                 session.commit()
   171                 self.queue.put((source_id, tweet), False)
   193                 self.queue.put((source_id, tweet), False)
   172 
   194 
   173         except Exception as e:
   195         except Exception as e:
   174             self.logger.error("Error when processing tweet " + repr(e))
   196             self.logger.error("SourceProcess : Error when processing tweet " + repr(e))
   175         finally:
   197         finally:
   176             session.rollback()
   198             session.rollback()
   177             stream.close()
   199             stream.close()
   178             session.close()
   200             session.close()
   179             self.queue.close()
   201             self.queue.close()
   206         session.add(tweet_log)
   228         session.add(tweet_log)
   207         session.commit()
   229         session.commit()
   208 
   230 
   209     
   231     
   210         
   232         
   211 class TweetProcess(Process):
   233 class TweetProcess(BaseProcess):
   212     
   234     
   213     def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue):
   235     def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
   214         self.session_maker = session_maker
   236         self.session_maker = session_maker
   215         self.queue = queue
   237         self.queue = queue
   216         self.stop_event = stop_event
   238         self.stop_event = stop_event
   217         self.options = options
   239         self.options = options
   218         self.access_token = access_token
   240         self.access_token = access_token
   219         self.logger_queue = logger_queue
   241         self.logger_queue = logger_queue
   220         super(TweetProcess, self).__init__()
   242         super(TweetProcess, self).__init__(parent_pid)
   221 
   243 
   222 
   244 
   223     def run(self):
   245     def run(self):
   224         
   246         
   225         self.logger = set_logging_process(self.options, self.logger_queue)
   247         self.logger = set_logging_process(self.options, self.logger_queue)
   226         session = self.session_maker()
   248         session = self.session_maker()
   227         try:
   249         try:
   228             while not self.stop_event.is_set():
   250             while not self.stop_event.is_set() and self.parent_is_alive():
   229                 try:
   251                 try:
   230                     source_id, tweet_txt = queue.get(True, 3)
   252                     source_id, tweet_txt = queue.get(True, 3)
   231                     self.logger.debug("Processing source id " + repr(source_id))
   253                     self.logger.debug("Processing source id " + repr(source_id))
   232                 except Exception as e:
   254                 except Exception as e:
   233                     self.logger.debug('Process tweet exception in loop : ' + repr(e))
   255                     self.logger.debug('Process tweet exception in loop : ' + repr(e))
   271                 continue
   293                 continue
   272         time.sleep(0.1)
   294         time.sleep(0.1)
   273 
   295 
   274         
   296         
   275 def get_options():
   297 def get_options():
   276     parser = OptionParser()
   298 
       
   299     usage = "usage: %prog [options]"
       
   300 
       
   301     parser = OptionParser(usage=usage)
       
   302 
   277     parser.add_option("-f", "--file", dest="conn_str",
   303     parser.add_option("-f", "--file", dest="conn_str",
   278                       help="write tweet to DATABASE. This is a connection string", metavar="CONNECTION_STR", default="enmi2010_twitter.db")
   304                       help="write tweet to DATABASE. This is a connection string", metavar="CONNECTION_STR", default="enmi2010_twitter.db")
   279     parser.add_option("-u", "--user", dest="username",
   305     parser.add_option("-u", "--user", dest="username",
   280                       help="Twitter user", metavar="USER", default=None)
   306                       help="Twitter user", metavar="USER", default=None)
   281     parser.add_option("-w", "--password", dest="password",
   307     parser.add_option("-w", "--password", dest="password",
   291     parser.add_option("-d", "--duration", dest="duration",
   317     parser.add_option("-d", "--duration", dest="duration",
   292                       help="Duration of recording in seconds", metavar="DURATION", default= -1, type='int')
   318                       help="Duration of recording in seconds", metavar="DURATION", default= -1, type='int')
   293     parser.add_option("-N", "--nb-process", dest="process_nb",
   319     parser.add_option("-N", "--nb-process", dest="process_nb",
   294                       help="number of process.\nIf 0, only the lefovers of the database are processed.\nIf 1, no postprocessing is done on the tweets.", metavar="PROCESS_NB", default=2, type='int')
   320                       help="number of process.\nIf 0, only the lefovers of the database are processed.\nIf 1, no postprocessing is done on the tweets.", metavar="PROCESS_NB", default=2, type='int')
   295 
   321 
   296 
       
   297 
       
   298     utils.set_logging_options(parser)
   322     utils.set_logging_options(parser)
   299 
   323 
   300     return parser.parse_args()
   324     return parser.parse_args()
   301     
   325     
       
   326 def add_process_event(type, args, session_maker):
       
   327     session = session_maker()
       
   328     try:
       
   329         evt = ProcessEvent(args=None if args is None else anyjson.serialize(args), type=type)
       
   330         session.add(evt)
       
   331         session.commit()
       
   332     finally:
       
   333         session.close()
       
   334 
   302 
   335 
   303 if __name__ == '__main__':
   336 if __name__ == '__main__':
   304 
   337 
       
   338     stop_args = {}
   305     (options, args) = get_options()
   339     (options, args) = get_options()
   306     
   340     
   307     set_logging(options)
   341     set_logging(options)
   308         
   342     
   309     if options.debug:
   343     utils.get_logger().debug("OPTIONS : " + repr(options))    
   310         print "OPTIONS : "
       
   311         print repr(options)
       
   312     
       
   313     
   344     
   314     conn_str = options.conn_str.strip()
   345     conn_str = options.conn_str.strip()
   315     if not re.match("^\w+://.+", conn_str):
   346     if not re.match("^\w+://.+", conn_str):
   316         conn_str = 'sqlite:///' + options.conn_str
   347         conn_str = 'sqlite:///' + options.conn_str
   317         
   348         
   318     if conn_str.startswith("sqlite") and options.new:
   349     if conn_str.startswith("sqlite") and options.new:
   319         filepath = conn_str[conn_str.find(":///")+4:]
   350         filepath = conn_str[conn_str.find(":///") + 4:]
   320         if os.path.exists(filepath):
   351         if os.path.exists(filepath):
   321             i = 1
   352             i = 1
   322             basename, extension = os.path.splitext(filepath)
   353             basename, extension = os.path.splitext(filepath)
   323             new_path = '%s.%d%s' % (basename, i, extension)
   354             new_path = '%s.%d%s' % (basename, i, extension)
   324             while i < 1000000 and os.path.exists(new_path):
   355             while i < 1000000 and os.path.exists(new_path):
   338             utils.get_logger().error(message)
   369             utils.get_logger().error(message)
   339             sys.exit(message)
   370             sys.exit(message)
   340     
   371     
   341     metadata.create_all(engine)
   372     metadata.create_all(engine)
   342     
   373     
       
   374     add_process_event(type="start", args={'options':options.__dict__, 'args': args, 'command_line': sys.argv}, session_maker=Session)
       
   375     
   343     access_token = None
   376     access_token = None
   344     if not options.username or not options.password:
   377     if not options.username or not options.password:
   345         access_token = utils.get_oauth_token(options.token_filename)
   378         access_token = utils.get_oauth_token(options.token_filename)
   346     
   379     
   347     session = Session()
   380     session = Session()
   352         session.rollback()
   385         session.rollback()
   353         session.close()
   386         session.close()
   354     
   387     
   355     if options.process_nb <= 0:
   388     if options.process_nb <= 0:
   356         utils.get_logger().debug("Leftovers processed. Exiting.")
   389         utils.get_logger().debug("Leftovers processed. Exiting.")
       
   390         add_process_event(type="shutdown", args=None, session_maker=Session)
   357         sys.exit()
   391         sys.exit()
   358 
   392 
   359     queue = mQueue()
   393     queue = mQueue()
   360     stop_event = Event()
   394     stop_event = Event()
   361     
   395     
   376     
   410     
   377     SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str)
   411     SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str)
   378     process_engines.append(engine_process)
   412     process_engines.append(engine_process)
   379     lqueue = mQueue(1)
   413     lqueue = mQueue(1)
   380     logger_queues.append(lqueue)
   414     logger_queues.append(lqueue)
   381     sprocess = SourceProcess(SessionProcess, queue, options, access_token, stop_event, lqueue)    
   415     pid = os.getpid()
       
   416     sprocess = SourceProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)    
   382     
   417     
   383     tweet_processes = []
   418     tweet_processes = []
   384     
   419     
   385     for i in range(options.process_nb - 1):
   420     for i in range(options.process_nb - 1):
   386         SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str)
   421         SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str)
   387         process_engines.append(engine_process)
   422         process_engines.append(engine_process)
   388         lqueue = mQueue(1)
   423         lqueue = mQueue(1)
   389         logger_queues.append(lqueue)
   424         logger_queues.append(lqueue)
   390         cprocess = TweetProcess(SessionProcess, queue, options, access_token, stop_event, lqueue)
   425         cprocess = TweetProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)
   391         tweet_processes.append(cprocess)
   426         tweet_processes.append(cprocess)
   392 
   427 
   393     def interupt_handler(signum, frame):
   428     def interupt_handler(signum, frame):
   394         utils.get_logger().debug("shutdown asked " + repr(signum) + "  " + repr(frame))
   429         global stop_args
       
   430         utils.get_logger().debug("shutdown asked " + repr(signum) + "  " + repr(inspect.getframeinfo(frame, 9)))
       
   431         stop_args = {'signum':signum, 'frameinfo':inspect.getframeinfo(frame, 9)}
   395         stop_event.set()
   432         stop_event.set()
   396         
   433         
   397     signal.signal(signal.SIGINT , interupt_handler)
   434     signal.signal(signal.SIGINT , interupt_handler)
   398     signal.signal(signal.SIGHUP , interupt_handler)
   435     signal.signal(signal.SIGHUP , interupt_handler)
   399     signal.signal(signal.SIGALRM, interupt_handler)
   436     signal.signal(signal.SIGALRM, interupt_handler)
   400     signal.signal(signal.SIGTERM, interupt_handler)
   437     signal.signal(signal.SIGTERM, interupt_handler)
   401 
   438 
   402     log_thread = threading.Thread(target=process_log, name="loggingThread", args=(logger_queues,stop_event,))
   439     log_thread = threading.Thread(target=process_log, name="loggingThread", args=(logger_queues, stop_event,))
   403     log_thread.daemon = True
   440     log_thread.daemon = True
       
   441 
       
   442     log_thread.start()
   404 
   443 
   405     sprocess.start()
   444     sprocess.start()
   406     for cprocess in tweet_processes:
   445     for cprocess in tweet_processes:
   407         cprocess.start()
   446         cprocess.start()
   408 
   447 
   409     log_thread.start()
   448     add_process_event("pid", {"main":os.getpid(), 'source':sprocess.pid, 'consumers':[p.pid for p in tweet_processes]}, Session)
   410 
   449 
   411     if options.duration >= 0:
   450     if options.duration >= 0:
   412         end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration)    
   451         end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration)    
   413     
   452     
   414 
   453 
   458             session.close()
   497             session.close()
   459 
   498 
   460     for pengine in process_engines:
   499     for pengine in process_engines:
   461         pengine.dispose()
   500         pengine.dispose()
   462         
   501         
       
   502     add_process_event(type="shutdown", args=stop_args, session_maker=Session)
   463     utils.get_logger().debug("Done. Exiting.")
   503     utils.get_logger().debug("Done. Exiting.")
   464         
   504