correct model and improve event tracking
authorYves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
Thu, 25 Aug 2011 18:23:53 +0200
changeset 263 6671e9a4c9c5
parent 262 33cf0231e253
child 264 c7fd6a0b5b51
correct model and improve event tracking
script/lib/iri_tweet/models.py
script/stream/recorder_tweetstream.py
--- a/script/lib/iri_tweet/models.py	Thu Aug 25 02:22:19 2011 +0200
+++ b/script/lib/iri_tweet/models.py	Thu Aug 25 18:23:53 2011 +0200
@@ -27,27 +27,34 @@
     else:
         return anyjson.serialize(obj)
 
-class TweetBase(object):
-    
-    def __init__(self, **kwargs):
-        for key, value in kwargs.items():
-            if hasattr(self, key):
-                setattr(self, key, value)
+class TweetMeta(type(Base)):
+            
+    def __init__(cls, name, bases, ns): #@NoSelf
+        def init(self, **kwargs):
+            for key, value in kwargs.items():
+                if hasattr(self, key):
+                    setattr(self, key, value)
+            super(cls, self).__init__()
+        setattr(cls, '__init__', init)
+        super(TweetMeta, cls).__init__(name, bases, ns)
     
 
-class ProcessEvent(Base, TweetBase):
+class ProcessEvent(Base):
+    __metaclass__ = TweetMeta
     __tablename__ = "tweet_process_event"
     id = Column(Integer, primary_key=True, autoincrement=True)
-    ts = Column(DateTime, default=datetime.datetime.utcnow(), index=True)
-    type = Column(Enum("start","pid","shutdown","error", name="process_event_type_enum"), nullable=False)
+    ts = Column(DateTime, default=datetime.datetime.utcnow, index=True)
+    type = Column(Enum("start","pid","shutdown","error", "start_worker", "stop_worker", name="process_event_type_enum"), nullable=False)
     args = Column(String)
     
-class EntityType(Base, TweetBase):
+class EntityType(Base):
+    __metaclass__ = TweetMeta
     __tablename__ = "tweet_entity_type"
     id = Column(Integer, primary_key=True, autoincrement=True)
     label = Column(String)
 
-class Entity(Base, TweetBase):
+class Entity(Base):
+    __metaclass__ = TweetMeta
     __tablename__ = "tweet_entity"
     id = Column(Integer, primary_key=True)
     tweet_id = Column(BigInteger, ForeignKey('tweet_tweet.id'))
@@ -60,24 +67,26 @@
     __mapper_args__ = {'polymorphic_on': type, 'polymorphic_identity': 'entity_entity', 'with_polymorphic':'*'}
 
 
-class TweetSource(Base, TweetBase):
+class TweetSource(Base):
+    __metaclass__ = TweetMeta
     __tablename__ = 'tweet_tweet_source'
     id = Column(Integer, primary_key=True, autoincrement=True)
     original_json = Column(String)
-    received_at = Column(DateTime, default=datetime.datetime.utcnow(), index=True)
+    received_at = Column(DateTime, default=datetime.datetime.utcnow, index=True)
 
 
-class TweetLog(Base, TweetBase):
-    
+class TweetLog(Base):
+        
     TWEET_STATUS = {
         'OK' : 1,
         'ERROR' : 2,
         'NOT_TWEET': 3,
     }
+    __metaclass__ = TweetMeta
     
     __tablename__ = 'tweet_tweet_log'
     id = Column(Integer, primary_key=True, autoincrement=True)
-    ts = Column(DateTime, default=datetime.datetime.utcnow(), index=True)
+    ts = Column(DateTime, default=datetime.datetime.utcnow, index=True)
     tweet_source_id = Column(Integer, ForeignKey('tweet_tweet_source.id'))
     tweet_source = relationship("TweetSource", backref="logs")
     status = Column(Integer)
@@ -85,7 +94,8 @@
     error_stack = Column(String)
  
     
-class Tweet(Base, TweetBase):
+class Tweet(Base):
+    __metaclass__ = TweetMeta
     __tablename__ = 'tweet_tweet'
 
     id = Column(BigInteger, primary_key=True, autoincrement=False)
@@ -111,28 +121,31 @@
     tweet_source_id = Column(Integer, ForeignKey('tweet_tweet_source.id'))
     tweet_source = relationship("TweetSource", backref="tweet")
     entity_list = relationship(Entity, backref='tweet')
-    received_at = Column(DateTime, default=datetime.datetime.utcnow(), index=True)
+    received_at = Column(DateTime, default=datetime.datetime.utcnow, index=True)
         
 
-class UserMessage(Base, TweetBase):
+class UserMessage(Base):
+    __metaclass__ = TweetMeta
     __tablename__ = "tweet_user_message"
 
     id = Column(Integer, primary_key=True)
     user_id = Column(Integer, ForeignKey('tweet_user.id'))
     user = relationship("User", backref="messages")
-    created_at = Column(DateTime, default=datetime.datetime.utcnow())
+    created_at = Column(DateTime, default=datetime.datetime.utcnow)
     message_id = Column(Integer, ForeignKey('tweet_message.id'))
 
-class Message(Base, TweetBase):
+class Message(Base):
+    __metaclass__ = TweetMeta
     __tablename__ = "tweet_message"
     
     id = Column(Integer, primary_key=True)
-    created_at = Column(DateTime, default=datetime.datetime.utcnow())
+    created_at = Column(DateTime, default=datetime.datetime.utcnow)
     text = Column(String)
     users = relationship(UserMessage, backref='message')
         
 
-class User(Base, TweetBase):
+class User(Base):
+    __metaclass__ = TweetMeta
     __tablename__ = "tweet_user"
     
     id = Column(BigInteger, primary_key=True, autoincrement=False)
@@ -156,10 +169,12 @@
     profile_background_image_url = Column(String)
     profile_background_tile = Column(Boolean)
     profile_image_url = Column(String)
+    profile_image_url_https = Column(String)
     profile_link_color = Column(String)
     profile_sidebar_border_color = Column(String)
     profile_sidebar_fill_color = Column(String)
     profile_text_color = Column(String)
+    default_profile_image = Column(String)
     profile_use_background_image = Column(Boolean)
     protected = Column(Boolean)
     screen_name = Column(String, index=True)
@@ -171,27 +186,31 @@
     verified = Column(Boolean)
     
 
-class Hashtag(Base, TweetBase):
+class Hashtag(Base):
+    __metaclass__ = TweetMeta
     __tablename__ = "tweet_hashtag"
     id = Column(Integer, primary_key=True)
     text = Column(String, unique=True, index=True)
 
 
-class Url(Base, TweetBase):
+class Url(Base):
+    __metaclass__ = TweetMeta
     __tablename__ = "tweet_url"
     id = Column(Integer, primary_key=True)
     url = Column(String, unique=True)
     expanded_url = Column(String)
 
 
-class MediaType(Base, TweetBase):
+class MediaType(Base):
+    __metaclass__ = TweetMeta
     __tablename__ = "tweet_media_type"
     id = Column(Integer, primary_key=True, autoincrement=True)
     label = Column(String, unique=True, index=True)
 
     
 
-class Media(Base, TweetBase):
+class Media(Base):
+    __metaclass__ = TweetMeta
     __tablename__ = "tweet_media"
     id = Column(BigInteger, primary_key=True, autoincrement=False)
     id_str = Column(String, unique=True)
--- a/script/stream/recorder_tweetstream.py	Thu Aug 25 02:22:19 2011 +0200
+++ b/script/stream/recorder_tweetstream.py	Thu Aug 25 18:23:53 2011 +0200
@@ -111,11 +111,27 @@
         # Don't listen to auth error, since we can't reasonably reconnect
         # when we get one.
 
+def add_process_event(type, args, session_maker):
+    session = session_maker()
+    try:
+        evt = ProcessEvent(args=None if args is None else anyjson.serialize(args), type=type)
+        session.add(evt)
+        session.commit()
+    finally:
+        session.close()
+
 
 class BaseProcess(Process):
 
-    def __init__(self, parent_pid):
+    def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
         self.parent_pid = parent_pid
+        self.session_maker = session_maker
+        self.queue = queue
+        self.options = options
+        self.logger_queue = logger_queue
+        self.stop_event = stop_event
+        self.access_token = access_token
+
         super(BaseProcess, self).__init__()
 
     #
@@ -131,23 +147,32 @@
         else:
             # *ring* Hi mom!
             return True
+    
+
+    def __get_process_event_args(self):
+        return {'name':self.name, 'pid':self.pid, 'parent_pid':self.parent_pid, 'options':self.options.__dict__, 'access_token':self.access_token}
+
+    def run(self):
+        try:
+            add_process_event("start_worker", self.__get_process_event_args(), self.session_maker)
+            self.do_run()
+        finally:
+            add_process_event("stop_worker",self.__get_process_event_args(), self.session_maker)
+        
+    def do_run(self):
+        raise NotImplementedError()
+
 
 
 class SourceProcess(BaseProcess):
     
     def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
-        self.session_maker = session_maker
-        self.queue = queue
         self.track = options.track
         self.reconnects = options.reconnects
         self.token_filename = options.token_filename
-        self.stop_event = stop_event
-        self.options = options
-        self.access_token = access_token
-        self.logger_queue = logger_queue
-        super(SourceProcess, self).__init__(parent_pid)
+        super(SourceProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid)
 
-    def run(self):
+    def do_run(self):
         
         #import pydevd
         #pydevd.settrace(suspend=False)
@@ -218,11 +243,13 @@
         processor.process()
     except Exception as e:
         message = u"Error %s processing tweet %s" % (repr(e), tweet)
-        logger.error(message)
+        logger.exception(message)
         output = StringIO.StringIO()
-        traceback.print_exception(Exception, e, None, None, output)
-        error_stack = output.getvalue()
-        output.close()
+        try:
+            traceback.print_exc(file=output)
+            error_stack = output.getvalue()
+        finally:
+            output.close()
         session.rollback()
         tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['ERROR'], error=message, error_stack=error_stack)
         session.add(tweet_log)
@@ -233,23 +260,17 @@
 class TweetProcess(BaseProcess):
     
     def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
-        self.session_maker = session_maker
-        self.queue = queue
-        self.stop_event = stop_event
-        self.options = options
-        self.access_token = access_token
-        self.logger_queue = logger_queue
-        super(TweetProcess, self).__init__(parent_pid)
+        super(TweetProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid)
 
 
-    def run(self):
+    def do_run(self):
         
         self.logger = set_logging_process(self.options, self.logger_queue)
         session = self.session_maker()
         try:
             while not self.stop_event.is_set() and self.parent_is_alive():
                 try:
-                    source_id, tweet_txt = queue.get(True, 3)
+                    source_id, tweet_txt = self.queue.get(True, 3)
                     self.logger.debug("Processing source id " + repr(source_id))
                 except Exception as e:
                     self.logger.debug('Process tweet exception in loop : ' + repr(e))
@@ -322,20 +343,143 @@
     utils.set_logging_options(parser)
 
     return parser.parse_args()
+
+
+def do_run(options, session_maker):
+
+    stop_args = {}
+
+    access_token = None
+    if not options.username or not options.password:
+        access_token = utils.get_oauth_token(options.token_filename)
     
-def add_process_event(type, args, session_maker):
     session = session_maker()
     try:
-        evt = ProcessEvent(args=None if args is None else anyjson.serialize(args), type=type)
-        session.add(evt)
+        process_leftovers(session, access_token, utils.get_logger())
         session.commit()
     finally:
+        session.rollback()
         session.close()
+    
+    if options.process_nb <= 0:
+        utils.get_logger().debug("Leftovers processed. Exiting.")
+        return None
+
+    queue = mQueue()
+    stop_event = Event()
+    
+    #workaround for bug on using urllib2 and multiprocessing
+    req = urllib2.Request('http://localhost')
+    conn = None
+    try:
+        conn = urllib2.urlopen(req)
+    except:
+        utils.get_logger().debug("could not open localhost")
+        #donothing
+    finally:
+        if conn is not None:
+            conn.close()
+    
+    process_engines = []
+    logger_queues = []
+    
+    SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str)
+    process_engines.append(engine_process)
+    lqueue = mQueue(1)
+    logger_queues.append(lqueue)
+    pid = os.getpid()
+    sprocess = SourceProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)    
+    
+    tweet_processes = []
+    
+    for i in range(options.process_nb - 1):
+        SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str)
+        process_engines.append(engine_process)
+        lqueue = mQueue(1)
+        logger_queues.append(lqueue)
+        cprocess = TweetProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)
+        tweet_processes.append(cprocess)
+
+    def interupt_handler(signum, frame):
+        utils.get_logger().debug("shutdown asked " + repr(signum) + "  " + repr(inspect.getframeinfo(frame, 9)))
+        stop_args.update({'message': 'interupt', 'signum':signum, 'frameinfo':inspect.getframeinfo(frame, 9)})
+        stop_event.set()
+        
+    signal.signal(signal.SIGINT , interupt_handler)
+    signal.signal(signal.SIGHUP , interupt_handler)
+    signal.signal(signal.SIGALRM, interupt_handler)
+    signal.signal(signal.SIGTERM, interupt_handler)
+
+    log_thread = threading.Thread(target=process_log, name="loggingThread", args=(logger_queues, stop_event,))
+    log_thread.daemon = True
+
+    log_thread.start()
+
+    sprocess.start()
+    for cprocess in tweet_processes:
+        cprocess.start()
+
+    add_process_event("pid", {'main':os.getpid(), 'source':(sprocess.name,sprocess.pid), 'consumers':dict([(p.name,p.pid) for p in tweet_processes])}, session_maker)
+
+    if options.duration >= 0:
+        end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration)    
+    
+
+    while not stop_event.is_set():
+        if options.duration >= 0 and  datetime.datetime.utcnow() >= end_ts:
+            stop_args.update({'message': 'duration', 'duration' : options.duration, 'end_ts' : end_ts})
+            stop_event.set()
+            break
+        if sprocess.is_alive():            
+            time.sleep(1)
+        else:
+            stop_args.update({'message': 'Source process killed'})
+            stop_event.set()
+            break
+    utils.get_logger().debug("Joining Source Process")
+    try:
+        sprocess.join(10)
+    except:
+        utils.get_logger().debug("Pb joining Source Process - terminating")
+        sprocess.terminate()
+        
+    for i, cprocess in enumerate(tweet_processes):
+        utils.get_logger().debug("Joining consumer process Nb %d" % (i + 1))
+        try:
+            cprocess.join(3)
+        except:
+            utils.get_logger().debug("Pb joining consumer process Nb %d - terminating" % (i + 1))
+            cprocess.terminate()
+
+    
+    utils.get_logger().debug("Close queues")
+    try:
+        queue.close()
+        for lqueue in logger_queues:
+            lqueue.close()
+    except exception as e:
+        utils.get_logger().error("error when closing queues %s", repr(e))
+        #do nothing
+        
+    
+    if options.process_nb > 1:
+        utils.get_logger().debug("Processing leftovers")
+        session = session_maker()
+        try:
+            process_leftovers(session, access_token, utils.get_logger())
+            session.commit()
+        finally:
+            session.rollback()
+            session.close()
+
+    for pengine in process_engines:
+        pengine.dispose()
+
+    return stop_args
 
 
 if __name__ == '__main__':
 
-    stop_args = {}
     (options, args) = get_options()
     
     set_logging(options)
@@ -370,135 +514,21 @@
             sys.exit(message)
     
     metadata.create_all(engine)
-    
-    add_process_event(type="start", args={'options':options.__dict__, 'args': args, 'command_line': sys.argv}, session_maker=Session)
-    
-    access_token = None
-    if not options.username or not options.password:
-        access_token = utils.get_oauth_token(options.token_filename)
-    
-    session = Session()
-    try:
-        process_leftovers(session, access_token, utils.get_logger())
-        session.commit()
-    finally:
-        session.rollback()
-        session.close()
-    
-    if options.process_nb <= 0:
-        utils.get_logger().debug("Leftovers processed. Exiting.")
-        add_process_event(type="shutdown", args=None, session_maker=Session)
-        sys.exit()
-
-    queue = mQueue()
-    stop_event = Event()
-    
-    #workaround for bug on using urllib2 and multiprocessing
-    req = urllib2.Request('http://localhost')
-    conn = None
+    stop_args = {}
     try:
-        conn = urllib2.urlopen(req)
-    except:
-        pass
-        #donothing
-    finally:
-        if conn is not None:
-            conn.close()
-    
-    process_engines = []
-    logger_queues = []
-    
-    SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str)
-    process_engines.append(engine_process)
-    lqueue = mQueue(1)
-    logger_queues.append(lqueue)
-    pid = os.getpid()
-    sprocess = SourceProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)    
-    
-    tweet_processes = []
-    
-    for i in range(options.process_nb - 1):
-        SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str)
-        process_engines.append(engine_process)
-        lqueue = mQueue(1)
-        logger_queues.append(lqueue)
-        cprocess = TweetProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)
-        tweet_processes.append(cprocess)
+        add_process_event(type="start", args={'options':options.__dict__, 'args': args, 'command_line': sys.argv}, session_maker=Session)
+        stop_args = do_run(options, Session)
+    except Exception as e:
+        utils.get_logger().exception("Error in main thread")        
+        outfile = StringIO.StringIO()
+        try:
+            traceback.print_exc(file=outfile)
+            stop_args = {'error': repr(e), 'message': getattr(e, 'message', ''), 'stacktrace':outfile.getvalue()}
+        finally:
+            outfile.close()
+        raise
+    finally:    
+        add_process_event(type="shutdown", args=stop_args, session_maker=Session)
 
-    def interupt_handler(signum, frame):
-        global stop_args
-        utils.get_logger().debug("shutdown asked " + repr(signum) + "  " + repr(inspect.getframeinfo(frame, 9)))
-        stop_args = {'signum':signum, 'frameinfo':inspect.getframeinfo(frame, 9)}
-        stop_event.set()
-        
-    signal.signal(signal.SIGINT , interupt_handler)
-    signal.signal(signal.SIGHUP , interupt_handler)
-    signal.signal(signal.SIGALRM, interupt_handler)
-    signal.signal(signal.SIGTERM, interupt_handler)
-
-    log_thread = threading.Thread(target=process_log, name="loggingThread", args=(logger_queues, stop_event,))
-    log_thread.daemon = True
-
-    log_thread.start()
-
-    sprocess.start()
-    for cprocess in tweet_processes:
-        cprocess.start()
-
-    add_process_event("pid", {"main":os.getpid(), 'source':sprocess.pid, 'consumers':[p.pid for p in tweet_processes]}, Session)
-
-    if options.duration >= 0:
-        end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration)    
-    
-
-    while not stop_event.is_set():
-        if options.duration >= 0 and  datetime.datetime.utcnow() >= end_ts:
-            stop_event.set()
-            break
-        if sprocess.is_alive():            
-            time.sleep(1)
-        else:
-            stop_event.set()
-            break
-    utils.get_logger().debug("Joining Source Process")
-    try:
-        sprocess.join(10)
-    except:
-        utils.get_logger().debug("Pb joining Source Process - terminating")
-        sprocess.terminate()
-        
-    for i, cprocess in enumerate(tweet_processes):
-        utils.get_logger().debug("Joining consumer process Nb %d" % (i + 1))
-        try:
-            cprocess.join(3)
-        except:
-            utils.get_logger().debug("Pb joining consumer process Nb %d - terminating" % (i + 1))
-            cprocess.terminate()
-
-    
-    utils.get_logger().debug("Close queues")
-    try:
-        queue.close()
-        for lqueue in logger_queues:
-            lqueue.close()
-    except exception as e:
-        utils.get_logger().error("error when closing queues %s", repr(e))
-        #do nothing
-        
-    
-    if options.process_nb > 1:
-        utils.get_logger().debug("Processing leftovers")
-        session = Session()
-        try:
-            process_leftovers(session, access_token, utils.get_logger())
-            session.commit()
-        finally:
-            session.rollback()
-            session.close()
-
-    for pengine in process_engines:
-        pengine.dispose()
-        
-    add_process_event(type="shutdown", args=stop_args, session_maker=Session)
     utils.get_logger().debug("Done. Exiting.")