script/stream/recorder_tweetstream.py
changeset 263 6671e9a4c9c5
parent 261 d84c4aa2a9eb
child 272 fe2efe3600ea
--- a/script/stream/recorder_tweetstream.py	Thu Aug 25 02:22:19 2011 +0200
+++ b/script/stream/recorder_tweetstream.py	Thu Aug 25 18:23:53 2011 +0200
@@ -111,11 +111,27 @@
         # Don't listen to auth error, since we can't reasonably reconnect
         # when we get one.
 
+def add_process_event(type, args, session_maker):
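+    # Persist a process lifecycle event, with args serialized as JSON.
+    # A dedicated short-lived session keeps this safe to call from any process.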
+    session = session_maker()
+    try:
+        evt = ProcessEvent(args=None if args is None else anyjson.serialize(args), type=type)
+        session.add(evt)
+        session.commit()
+    finally:
+        session.close()
+
 
 class BaseProcess(Process):
 
-    def __init__(self, parent_pid):
+    def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
         self.parent_pid = parent_pid
+        self.session_maker = session_maker
+        self.queue = queue
+        self.options = options
+        self.logger_queue = logger_queue
+        self.stop_event = stop_event
+        self.access_token = access_token
+
         super(BaseProcess, self).__init__()
 
     #
@@ -131,23 +147,32 @@
         else:
             # *ring* Hi mom!
             return True
+    
+
+    def __get_process_event_args(self):
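+        # Snapshot of the worker's identity and configuration, stored with
+        # each start/stop event.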
+        return {'name': self.name, 'pid': self.pid, 'parent_pid': self.parent_pid,
+                'options': self.options.__dict__, 'access_token': self.access_token}
+
+    def run(self):
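+        # Bracket the subclass's work with start/stop events so the database
+        # records each worker's lifetime, even if do_run raises.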
+        try:
+            add_process_event("start_worker", self.__get_process_event_args(), self.session_maker)
+            self.do_run()
+        finally:
+            add_process_event("stop_worker",self.__get_process_event_args(), self.session_maker)
+        
+    def do_run(self):
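+        # Subclasses implement their actual worker loop here.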
+        raise NotImplementedError()
+
 
 
 class SourceProcess(BaseProcess):
     
     def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
-        self.session_maker = session_maker
-        self.queue = queue
         self.track = options.track
         self.reconnects = options.reconnects
         self.token_filename = options.token_filename
-        self.stop_event = stop_event
-        self.options = options
-        self.access_token = access_token
-        self.logger_queue = logger_queue
-        super(SourceProcess, self).__init__(parent_pid)
+        super(SourceProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid)
 
-    def run(self):
+    def do_run(self):
         
         #import pydevd
         #pydevd.settrace(suspend=False)
@@ -218,11 +243,13 @@
         processor.process()
     except Exception as e:
         message = u"Error %s processing tweet %s" % (repr(e), tweet)
-        logger.error(message)
+        logger.exception(message)
         output = StringIO.StringIO()
-        traceback.print_exception(Exception, e, None, None, output)
-        error_stack = output.getvalue()
-        output.close()
+        try:
+            traceback.print_exc(file=output)
+            error_stack = output.getvalue()
+        finally:
+            output.close()
         session.rollback()
         tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['ERROR'], error=message, error_stack=error_stack)
         session.add(tweet_log)
@@ -233,23 +260,17 @@
 class TweetProcess(BaseProcess):
     
     def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
-        self.session_maker = session_maker
-        self.queue = queue
-        self.stop_event = stop_event
-        self.options = options
-        self.access_token = access_token
-        self.logger_queue = logger_queue
-        super(TweetProcess, self).__init__(parent_pid)
+        super(TweetProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid)
 
 
-    def run(self):
+    def do_run(self):
         
         self.logger = set_logging_process(self.options, self.logger_queue)
         session = self.session_maker()
         try:
             while not self.stop_event.is_set() and self.parent_is_alive():
                 try:
-                    source_id, tweet_txt = queue.get(True, 3)
+                    source_id, tweet_txt = self.queue.get(True, 3)
                     self.logger.debug("Processing source id " + repr(source_id))
                 except Exception as e:
                     self.logger.debug('Process tweet exception in loop : ' + repr(e))
@@ -322,20 +343,143 @@
     utils.set_logging_options(parser)
 
     return parser.parse_args()
+
+
+def do_run(options, session_maker, conn_str):
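+    # Orchestrate one recording run: drain leftovers, spawn the source and
+    # consumer processes, supervise until shutdown, then clean up.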
+
+    stop_args = {}
+
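+    # Fall back to an OAuth token when no basic-auth credentials are given.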
+    access_token = None
+    if not options.username or not options.password:
+        access_token = utils.get_oauth_token(options.token_filename)
     
-def add_process_event(type, args, session_maker):
     session = session_maker()
     try:
-        evt = ProcessEvent(args=None if args is None else anyjson.serialize(args), type=type)
-        session.add(evt)
+        process_leftovers(session, access_token, utils.get_logger())
         session.commit()
     finally:
+        session.rollback()
         session.close()
+    
+    if options.process_nb <= 0:
+        utils.get_logger().debug("Leftovers processed. Exiting.")
+        return None
+
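+    # Shared tweet queue and shutdown flag used by all worker processes.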
+    queue = mQueue()
+    stop_event = Event()
+    
+    # workaround for a bug when using urllib2 with multiprocessing
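+    # (presumably this primes urllib2's global opener in the parent before
+    # the worker processes are forked)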
+    req = urllib2.Request('http://localhost')
+    conn = None
+    try:
+        conn = urllib2.urlopen(req)
+    except Exception:
+        utils.get_logger().debug("could not open localhost")
+        # do nothing
+    finally:
+        if conn is not None:
+            conn.close()
+    
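+    # Each worker process gets its own engine/session factory and log queue.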
+    process_engines = []
+    logger_queues = []
+    
+    SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str)
+    process_engines.append(engine_process)
+    lqueue = mQueue(1)
+    logger_queues.append(lqueue)
+    pid = os.getpid()
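+    # Workers keep the parent pid so they can exit if the main process dies.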
+    sprocess = SourceProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)    
+    
+    tweet_processes = []
+    
+    for i in range(options.process_nb - 1):
+        SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str)
+        process_engines.append(engine_process)
+        lqueue = mQueue(1)
+        logger_queues.append(lqueue)
+        cprocess = TweetProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)
+        tweet_processes.append(cprocess)
+
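+    # Any of the usual termination signals triggers a clean shutdown by
+    # setting the shared stop event.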
+    def interrupt_handler(signum, frame):
+        utils.get_logger().debug("shutdown asked " + repr(signum) + "  " + repr(inspect.getframeinfo(frame, 9)))
+        stop_args.update({'message': 'interrupt', 'signum': signum, 'frameinfo': inspect.getframeinfo(frame, 9)})
+        stop_event.set()
+
+    signal.signal(signal.SIGINT, interrupt_handler)
+    signal.signal(signal.SIGHUP, interrupt_handler)
+    signal.signal(signal.SIGALRM, interrupt_handler)
+    signal.signal(signal.SIGTERM, interrupt_handler)
+
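+    # A daemon thread drains the per-process logger queues into the main log.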
+    log_thread = threading.Thread(target=process_log, name="loggingThread", args=(logger_queues, stop_event,))
+    log_thread.daemon = True
+
+    log_thread.start()
+
+    sprocess.start()
+    for cprocess in tweet_processes:
+        cprocess.start()
+
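+    # Record the process topology (main/source/consumer pids) for diagnostics.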
+    add_process_event("pid", {'main':os.getpid(), 'source':(sprocess.name,sprocess.pid), 'consumers':dict([(p.name,p.pid) for p in tweet_processes])}, session_maker)
+
+    if options.duration >= 0:
+        end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration)    
+    
+
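+    # Supervision loop: stop when the requested duration elapses or when the
+    # source process dies.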
+    while not stop_event.is_set():
+        if options.duration >= 0 and datetime.datetime.utcnow() >= end_ts:
+            stop_args.update({'message': 'duration', 'duration' : options.duration, 'end_ts' : end_ts})
+            stop_event.set()
+            break
+        if sprocess.is_alive():            
+            time.sleep(1)
+        else:
+            stop_args.update({'message': 'Source process killed'})
+            stop_event.set()
+            break
+    utils.get_logger().debug("Joining Source Process")
+    # join() does not raise on timeout, so check is_alive() afterwards.
+    sprocess.join(10)
+    if sprocess.is_alive():
+        utils.get_logger().debug("Problem joining Source Process - terminating")
+        sprocess.terminate()
+        
+    for i, cprocess in enumerate(tweet_processes):
+        utils.get_logger().debug("Joining consumer process Nb %d" % (i + 1))
+        cprocess.join(3)
+        if cprocess.is_alive():
+            utils.get_logger().debug("Problem joining consumer process Nb %d - terminating" % (i + 1))
+            cprocess.terminate()
+
+    
+    utils.get_logger().debug("Close queues")
+    try:
+        queue.close()
+        for lqueue in logger_queues:
+            lqueue.close()
+    except Exception as e:
+        utils.get_logger().error("error when closing queues %s", repr(e))
+        #do nothing
+        
+    
+    if options.process_nb > 1:
+        utils.get_logger().debug("Processing leftovers")
+        session = session_maker()
+        try:
+            process_leftovers(session, access_token, utils.get_logger())
+            session.commit()
+        finally:
+            session.rollback()
+            session.close()
+
+    for pengine in process_engines:
+        pengine.dispose()
+
+    return stop_args
 
 
 if __name__ == '__main__':
 
-    stop_args = {}
     (options, args) = get_options()
     
     set_logging(options)
@@ -370,135 +514,21 @@
             sys.exit(message)
     
     metadata.create_all(engine)
-    
-    add_process_event(type="start", args={'options':options.__dict__, 'args': args, 'command_line': sys.argv}, session_maker=Session)
-    
-    access_token = None
-    if not options.username or not options.password:
-        access_token = utils.get_oauth_token(options.token_filename)
-    
-    session = Session()
-    try:
-        process_leftovers(session, access_token, utils.get_logger())
-        session.commit()
-    finally:
-        session.rollback()
-        session.close()
-    
-    if options.process_nb <= 0:
-        utils.get_logger().debug("Leftovers processed. Exiting.")
-        add_process_event(type="shutdown", args=None, session_maker=Session)
-        sys.exit()
-
-    queue = mQueue()
-    stop_event = Event()
-    
-    #workaround for bug on using urllib2 and multiprocessing
-    req = urllib2.Request('http://localhost')
-    conn = None
+    stop_args = {}
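+    # Run inside try/finally so a shutdown event is always recorded.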
     try:
-        conn = urllib2.urlopen(req)
-    except:
-        pass
-        #donothing
-    finally:
-        if conn is not None:
-            conn.close()
-    
-    process_engines = []
-    logger_queues = []
-    
-    SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str)
-    process_engines.append(engine_process)
-    lqueue = mQueue(1)
-    logger_queues.append(lqueue)
-    pid = os.getpid()
-    sprocess = SourceProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)    
-    
-    tweet_processes = []
-    
-    for i in range(options.process_nb - 1):
-        SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str)
-        process_engines.append(engine_process)
-        lqueue = mQueue(1)
-        logger_queues.append(lqueue)
-        cprocess = TweetProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)
-        tweet_processes.append(cprocess)
+        add_process_event(type="start", args={'options':options.__dict__, 'args': args, 'command_line': sys.argv}, session_maker=Session)
+        stop_args = do_run(options, Session, conn_str)
+    except Exception as e:
+        utils.get_logger().exception("Error in main thread")        
+        outfile = StringIO.StringIO()
+        try:
+            traceback.print_exc(file=outfile)
+            stop_args = {'error': repr(e), 'message': getattr(e, 'message', ''), 'stacktrace':outfile.getvalue()}
+        finally:
+            outfile.close()
+        raise
+    finally:    
+        add_process_event(type="shutdown", args=stop_args, session_maker=Session)
 
-    def interupt_handler(signum, frame):
-        global stop_args
-        utils.get_logger().debug("shutdown asked " + repr(signum) + "  " + repr(inspect.getframeinfo(frame, 9)))
-        stop_args = {'signum':signum, 'frameinfo':inspect.getframeinfo(frame, 9)}
-        stop_event.set()
-        
-    signal.signal(signal.SIGINT , interupt_handler)
-    signal.signal(signal.SIGHUP , interupt_handler)
-    signal.signal(signal.SIGALRM, interupt_handler)
-    signal.signal(signal.SIGTERM, interupt_handler)
-
-    log_thread = threading.Thread(target=process_log, name="loggingThread", args=(logger_queues, stop_event,))
-    log_thread.daemon = True
-
-    log_thread.start()
-
-    sprocess.start()
-    for cprocess in tweet_processes:
-        cprocess.start()
-
-    add_process_event("pid", {"main":os.getpid(), 'source':sprocess.pid, 'consumers':[p.pid for p in tweet_processes]}, Session)
-
-    if options.duration >= 0:
-        end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration)    
-    
-
-    while not stop_event.is_set():
-        if options.duration >= 0 and  datetime.datetime.utcnow() >= end_ts:
-            stop_event.set()
-            break
-        if sprocess.is_alive():            
-            time.sleep(1)
-        else:
-            stop_event.set()
-            break
-    utils.get_logger().debug("Joining Source Process")
-    try:
-        sprocess.join(10)
-    except:
-        utils.get_logger().debug("Pb joining Source Process - terminating")
-        sprocess.terminate()
-        
-    for i, cprocess in enumerate(tweet_processes):
-        utils.get_logger().debug("Joining consumer process Nb %d" % (i + 1))
-        try:
-            cprocess.join(3)
-        except:
-            utils.get_logger().debug("Pb joining consumer process Nb %d - terminating" % (i + 1))
-            cprocess.terminate()
-
-    
-    utils.get_logger().debug("Close queues")
-    try:
-        queue.close()
-        for lqueue in logger_queues:
-            lqueue.close()
-    except exception as e:
-        utils.get_logger().error("error when closing queues %s", repr(e))
-        #do nothing
-        
-    
-    if options.process_nb > 1:
-        utils.get_logger().debug("Processing leftovers")
-        session = Session()
-        try:
-            process_leftovers(session, access_token, utils.get_logger())
-            session.commit()
-        finally:
-            session.rollback()
-            session.close()
-
-    for pengine in process_engines:
-        pengine.dispose()
-        
-    add_process_event(type="shutdown", args=stop_args, session_maker=Session)
     utils.get_logger().debug("Done. Exiting.")