script/stream/recorder_tweetstream.py
changeset 263 6671e9a4c9c5
parent 261 d84c4aa2a9eb
child 272 fe2efe3600ea
equal deleted inserted replaced
262:33cf0231e253 263:6671e9a4c9c5
   109 
   109 
   110                 time.sleep(self.retry_wait)
   110                 time.sleep(self.retry_wait)
   111         # Don't listen to auth error, since we can't reasonably reconnect
   111         # Don't listen to auth error, since we can't reasonably reconnect
   112         # when we get one.
   112         # when we get one.
   113 
   113 
       
   114 def add_process_event(type, args, session_maker):
       
   115     session = session_maker()
       
   116     try:
       
   117         evt = ProcessEvent(args=None if args is None else anyjson.serialize(args), type=type)
       
   118         session.add(evt)
       
   119         session.commit()
       
   120     finally:
       
   121         session.close()
       
   122 
   114 
   123 
   115 class BaseProcess(Process):
   124 class BaseProcess(Process):
   116 
   125 
   117     def __init__(self, parent_pid):
   126     def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
   118         self.parent_pid = parent_pid
   127         self.parent_pid = parent_pid
       
   128         self.session_maker = session_maker
       
   129         self.queue = queue
       
   130         self.options = options
       
   131         self.logger_queue = logger_queue
       
   132         self.stop_event = stop_event
       
   133         self.access_token = access_token
       
   134 
   119         super(BaseProcess, self).__init__()
   135         super(BaseProcess, self).__init__()
   120 
   136 
   121     #
   137     #
   122     # from http://stackoverflow.com/questions/2542610/python-daemon-doesnt-kill-its-kids
   138     # from http://stackoverflow.com/questions/2542610/python-daemon-doesnt-kill-its-kids
   123     #
   139     #
   129             # *beeep* oh no! The phone's disconnected!
   145             # *beeep* oh no! The phone's disconnected!
   130             return False
   146             return False
   131         else:
   147         else:
   132             # *ring* Hi mom!
   148             # *ring* Hi mom!
   133             return True
   149             return True
       
   150     
       
   151 
       
   152     def __get_process_event_args(self):
       
   153         return {'name':self.name, 'pid':self.pid, 'parent_pid':self.parent_pid, 'options':self.options.__dict__, 'access_token':self.access_token}
       
   154 
       
   155     def run(self):
       
   156         try:
       
   157             add_process_event("start_worker", self.__get_process_event_args(), self.session_maker)
       
   158             self.do_run()
       
   159         finally:
       
   160             add_process_event("stop_worker",self.__get_process_event_args(), self.session_maker)
       
   161         
       
   162     def do_run(self):
       
   163         raise NotImplementedError()
       
   164 
   134 
   165 
   135 
   166 
   136 class SourceProcess(BaseProcess):
   167 class SourceProcess(BaseProcess):
   137     
   168     
   138     def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
   169     def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
   139         self.session_maker = session_maker
       
   140         self.queue = queue
       
   141         self.track = options.track
   170         self.track = options.track
   142         self.reconnects = options.reconnects
   171         self.reconnects = options.reconnects
   143         self.token_filename = options.token_filename
   172         self.token_filename = options.token_filename
   144         self.stop_event = stop_event
   173         super(SourceProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid)
   145         self.options = options
   174 
   146         self.access_token = access_token
   175     def do_run(self):
   147         self.logger_queue = logger_queue
       
   148         super(SourceProcess, self).__init__(parent_pid)
       
   149 
       
   150     def run(self):
       
   151         
   176         
   152         #import pydevd
   177         #import pydevd
   153         #pydevd.settrace(suspend=False)
   178         #pydevd.settrace(suspend=False)
   154 
   179 
   155         self.logger = set_logging_process(self.options, self.logger_queue)
   180         self.logger = set_logging_process(self.options, self.logger_queue)
   216         logger.debug(u"Process_tweet :" + repr(tweet))
   241         logger.debug(u"Process_tweet :" + repr(tweet))
   217         processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, access_token, None)
   242         processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, access_token, None)
   218         processor.process()
   243         processor.process()
   219     except Exception as e:
   244     except Exception as e:
   220         message = u"Error %s processing tweet %s" % (repr(e), tweet)
   245         message = u"Error %s processing tweet %s" % (repr(e), tweet)
   221         logger.error(message)
   246         logger.exception(message)
   222         output = StringIO.StringIO()
   247         output = StringIO.StringIO()
   223         traceback.print_exception(Exception, e, None, None, output)
   248         try:
   224         error_stack = output.getvalue()
   249             traceback.print_exc(file=output)
   225         output.close()
   250             error_stack = output.getvalue()
       
   251         finally:
       
   252             output.close()
   226         session.rollback()
   253         session.rollback()
   227         tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['ERROR'], error=message, error_stack=error_stack)
   254         tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['ERROR'], error=message, error_stack=error_stack)
   228         session.add(tweet_log)
   255         session.add(tweet_log)
   229         session.commit()
   256         session.commit()
   230 
   257 
   231     
   258     
   232         
   259         
   233 class TweetProcess(BaseProcess):
   260 class TweetProcess(BaseProcess):
   234     
   261     
   235     def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
   262     def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
   236         self.session_maker = session_maker
   263         super(TweetProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid)
   237         self.queue = queue
   264 
   238         self.stop_event = stop_event
   265 
   239         self.options = options
   266     def do_run(self):
   240         self.access_token = access_token
       
   241         self.logger_queue = logger_queue
       
   242         super(TweetProcess, self).__init__(parent_pid)
       
   243 
       
   244 
       
   245     def run(self):
       
   246         
   267         
   247         self.logger = set_logging_process(self.options, self.logger_queue)
   268         self.logger = set_logging_process(self.options, self.logger_queue)
   248         session = self.session_maker()
   269         session = self.session_maker()
   249         try:
   270         try:
   250             while not self.stop_event.is_set() and self.parent_is_alive():
   271             while not self.stop_event.is_set() and self.parent_is_alive():
   251                 try:
   272                 try:
   252                     source_id, tweet_txt = queue.get(True, 3)
   273                     source_id, tweet_txt = self.queue.get(True, 3)
   253                     self.logger.debug("Processing source id " + repr(source_id))
   274                     self.logger.debug("Processing source id " + repr(source_id))
   254                 except Exception as e:
   275                 except Exception as e:
   255                     self.logger.debug('Process tweet exception in loop : ' + repr(e))
   276                     self.logger.debug('Process tweet exception in loop : ' + repr(e))
   256                     continue
   277                     continue
   257                 process_tweet(tweet_txt, source_id, session, self.access_token, self.logger)
   278                 process_tweet(tweet_txt, source_id, session, self.access_token, self.logger)
   320                       help="number of process.\nIf 0, only the lefovers of the database are processed.\nIf 1, no postprocessing is done on the tweets.", metavar="PROCESS_NB", default=2, type='int')
   341                       help="number of process.\nIf 0, only the lefovers of the database are processed.\nIf 1, no postprocessing is done on the tweets.", metavar="PROCESS_NB", default=2, type='int')
   321 
   342 
   322     utils.set_logging_options(parser)
   343     utils.set_logging_options(parser)
   323 
   344 
   324     return parser.parse_args()
   345     return parser.parse_args()
   325     
   346 
   326 def add_process_event(type, args, session_maker):
   347 
       
   348 def do_run(options, session_maker):
       
   349 
       
   350     stop_args = {}
       
   351 
       
   352     access_token = None
       
   353     if not options.username or not options.password:
       
   354         access_token = utils.get_oauth_token(options.token_filename)
       
   355     
   327     session = session_maker()
   356     session = session_maker()
   328     try:
   357     try:
   329         evt = ProcessEvent(args=None if args is None else anyjson.serialize(args), type=type)
   358         process_leftovers(session, access_token, utils.get_logger())
   330         session.add(evt)
       
   331         session.commit()
   359         session.commit()
   332     finally:
   360     finally:
       
   361         session.rollback()
   333         session.close()
   362         session.close()
       
   363     
       
   364     if options.process_nb <= 0:
       
   365         utils.get_logger().debug("Leftovers processed. Exiting.")
       
   366         return None
       
   367 
       
   368     queue = mQueue()
       
   369     stop_event = Event()
       
   370     
       
   371     #workaround for bug on using urllib2 and multiprocessing
       
   372     req = urllib2.Request('http://localhost')
       
   373     conn = None
       
   374     try:
       
   375         conn = urllib2.urlopen(req)
       
   376     except:
       
   377         utils.get_logger().debug("could not open localhost")
       
   378         #donothing
       
   379     finally:
       
   380         if conn is not None:
       
   381             conn.close()
       
   382     
       
   383     process_engines = []
       
   384     logger_queues = []
       
   385     
       
   386     SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str)
       
   387     process_engines.append(engine_process)
       
   388     lqueue = mQueue(1)
       
   389     logger_queues.append(lqueue)
       
   390     pid = os.getpid()
       
   391     sprocess = SourceProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)    
       
   392     
       
   393     tweet_processes = []
       
   394     
       
   395     for i in range(options.process_nb - 1):
       
   396         SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str)
       
   397         process_engines.append(engine_process)
       
   398         lqueue = mQueue(1)
       
   399         logger_queues.append(lqueue)
       
   400         cprocess = TweetProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)
       
   401         tweet_processes.append(cprocess)
       
   402 
       
   403     def interupt_handler(signum, frame):
       
   404         utils.get_logger().debug("shutdown asked " + repr(signum) + "  " + repr(inspect.getframeinfo(frame, 9)))
       
   405         stop_args.update({'message': 'interupt', 'signum':signum, 'frameinfo':inspect.getframeinfo(frame, 9)})
       
   406         stop_event.set()
       
   407         
       
   408     signal.signal(signal.SIGINT , interupt_handler)
       
   409     signal.signal(signal.SIGHUP , interupt_handler)
       
   410     signal.signal(signal.SIGALRM, interupt_handler)
       
   411     signal.signal(signal.SIGTERM, interupt_handler)
       
   412 
       
   413     log_thread = threading.Thread(target=process_log, name="loggingThread", args=(logger_queues, stop_event,))
       
   414     log_thread.daemon = True
       
   415 
       
   416     log_thread.start()
       
   417 
       
   418     sprocess.start()
       
   419     for cprocess in tweet_processes:
       
   420         cprocess.start()
       
   421 
       
   422     add_process_event("pid", {'main':os.getpid(), 'source':(sprocess.name,sprocess.pid), 'consumers':dict([(p.name,p.pid) for p in tweet_processes])}, session_maker)
       
   423 
       
   424     if options.duration >= 0:
       
   425         end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration)    
       
   426     
       
   427 
       
   428     while not stop_event.is_set():
       
   429         if options.duration >= 0 and  datetime.datetime.utcnow() >= end_ts:
       
   430             stop_args.update({'message': 'duration', 'duration' : options.duration, 'end_ts' : end_ts})
       
   431             stop_event.set()
       
   432             break
       
   433         if sprocess.is_alive():            
       
   434             time.sleep(1)
       
   435         else:
       
   436             stop_args.update({'message': 'Source process killed'})
       
   437             stop_event.set()
       
   438             break
       
   439     utils.get_logger().debug("Joining Source Process")
       
   440     try:
       
   441         sprocess.join(10)
       
   442     except:
       
   443         utils.get_logger().debug("Pb joining Source Process - terminating")
       
   444         sprocess.terminate()
       
   445         
       
   446     for i, cprocess in enumerate(tweet_processes):
       
   447         utils.get_logger().debug("Joining consumer process Nb %d" % (i + 1))
       
   448         try:
       
   449             cprocess.join(3)
       
   450         except:
       
   451             utils.get_logger().debug("Pb joining consumer process Nb %d - terminating" % (i + 1))
       
   452             cprocess.terminate()
       
   453 
       
   454     
       
   455     utils.get_logger().debug("Close queues")
       
   456     try:
       
   457         queue.close()
       
   458         for lqueue in logger_queues:
       
   459             lqueue.close()
       
   460     except exception as e:
       
   461         utils.get_logger().error("error when closing queues %s", repr(e))
       
   462         #do nothing
       
   463         
       
   464     
       
   465     if options.process_nb > 1:
       
   466         utils.get_logger().debug("Processing leftovers")
       
   467         session = session_maker()
       
   468         try:
       
   469             process_leftovers(session, access_token, utils.get_logger())
       
   470             session.commit()
       
   471         finally:
       
   472             session.rollback()
       
   473             session.close()
       
   474 
       
   475     for pengine in process_engines:
       
   476         pengine.dispose()
       
   477 
       
   478     return stop_args
   334 
   479 
   335 
   480 
   336 if __name__ == '__main__':
   481 if __name__ == '__main__':
   337 
   482 
   338     stop_args = {}
       
   339     (options, args) = get_options()
   483     (options, args) = get_options()
   340     
   484     
   341     set_logging(options)
   485     set_logging(options)
   342     
   486     
   343     utils.get_logger().debug("OPTIONS : " + repr(options))    
   487     utils.get_logger().debug("OPTIONS : " + repr(options))    
   368             message = "Database %s not empty exiting" % conn_str
   512             message = "Database %s not empty exiting" % conn_str
   369             utils.get_logger().error(message)
   513             utils.get_logger().error(message)
   370             sys.exit(message)
   514             sys.exit(message)
   371     
   515     
   372     metadata.create_all(engine)
   516     metadata.create_all(engine)
   373     
   517     stop_args = {}
   374     add_process_event(type="start", args={'options':options.__dict__, 'args': args, 'command_line': sys.argv}, session_maker=Session)
   518     try:
   375     
   519         add_process_event(type="start", args={'options':options.__dict__, 'args': args, 'command_line': sys.argv}, session_maker=Session)
   376     access_token = None
   520         stop_args = do_run(options, Session)
   377     if not options.username or not options.password:
   521     except Exception as e:
   378         access_token = utils.get_oauth_token(options.token_filename)
   522         utils.get_logger().exception("Error in main thread")        
   379     
   523         outfile = StringIO.StringIO()
   380     session = Session()
   524         try:
   381     try:
   525             traceback.print_exc(file=outfile)
   382         process_leftovers(session, access_token, utils.get_logger())
   526             stop_args = {'error': repr(e), 'message': getattr(e, 'message', ''), 'stacktrace':outfile.getvalue()}
   383         session.commit()
       
   384     finally:
       
   385         session.rollback()
       
   386         session.close()
       
   387     
       
   388     if options.process_nb <= 0:
       
   389         utils.get_logger().debug("Leftovers processed. Exiting.")
       
   390         add_process_event(type="shutdown", args=None, session_maker=Session)
       
   391         sys.exit()
       
   392 
       
   393     queue = mQueue()
       
   394     stop_event = Event()
       
   395     
       
   396     #workaround for bug on using urllib2 and multiprocessing
       
   397     req = urllib2.Request('http://localhost')
       
   398     conn = None
       
   399     try:
       
   400         conn = urllib2.urlopen(req)
       
   401     except:
       
   402         pass
       
   403         #donothing
       
   404     finally:
       
   405         if conn is not None:
       
   406             conn.close()
       
   407     
       
   408     process_engines = []
       
   409     logger_queues = []
       
   410     
       
   411     SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str)
       
   412     process_engines.append(engine_process)
       
   413     lqueue = mQueue(1)
       
   414     logger_queues.append(lqueue)
       
   415     pid = os.getpid()
       
   416     sprocess = SourceProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)    
       
   417     
       
   418     tweet_processes = []
       
   419     
       
   420     for i in range(options.process_nb - 1):
       
   421         SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str)
       
   422         process_engines.append(engine_process)
       
   423         lqueue = mQueue(1)
       
   424         logger_queues.append(lqueue)
       
   425         cprocess = TweetProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)
       
   426         tweet_processes.append(cprocess)
       
   427 
       
   428     def interupt_handler(signum, frame):
       
   429         global stop_args
       
   430         utils.get_logger().debug("shutdown asked " + repr(signum) + "  " + repr(inspect.getframeinfo(frame, 9)))
       
   431         stop_args = {'signum':signum, 'frameinfo':inspect.getframeinfo(frame, 9)}
       
   432         stop_event.set()
       
   433         
       
   434     signal.signal(signal.SIGINT , interupt_handler)
       
   435     signal.signal(signal.SIGHUP , interupt_handler)
       
   436     signal.signal(signal.SIGALRM, interupt_handler)
       
   437     signal.signal(signal.SIGTERM, interupt_handler)
       
   438 
       
   439     log_thread = threading.Thread(target=process_log, name="loggingThread", args=(logger_queues, stop_event,))
       
   440     log_thread.daemon = True
       
   441 
       
   442     log_thread.start()
       
   443 
       
   444     sprocess.start()
       
   445     for cprocess in tweet_processes:
       
   446         cprocess.start()
       
   447 
       
   448     add_process_event("pid", {"main":os.getpid(), 'source':sprocess.pid, 'consumers':[p.pid for p in tweet_processes]}, Session)
       
   449 
       
   450     if options.duration >= 0:
       
   451         end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration)    
       
   452     
       
   453 
       
   454     while not stop_event.is_set():
       
   455         if options.duration >= 0 and  datetime.datetime.utcnow() >= end_ts:
       
   456             stop_event.set()
       
   457             break
       
   458         if sprocess.is_alive():            
       
   459             time.sleep(1)
       
   460         else:
       
   461             stop_event.set()
       
   462             break
       
   463     utils.get_logger().debug("Joining Source Process")
       
   464     try:
       
   465         sprocess.join(10)
       
   466     except:
       
   467         utils.get_logger().debug("Pb joining Source Process - terminating")
       
   468         sprocess.terminate()
       
   469         
       
   470     for i, cprocess in enumerate(tweet_processes):
       
   471         utils.get_logger().debug("Joining consumer process Nb %d" % (i + 1))
       
   472         try:
       
   473             cprocess.join(3)
       
   474         except:
       
   475             utils.get_logger().debug("Pb joining consumer process Nb %d - terminating" % (i + 1))
       
   476             cprocess.terminate()
       
   477 
       
   478     
       
   479     utils.get_logger().debug("Close queues")
       
   480     try:
       
   481         queue.close()
       
   482         for lqueue in logger_queues:
       
   483             lqueue.close()
       
   484     except exception as e:
       
   485         utils.get_logger().error("error when closing queues %s", repr(e))
       
   486         #do nothing
       
   487         
       
   488     
       
   489     if options.process_nb > 1:
       
   490         utils.get_logger().debug("Processing leftovers")
       
   491         session = Session()
       
   492         try:
       
   493             process_leftovers(session, access_token, utils.get_logger())
       
   494             session.commit()
       
   495         finally:
   527         finally:
   496             session.rollback()
   528             outfile.close()
   497             session.close()
   529         raise
   498 
   530     finally:    
   499     for pengine in process_engines:
   531         add_process_event(type="shutdown", args=stop_args, session_maker=Session)
   500         pengine.dispose()
   532 
   501         
       
   502     add_process_event(type="shutdown", args=stop_args, session_maker=Session)
       
   503     utils.get_logger().debug("Done. Exiting.")
   533     utils.get_logger().debug("Done. Exiting.")
   504         
   534