script/stream/recorder_tweetstream.py
changeset 261 d84c4aa2a9eb
parent 260 b97a72ab59a2
child 263 6671e9a4c9c5
--- a/script/stream/recorder_tweetstream.py	Wed Aug 24 18:04:26 2011 +0200
+++ b/script/stream/recorder_tweetstream.py	Thu Aug 25 02:20:08 2011 +0200
@@ -1,6 +1,6 @@
 from getpass import getpass
 from iri_tweet import models, utils
-from iri_tweet.models import TweetSource, TweetLog
+from iri_tweet.models import TweetSource, TweetLog, ProcessEvent
 from multiprocessing import (Queue as mQueue, JoinableQueue, Process, Event, 
     get_logger)
 from optparse import OptionParser
@@ -10,6 +10,7 @@
 import StringIO
 import anyjson
 import datetime
+import inspect
 import logging
 import os
 import re
@@ -111,11 +112,30 @@
         # when we get one.
 
 
+class BaseProcess(Process):
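+    """Process that keeps the parent pid so subclasses can check whether the parent is still alive."""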
+
+    def __init__(self, parent_pid):
+        self.parent_pid = parent_pid
+        super(BaseProcess, self).__init__()
+
+    #
+    # from http://stackoverflow.com/questions/2542610/python-daemon-doesnt-kill-its-kids
+    #
+    def parent_is_alive(self):
+        try:
+            # signal 0 does not kill the parent, it only checks that the pid still exists
+            os.kill(self.parent_pid, 0)
+        except OSError:
+            # the parent process is gone
+            return False
+        else:
+            # the parent process is still alive
+            return True
 
 
-class SourceProcess(Process):
+class SourceProcess(BaseProcess):
     
-    def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue):
+    def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
         self.session_maker = session_maker
         self.queue = queue
         self.track = options.track
@@ -125,8 +145,8 @@
         self.options = options
         self.access_token = access_token
         self.logger_queue = logger_queue
-        super(SourceProcess, self).__init__()
-    
+        super(SourceProcess, self).__init__(parent_pid)
+
     def run(self):
         
         #import pydevd
@@ -148,9 +168,11 @@
         
         try:
             for tweet in stream:
-                self.logger.debug("tweet " + repr(tweet))
+                if not self.parent_is_alive():
+                    sys.exit()
+                self.logger.debug("SourceProcess : tweet " + repr(tweet))
                 source = TweetSource(original_json=tweet)
-                self.logger.debug("source created")
+                self.logger.debug("SourceProcess : source created")
                 add_retries = 0
                 while add_retries < 10:
                     try:
@@ -160,18 +182,18 @@
                         break
                     except OperationalError as e:
                         session.rollback()
-                        self.logger.debug("Operational Error %s nb %d" % (repr(e), add_retries))
+                        self.logger.debug("SourceProcess : Operational Error %s nb %d" % (repr(e), add_retries))
                         if add_retries == 10:
                             raise e
                      
                 source_id = source.id
-                self.logger.debug("before queue + source id " + repr(source_id))
-                self.logger.info("Tweet count: %d - current rate : %.2f - running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime)))
+                self.logger.debug("SourceProcess : before queue + source id " + repr(source_id))
+                self.logger.info("SourceProcess : Tweet count: %d - current rate : %.2f - running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime)))
                 session.commit()
                 self.queue.put((source_id, tweet), False)
 
         except Exception as e:
-            self.logger.error("Error when processing tweet " + repr(e))
+            self.logger.error("SourceProcess : Error when processing tweet " + repr(e))
         finally:
             session.rollback()
             stream.close()
@@ -208,16 +230,16 @@
 
     
         
-class TweetProcess(Process):
+class TweetProcess(BaseProcess):
     
-    def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue):
+    def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
         self.session_maker = session_maker
         self.queue = queue
         self.stop_event = stop_event
         self.options = options
         self.access_token = access_token
         self.logger_queue = logger_queue
-        super(TweetProcess, self).__init__()
+        super(TweetProcess, self).__init__(parent_pid)
 
 
     def run(self):
@@ -225,7 +247,7 @@
         self.logger = set_logging_process(self.options, self.logger_queue)
         session = self.session_maker()
         try:
-            while not self.stop_event.is_set():
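+            # stop either on the stop event or when the parent process disappears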
+            while not self.stop_event.is_set() and self.parent_is_alive():
                 try:
                     source_id, tweet_txt = queue.get(True, 3)
                     self.logger.debug("Processing source id " + repr(source_id))
@@ -273,7 +295,11 @@
 
         
 def get_options():
-    parser = OptionParser()
+
+    usage = "usage: %prog [options]"
+
+    parser = OptionParser(usage=usage)
+
     parser.add_option("-f", "--file", dest="conn_str",
                       help="write tweet to DATABASE. This is a connection string", metavar="CONNECTION_STR", default="enmi2010_twitter.db")
     parser.add_option("-u", "--user", dest="username",
@@ -293,30 +319,35 @@
     parser.add_option("-N", "--nb-process", dest="process_nb",
                       help="number of process.\nIf 0, only the lefovers of the database are processed.\nIf 1, no postprocessing is done on the tweets.", metavar="PROCESS_NB", default=2, type='int')
 
-
-
     utils.set_logging_options(parser)
 
     return parser.parse_args()
     
+def add_process_event(type, args, session_maker):
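+    """Store a ProcessEvent row (e.g. "start", "pid", "shutdown") with its args serialized to JSON."""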
+    session = session_maker()
+    try:
+        evt = ProcessEvent(args=None if args is None else anyjson.serialize(args), type=type)
+        session.add(evt)
+        session.commit()
+    finally:
+        session.close()
+
 
 if __name__ == '__main__':
 
+    stop_args = {}
     (options, args) = get_options()
     
     set_logging(options)
-        
-    if options.debug:
-        print "OPTIONS : "
-        print repr(options)
     
+    utils.get_logger().debug("OPTIONS : " + repr(options))    
     
     conn_str = options.conn_str.strip()
     if not re.match("^\w+://.+", conn_str):
         conn_str = 'sqlite:///' + options.conn_str
         
     if conn_str.startswith("sqlite") and options.new:
-        filepath = conn_str[conn_str.find(":///")+4:]
+        filepath = conn_str[conn_str.find(":///") + 4:]
         if os.path.exists(filepath):
             i = 1
             basename, extension = os.path.splitext(filepath)
@@ -340,6 +371,8 @@
     
     metadata.create_all(engine)
     
+    add_process_event(type="start", args={'options':options.__dict__, 'args': args, 'command_line': sys.argv}, session_maker=Session)
+    
     access_token = None
     if not options.username or not options.password:
         access_token = utils.get_oauth_token(options.token_filename)
@@ -354,6 +387,7 @@
     
     if options.process_nb <= 0:
         utils.get_logger().debug("Leftovers processed. Exiting.")
+        add_process_event(type="shutdown", args=None, session_maker=Session)
         sys.exit()
 
     queue = mQueue()
@@ -378,7 +412,8 @@
     process_engines.append(engine_process)
     lqueue = mQueue(1)
     logger_queues.append(lqueue)
-    sprocess = SourceProcess(SessionProcess, queue, options, access_token, stop_event, lqueue)    
+    pid = os.getpid()
+    sprocess = SourceProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)    
     
     tweet_processes = []
     
@@ -387,11 +422,13 @@
         process_engines.append(engine_process)
         lqueue = mQueue(1)
         logger_queues.append(lqueue)
-        cprocess = TweetProcess(SessionProcess, queue, options, access_token, stop_event, lqueue)
+        cprocess = TweetProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)
         tweet_processes.append(cprocess)
 
     def interupt_handler(signum, frame):
-        utils.get_logger().debug("shutdown asked " + repr(signum) + "  " + repr(frame))
+        global stop_args
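+        # keep the signal number and frame info so the final "shutdown" event can record them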
+        utils.get_logger().debug("shutdown asked " + repr(signum) + "  " + repr(inspect.getframeinfo(frame, 9)))
+        stop_args = {'signum':signum, 'frameinfo':inspect.getframeinfo(frame, 9)}
         stop_event.set()
         
     signal.signal(signal.SIGINT , interupt_handler)
@@ -399,14 +436,16 @@
     signal.signal(signal.SIGALRM, interupt_handler)
     signal.signal(signal.SIGTERM, interupt_handler)
 
-    log_thread = threading.Thread(target=process_log, name="loggingThread", args=(logger_queues,stop_event,))
+    log_thread = threading.Thread(target=process_log, name="loggingThread", args=(logger_queues, stop_event,))
     log_thread.daemon = True
 
+    log_thread.start()
+
     sprocess.start()
     for cprocess in tweet_processes:
         cprocess.start()
 
-    log_thread.start()
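+    # record the pids of the main, source and consumer processes as a "pid" event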
+    add_process_event("pid", {"main":os.getpid(), 'source':sprocess.pid, 'consumers':[p.pid for p in tweet_processes]}, Session)
 
     if options.duration >= 0:
         end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration)    
@@ -460,5 +499,6 @@
     for pengine in process_engines:
         pengine.dispose()
         
+    add_process_event(type="shutdown", args=stop_args, session_maker=Session)
     utils.get_logger().debug("Done. Exiting.")