script/stream/recorder_stream.py
changeset 890 9c57883dbb9d
parent 888 6fc6637d8403
child 893 10a19dd4e1c9
equal deleted inserted replaced
889:c774bdf7d3dd 890:9c57883dbb9d
   195             self.logger.error("SourceProcess : Error when processing tweet " + repr(e))
   195             self.logger.error("SourceProcess : Error when processing tweet " + repr(e))
   196             raise
   196             raise
   197         finally:
   197         finally:
   198             session.rollback()
   198             session.rollback()
   199             session.close()
   199             session.close()
   200             self.logger_queue.close()
       
   201             self.queue.close()
       
   202             self.stream.close()
   200             self.stream.close()
   203             self.stream = None
   201             self.stream = None
   204             if not self.stop_event.is_set():
   202             if not self.stop_event.is_set():
   205                 self.stop_event.set()
   203                 self.stop_event.set()
   206 
   204 
   223 
   221 
   224         if self.stop_event.is_set() and self.stream:
   222         if self.stop_event.is_set() and self.stream:
   225             self.stream.close()
   223             self.stream.close()
   226         elif not self.stop_event.is_set() and not source_stream_iter_thread.is_alive:
   224         elif not self.stop_event.is_set() and not source_stream_iter_thread.is_alive:
   227             self.stop_event.set()
   225             self.stop_event.set()
   228     
   226 
       
   227         self.queue.cancel_join_thread()
       
   228         self.logger_queue.cancel_join_thread()
   229         self.logger.info("SourceProcess : join")
   229         self.logger.info("SourceProcess : join")
   230         source_stream_iter_thread.join(30)
   230         source_stream_iter_thread.join(30)
   231 
   231 
   232 
   232 
   233 def process_tweet(tweet, source_id, session, consumer_token, access_token, twitter_query_user, token_filename, logger):
   233 def process_tweet(tweet, source_id, session, consumer_token, access_token, twitter_query_user, token_filename, logger):
   328         tweet_txt = src.original_json
   328         tweet_txt = src.original_json
   329         process_tweet(tweet_txt, src.id, session, consumer_token, access_token, twitter_query_user, token_filename, logger)
   329         process_tweet(tweet_txt, src.id, session, consumer_token, access_token, twitter_query_user, token_filename, logger)
   330         session.commit()
   330         session.commit()
   331         
   331         
   332     
   332     
   333     # get tweet source that do not match any message
       
   334     # select * from tweet_tweet_source ts left join tweet_tweet_log tl on ts.id = tl.tweet_source_id where tl.id isnull;
       
   335 def process_log(logger_queues, stop_event):
   333 def process_log(logger_queues, stop_event):
   336     while not stop_event.is_set():
   334     while not stop_event.is_set():
   337         for lqueue in logger_queues:
   335         for lqueue in logger_queues:
   338             try:
   336             try:
   339                 record = lqueue.get_nowait()
   337                 record = lqueue.get_nowait()
   482     utils.get_logger().debug("Joining Source Process")
   480     utils.get_logger().debug("Joining Source Process")
   483     try:
   481     try:
   484         sprocess.join(10)
   482         sprocess.join(10)
   485     except:
   483     except:
   486         utils.get_logger().debug("Pb joining Source Process - terminating")
   484         utils.get_logger().debug("Pb joining Source Process - terminating")
       
   485     finally:
   487         sprocess.terminate()
   486         sprocess.terminate()
   488         
   487         
   489     for i, cprocess in enumerate(tweet_processes):
   488     for i, cprocess in enumerate(tweet_processes):
   490         utils.get_logger().debug("Joining consumer process Nb %d" % (i + 1))
   489         utils.get_logger().debug("Joining consumer process Nb %d" % (i + 1))
   491         try:
   490         try:
   493         except:
   492         except:
   494             utils.get_logger().debug("Pb joining consumer process Nb %d - terminating" % (i + 1))
   493             utils.get_logger().debug("Pb joining consumer process Nb %d - terminating" % (i + 1))
   495             cprocess.terminate()
   494             cprocess.terminate()
   496 
   495 
   497     
   496     
   498     utils.get_logger().debug("Close queues")        
   497     utils.get_logger().debug("Close queues")
   499     
       
   500     if options.process_nb > 1:
       
   501         utils.get_logger().debug("Processing leftovers")
       
   502         session = session_maker()
       
   503         try:
       
   504             process_leftovers(session, consumer_token, access_token, options.twitter_query_user, options.token_filename, options.ask_process_leftovers, utils.get_logger())
       
   505             session.commit()
       
   506         finally:
       
   507             session.rollback()
       
   508             session.close()
       
   509 
       
   510     for pengine in process_engines:
       
   511         pengine.dispose()
       
   512     
       
   513     try:
   498     try:
   514         queue.close()
   499         queue.close()
   515         for lqueue in logger_queues:
   500         for lqueue in logger_queues:
   516             lqueue.close()
   501             lqueue.close()
   517     except Exception as e:
   502     except Exception as e:
   518         utils.get_logger().error("error when closing queues %s", repr(e))
   503         utils.get_logger().error("error when closing queues %s", repr(e))
   519         # do nothing
   504         # do nothing
   520 
   505         
       
   506     
       
   507     if options.process_nb > 1:
       
   508         utils.get_logger().debug("Processing leftovers")
       
   509         session = session_maker()
       
   510         try:
       
   511             process_leftovers(session, consumer_token, access_token, options.twitter_query_user, options.token_filename, options.ask_process_leftovers, utils.get_logger())
       
   512             session.commit()
       
   513         finally:
       
   514             session.rollback()
       
   515             session.close()
       
   516 
       
   517     for pengine in process_engines:
       
   518         pengine.dispose()
       
   519     
   521     return stop_args
   520     return stop_args
   522 
   521 
   523 
   522 
   524 def main(options):
   523 def main(options):
   525     
   524