merge
author Samuel Huron <samuel.huron@centrepompidou.fr>
Sun, 28 Aug 2011 15:42:28 +0200
changeset 264 c7fd6a0b5b51
parent 259 bc17d1af15ab (current diff)
parent 263 6671e9a4c9c5 (diff)
child 265 3e1c1342ca61
merge
--- a/script/lib/iri_tweet/models.py	Sun Aug 28 11:48:30 2011 +0200
+++ b/script/lib/iri_tweet/models.py	Sun Aug 28 15:42:28 2011 +0200
@@ -1,5 +1,5 @@
-from sqlalchemy import (Boolean, Column, BigInteger, Integer, String, ForeignKey,
-    DateTime, create_engine)
+from sqlalchemy import (Boolean, Column, Enum, BigInteger, Integer, String, 
+    ForeignKey, DateTime, create_engine)
 from sqlalchemy.ext.declarative import declarative_base
 from sqlalchemy.orm import relationship
 import anyjson
@@ -27,12 +27,34 @@
     else:
         return anyjson.serialize(obj)
 
+class TweetMeta(type(Base)):
+            
+    def __init__(cls, name, bases, ns): #@NoSelf
+        def init(self, **kwargs):
+            for key, value in kwargs.items():
+                if hasattr(self, key):
+                    setattr(self, key, value)
+            super(cls, self).__init__()
+        setattr(cls, '__init__', init)
+        super(TweetMeta, cls).__init__(name, bases, ns)
+    
+
+class ProcessEvent(Base):
+    __metaclass__ = TweetMeta
+    __tablename__ = "tweet_process_event"
+    id = Column(Integer, primary_key=True, autoincrement=True)
+    ts = Column(DateTime, default=datetime.datetime.utcnow, index=True)
+    type = Column(Enum("start","pid","shutdown","error", "start_worker", "stop_worker", name="process_event_type_enum"), nullable=False)
+    args = Column(String)
+    
 class EntityType(Base):
+    __metaclass__ = TweetMeta
     __tablename__ = "tweet_entity_type"
     id = Column(Integer, primary_key=True, autoincrement=True)
     label = Column(String)
 
 class Entity(Base):
+    __metaclass__ = TweetMeta
     __tablename__ = "tweet_entity"
     id = Column(Integer, primary_key=True)
     tweet_id = Column(BigInteger, ForeignKey('tweet_tweet.id'))
@@ -44,33 +66,27 @@
     source = Column(String)
     __mapper_args__ = {'polymorphic_on': type, 'polymorphic_identity': 'entity_entity', 'with_polymorphic':'*'}
 
-    def __init__(self, **kwargs):
-        for key, value in kwargs.items():
-            if hasattr(self, key):
-                setattr(self, key, value)
 
 class TweetSource(Base):
+    __metaclass__ = TweetMeta
     __tablename__ = 'tweet_tweet_source'
     id = Column(Integer, primary_key=True, autoincrement=True)
     original_json = Column(String)
-    received_at = Column(DateTime, default=datetime.datetime.now(), index=True)
-    
-    def __init__(self, **kwargs):
-        for key, value in kwargs.items():
-            if hasattr(self, key):
-                setattr(self, key, value)
+    received_at = Column(DateTime, default=datetime.datetime.utcnow, index=True)
 
 
 class TweetLog(Base):
-    
+        
     TWEET_STATUS = {
         'OK' : 1,
         'ERROR' : 2,
         'NOT_TWEET': 3,
     }
+    __metaclass__ = TweetMeta
     
     __tablename__ = 'tweet_tweet_log'
     id = Column(Integer, primary_key=True, autoincrement=True)
+    ts = Column(DateTime, default=datetime.datetime.utcnow, index=True)
     tweet_source_id = Column(Integer, ForeignKey('tweet_tweet_source.id'))
     tweet_source = relationship("TweetSource", backref="logs")
     status = Column(Integer)
@@ -79,6 +95,7 @@
  
     
 class Tweet(Base):
+    __metaclass__ = TweetMeta
     __tablename__ = 'tweet_tweet'
 
     id = Column(BigInteger, primary_key=True, autoincrement=False)
@@ -104,32 +121,31 @@
     tweet_source_id = Column(Integer, ForeignKey('tweet_tweet_source.id'))
     tweet_source = relationship("TweetSource", backref="tweet")
     entity_list = relationship(Entity, backref='tweet')
-    received_at = Column(DateTime, default=datetime.datetime.now(), index=True)
+    received_at = Column(DateTime, default=datetime.datetime.utcnow, index=True)
         
-    def __init__(self, **kwargs):
-        for key, value in kwargs.items():
-            if hasattr(self, key):
-                setattr(self, key, value)
 
 class UserMessage(Base):
+    __metaclass__ = TweetMeta
     __tablename__ = "tweet_user_message"
 
     id = Column(Integer, primary_key=True)
     user_id = Column(Integer, ForeignKey('tweet_user.id'))
     user = relationship("User", backref="messages")
-    created_at = Column(DateTime, default=datetime.datetime.now())
+    created_at = Column(DateTime, default=datetime.datetime.utcnow)
     message_id = Column(Integer, ForeignKey('tweet_message.id'))
 
 class Message(Base):
+    __metaclass__ = TweetMeta
     __tablename__ = "tweet_message"
     
     id = Column(Integer, primary_key=True)
-    created_at = Column(DateTime, default=datetime.datetime.now())
+    created_at = Column(DateTime, default=datetime.datetime.utcnow)
     text = Column(String)
     users = relationship(UserMessage, backref='message')
         
 
 class User(Base):
+    __metaclass__ = TweetMeta
     __tablename__ = "tweet_user"
     
     id = Column(BigInteger, primary_key=True, autoincrement=False)
@@ -153,10 +169,12 @@
     profile_background_image_url = Column(String)
     profile_background_tile = Column(Boolean)
     profile_image_url = Column(String)
+    profile_image_url_https = Column(String)
     profile_link_color = Column(String)
     profile_sidebar_border_color = Column(String)
     profile_sidebar_fill_color = Column(String)
     profile_text_color = Column(String)
+    default_profile_image = Column(String)
     profile_use_background_image = Column(Boolean)
     protected = Column(Boolean)
     screen_name = Column(String, index=True)
@@ -166,43 +184,33 @@
     url = Column(String)
     utc_offset = Column(Integer)
     verified = Column(Boolean)
-
-    def __init__(self, **kwargs):
-        for key, value in kwargs.items():
-            if hasattr(self, key):
-                setattr(self, key, value)    
     
 
 class Hashtag(Base):
+    __metaclass__ = TweetMeta
     __tablename__ = "tweet_hashtag"
     id = Column(Integer, primary_key=True)
     text = Column(String, unique=True, index=True)
-    def __init__(self, **kwargs):
-        for key, value in kwargs.items():
-            if hasattr(self, key):
-                setattr(self, key, value)
+
 
 class Url(Base):
+    __metaclass__ = TweetMeta
     __tablename__ = "tweet_url"
     id = Column(Integer, primary_key=True)
     url = Column(String, unique=True)
     expanded_url = Column(String)
-    def __init__(self, **kwargs):
-        for key, value in kwargs.items():
-            if hasattr(self, key):
-                setattr(self, key, value)
+
 
 class MediaType(Base):
+    __metaclass__ = TweetMeta
     __tablename__ = "tweet_media_type"
     id = Column(Integer, primary_key=True, autoincrement=True)
     label = Column(String, unique=True, index=True)
-    def __init__(self, **kwargs):
-        for key, value in kwargs.items():
-            if hasattr(self, key):
-                setattr(self, key, value)
+
     
 
 class Media(Base):
+    __metaclass__ = TweetMeta
     __tablename__ = "tweet_media"
     id = Column(BigInteger, primary_key=True, autoincrement=False)
     id_str = Column(String, unique=True)
@@ -214,10 +222,6 @@
     sizes = Column(String)
     type_id = Column(Integer, ForeignKey("tweet_media_type.id"))
     type = relationship(MediaType, primaryjoin=type_id == MediaType.id)
-    def __init__(self, **kwargs):
-        for key, value in kwargs.items():
-            if hasattr(self, key):
-                setattr(self, key, value)
 
     
 
@@ -227,11 +231,6 @@
     id = Column(Integer, ForeignKey('tweet_entity.id'), primary_key=True)
     hashtag_id = Column(Integer, ForeignKey("tweet_hashtag.id"))
     hashtag = relationship(Hashtag, primaryjoin=hashtag_id == Hashtag.id)
-    def __init__(self, **kwargs):
-        super(EntityHashtag, self).__init__(**kwargs)
-        for key, value in kwargs.items():
-            if hasattr(self, key):
-                setattr(self, key, value)
 
     
 class EntityUrl(Entity):
@@ -240,11 +239,6 @@
     id = Column(Integer, ForeignKey('tweet_entity.id'), primary_key=True)
     url_id = Column(Integer, ForeignKey("tweet_url.id"))
     url = relationship(Url, primaryjoin=url_id == Url.id)
-    def __init__(self, **kwargs):
-        super(EntityUrl, self).__init__(**kwargs)
-        for key, value in kwargs.items():
-            if hasattr(self, key):
-                setattr(self, key, value)
 
 class EntityUser(Entity):
     __tablename__ = "tweet_entity_user"
@@ -253,11 +247,6 @@
     user_id = Column(BigInteger, ForeignKey('tweet_user.id'))
     user = relationship(User, primaryjoin=(user_id == User.id))
 
-    def __init__(self, **kwargs):
-        super(EntityUser, self).__init__(**kwargs)
-        for key, value in kwargs.items():
-            if hasattr(self, key):
-                setattr(self, key, value)
                 
 class EntityMedia(Entity):
     __tablename__ = "tweet_entity_media"
@@ -266,12 +255,6 @@
     media_id = Column(BigInteger, ForeignKey('tweet_media.id'))
     media = relationship(Media, primaryjoin=(media_id == Media.id))
 
-    def __init__(self, **kwargs):
-        super(EntityMedia, self).__init__(**kwargs)
-        for key, value in kwargs.items():
-            if hasattr(self, key):
-                setattr(self, key, value)
-
                 
 def setup_database(*args, **kwargs):
         
@@ -288,118 +271,3 @@
 
     return (engine, metadata)
 
-rest_tweet_tweet = {
-    u'iso_language_code': 'unicode',
-    u'text': 'unicode',
-    u'from_user_id_str': 'unicode',
-    u'profile_image_url': 'unicode',
-    u'to_user_id_str': 'NoneType',
-    u'created_at': 'unicode',
-    u'source': 'unicode',
-    u'to_user': 'unicode',
-    u'id_str': 'unicode',
-    u'from_user': 'unicode',
-    u'place': {u'type': 'unicode', u'id': 'unicode', u'full_name': 'unicode'},
-    u'from_user_id': 'int',
-    u'to_user_id': 'NoneType',
-    u'geo': 'NoneType',
-    u'id': 'int',
-    u'metadata': {u'result_type': 'unicode'}
-}
-
-tweet_tweet = {
-    'contributors': None,
-    'coordinates': None,
-    'created_at': 'date',
-    'entities': "tweet_entity",
-    'favorited': "bool",
-    'geo': None,
-    'id': "long",
-    'id_str': "string",
-    'in_reply_to_screen_name': "string",
-    'in_reply_to_status_id': "long",
-    'in_reply_to_status_id_str': "string",
-    'in_reply_to_user_id': "int",
-    'in_reply_to_user_id_str': "string",
-    'place': "string",
-    'retweet_count': "int",
-    'retweeted': "bool",
-    'source': "string",
-    'text': "string",
-    'truncated': "bool",
-    'user': "tweet_user"
-}
-tweet_user = {
-    'contributors_enabled': 'bool',
-    'created_at': 'str',
-    'description': 'str',
-    'favourites_count': 'int',
-    'follow_request_sent': None,
-    'followers_count': 'int',
-    'following': None,
-    'friends_count': 'int',
-    'geo_enabled': 'bool',
-    'id': 'int',
-    'id_str': 'str',
-    'is_translator': 'bool',
-    'lang': 'str',
-    'listed_count': 'int',
-    'location': 'str',
-    'name': 'str',
-    'notifications': 'NoneType',
-    'profile_background_color': 'str',
-    'profile_background_image_url': 'str',
-    'profile_background_tile': 'bool',
-    'profile_image_url': 'str',
-    'profile_link_color': 'str',
-    'profile_sidebar_border_color': 'str',
-    'profile_sidebar_fill_color': 'str',
-    'profile_text_color': 'str',
-    'profile_use_background_image': 'bool',
-    'protected': 'bool',
-    'screen_name': 'str',
-    'show_all_inline_media': 'bool',
-    'statuses_count': 'int',
-    'time_zone': 'str',
-    'url': 'str',
-    'utc_offset': 'int',
-    'verified': 'bool',
-}
-
-
-tweet_entity_hashtag = {
-    'hashtag' : 'tweet_hashtag',
-    'indice_start' : 'int',
-    'indice_end' : 'int',
-    'tweet':'tweet_tweet'
-}
-
-tweet_entity_url = {
-    'url' : 'tweet_url',
-    'indice_start' : 'int',
-    'indice_end' : 'int',
-    'tweet':'tweet_tweet'
-}
-
-tweet_entity_user = {
-    'user' : 'tweet_user',
-    'indice_start' : 'int',
-    'indice_end' : 'int',
-    'tweet':'tweet_tweet'
-}
-
-#id int
-#id_str str
-#indices list
-#name str
-#screen_name str
-
-tweet_hashtag = {
-    "text": "string"
-}
-
-tweet_url = {
-    "url": "string",
-    "expanded_url" : "string",
-}
-
--- a/script/stream/recorder_tweetstream.py	Sun Aug 28 11:48:30 2011 +0200
+++ b/script/stream/recorder_tweetstream.py	Sun Aug 28 15:42:28 2011 +0200
@@ -1,6 +1,6 @@
 from getpass import getpass
 from iri_tweet import models, utils
-from iri_tweet.models import TweetSource, TweetLog
+from iri_tweet.models import TweetSource, TweetLog, ProcessEvent
 from multiprocessing import (Queue as mQueue, JoinableQueue, Process, Event, 
     get_logger)
 from optparse import OptionParser
@@ -10,6 +10,7 @@
 import StringIO
 import anyjson
 import datetime
+import inspect
 import logging
 import os
 import re
@@ -110,24 +111,68 @@
         # Don't listen to auth error, since we can't reasonably reconnect
         # when we get one.
 
+def add_process_event(type, args, session_maker):
+    session = session_maker()
+    try:
+        evt = ProcessEvent(args=None if args is None else anyjson.serialize(args), type=type)
+        session.add(evt)
+        session.commit()
+    finally:
+        session.close()
+
+
+class BaseProcess(Process):
+
+    def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
+        self.parent_pid = parent_pid
+        self.session_maker = session_maker
+        self.queue = queue
+        self.options = options
+        self.logger_queue = logger_queue
+        self.stop_event = stop_event
+        self.access_token = access_token
+
+        super(BaseProcess, self).__init__()
+
+    #
+    # from http://stackoverflow.com/questions/2542610/python-daemon-doesnt-kill-its-kids
+    #
+    def parent_is_alive(self):
+        try:
+            # try to call Parent
+            os.kill(self.parent_pid, 0)
+        except OSError:
+            # *beeep* oh no! The phone's disconnected!
+            return False
+        else:
+            # *ring* Hi mom!
+            return True
+    
+
+    def __get_process_event_args(self):
+        return {'name':self.name, 'pid':self.pid, 'parent_pid':self.parent_pid, 'options':self.options.__dict__, 'access_token':self.access_token}
+
+    def run(self):
+        try:
+            add_process_event("start_worker", self.__get_process_event_args(), self.session_maker)
+            self.do_run()
+        finally:
+            add_process_event("stop_worker",self.__get_process_event_args(), self.session_maker)
+        
+    def do_run(self):
+        raise NotImplementedError()
 
 
 
-class SourceProcess(Process):
+class SourceProcess(BaseProcess):
     
-    def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue):
-        self.session_maker = session_maker
-        self.queue = queue
+    def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
         self.track = options.track
         self.reconnects = options.reconnects
         self.token_filename = options.token_filename
-        self.stop_event = stop_event
-        self.options = options
-        self.access_token = access_token
-        self.logger_queue = logger_queue
-        super(SourceProcess, self).__init__()
-    
-    def run(self):
+        super(SourceProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid)
+
+    def do_run(self):
         
         #import pydevd
         #pydevd.settrace(suspend=False)
@@ -148,9 +193,11 @@
         
         try:
             for tweet in stream:
-                self.logger.debug("tweet " + repr(tweet))
+                if not self.parent_is_alive():
+                    sys.exit()
+                self.logger.debug("SourceProcess : tweet " + repr(tweet))
                 source = TweetSource(original_json=tweet)
-                self.logger.debug("source created")
+                self.logger.debug("SourceProcess : source created")
                 add_retries = 0
                 while add_retries < 10:
                     try:
@@ -160,18 +207,18 @@
                         break
                     except OperationalError as e:
                         session.rollback()
-                        self.logger.debug("Operational Error %s nb %d" % (repr(e), add_retries))
+                        self.logger.debug("SourceProcess : Operational Error %s nb %d" % (repr(e), add_retries))
                         if add_retries == 10:
                             raise e
                      
                 source_id = source.id
-                self.logger.debug("before queue + source id " + repr(source_id))
-                self.logger.info("Tweet count: %d - current rate : %.2f - running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime)))
+                self.logger.debug("SourceProcess : before queue + source id " + repr(source_id))
+                self.logger.info("SourceProcess : Tweet count: %d - current rate : %.2f - running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime)))
                 session.commit()
                 self.queue.put((source_id, tweet), False)
 
         except Exception as e:
-            self.logger.error("Error when processing tweet " + repr(e))
+            self.logger.error("SourceProcess : Error when processing tweet " + repr(e))
         finally:
             session.rollback()
             stream.close()
@@ -196,11 +243,13 @@
         processor.process()
     except Exception as e:
         message = u"Error %s processing tweet %s" % (repr(e), tweet)
-        logger.error(message)
+        logger.exception(message)
         output = StringIO.StringIO()
-        traceback.print_exception(Exception, e, None, None, output)
-        error_stack = output.getvalue()
-        output.close()
+        try:
+            traceback.print_exc(file=output)
+            error_stack = output.getvalue()
+        finally:
+            output.close()
         session.rollback()
         tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['ERROR'], error=message, error_stack=error_stack)
         session.add(tweet_log)
@@ -208,26 +257,20 @@
 
     
         
-class TweetProcess(Process):
+class TweetProcess(BaseProcess):
     
-    def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue):
-        self.session_maker = session_maker
-        self.queue = queue
-        self.stop_event = stop_event
-        self.options = options
-        self.access_token = access_token
-        self.logger_queue = logger_queue
-        super(TweetProcess, self).__init__()
+    def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
+        super(TweetProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid)
 
 
-    def run(self):
+    def do_run(self):
         
         self.logger = set_logging_process(self.options, self.logger_queue)
         session = self.session_maker()
         try:
-            while not self.stop_event.is_set():
+            while not self.stop_event.is_set() and self.parent_is_alive():
                 try:
-                    source_id, tweet_txt = queue.get(True, 3)
+                    source_id, tweet_txt = self.queue.get(True, 3)
                     self.logger.debug("Processing source id " + repr(source_id))
                 except Exception as e:
                     self.logger.debug('Process tweet exception in loop : ' + repr(e))
@@ -273,7 +316,11 @@
 
         
 def get_options():
-    parser = OptionParser()
+
+    usage = "usage: %prog [options]"
+
+    parser = OptionParser(usage=usage)
+
     parser.add_option("-f", "--file", dest="conn_str",
                       help="write tweet to DATABASE. This is a connection string", metavar="CONNECTION_STR", default="enmi2010_twitter.db")
     parser.add_option("-u", "--user", dest="username",
@@ -293,58 +340,20 @@
     parser.add_option("-N", "--nb-process", dest="process_nb",
                       help="number of process.\nIf 0, only the lefovers of the database are processed.\nIf 1, no postprocessing is done on the tweets.", metavar="PROCESS_NB", default=2, type='int')
 
-
-
     utils.set_logging_options(parser)
 
     return parser.parse_args()
-    
 
-if __name__ == '__main__':
 
-    (options, args) = get_options()
-    
-    set_logging(options)
-        
-    if options.debug:
-        print "OPTIONS : "
-        print repr(options)
-    
-    
-    conn_str = options.conn_str.strip()
-    if not re.match("^\w+://.+", conn_str):
-        conn_str = 'sqlite:///' + options.conn_str
-        
-    if conn_str.startswith("sqlite") and options.new:
-        filepath = conn_str[conn_str.find(":///")+4:]
-        if os.path.exists(filepath):
-            i = 1
-            basename, extension = os.path.splitext(filepath)
-            new_path = '%s.%d%s' % (basename, i, extension)
-            while i < 1000000 and os.path.exists(new_path):
-                i += 1
-                new_path = '%s.%d%s' % (basename, i, extension)
-            if i >= 1000000:
-                raise Exception("Unable to find new filename for " + filepath)
-            else:
-                shutil.move(filepath, new_path)
+def do_run(options, session_maker):
 
-    Session, engine, metadata = get_sessionmaker(conn_str)
-    
-    if options.new:
-        check_metadata = sqlalchemy.schema.MetaData(bind=engine, reflect=True)
-        if len(check_metadata.sorted_tables) > 0:
-            message = "Database %s not empty exiting" % conn_str
-            utils.get_logger().error(message)
-            sys.exit(message)
-    
-    metadata.create_all(engine)
-    
+    stop_args = {}
+
     access_token = None
     if not options.username or not options.password:
         access_token = utils.get_oauth_token(options.token_filename)
     
-    session = Session()
+    session = session_maker()
     try:
         process_leftovers(session, access_token, utils.get_logger())
         session.commit()
@@ -354,7 +363,7 @@
     
     if options.process_nb <= 0:
         utils.get_logger().debug("Leftovers processed. Exiting.")
-        sys.exit()
+        return None
 
     queue = mQueue()
     stop_event = Event()
@@ -365,7 +374,7 @@
     try:
         conn = urllib2.urlopen(req)
     except:
-        pass
+        utils.get_logger().debug("could not open localhost")
         #donothing
     finally:
         if conn is not None:
@@ -378,7 +387,8 @@
     process_engines.append(engine_process)
     lqueue = mQueue(1)
     logger_queues.append(lqueue)
-    sprocess = SourceProcess(SessionProcess, queue, options, access_token, stop_event, lqueue)    
+    pid = os.getpid()
+    sprocess = SourceProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)    
     
     tweet_processes = []
     
@@ -387,22 +397,29 @@
         process_engines.append(engine_process)
         lqueue = mQueue(1)
         logger_queues.append(lqueue)
-        cprocess = TweetProcess(SessionProcess, queue, options, access_token, stop_event, lqueue)
+        cprocess = TweetProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)
         tweet_processes.append(cprocess)
 
     def interupt_handler(signum, frame):
+        utils.get_logger().debug("shutdown asked " + repr(signum) + "  " + repr(inspect.getframeinfo(frame, 9)))
+        stop_args.update({'message': 'interupt', 'signum':signum, 'frameinfo':inspect.getframeinfo(frame, 9)})
         stop_event.set()
         
-    signal.signal(signal.SIGINT, interupt_handler)
+    signal.signal(signal.SIGINT , interupt_handler)
+    signal.signal(signal.SIGHUP , interupt_handler)
+    signal.signal(signal.SIGALRM, interupt_handler)
+    signal.signal(signal.SIGTERM, interupt_handler)
 
-    log_thread = threading.Thread(target=process_log, name="loggingThread", args=(logger_queues,stop_event,))
+    log_thread = threading.Thread(target=process_log, name="loggingThread", args=(logger_queues, stop_event,))
     log_thread.daemon = True
 
+    log_thread.start()
+
     sprocess.start()
     for cprocess in tweet_processes:
         cprocess.start()
 
-    log_thread.start()
+    add_process_event("pid", {'main':os.getpid(), 'source':(sprocess.name,sprocess.pid), 'consumers':dict([(p.name,p.pid) for p in tweet_processes])}, session_maker)
 
     if options.duration >= 0:
         end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration)    
@@ -410,11 +427,13 @@
 
     while not stop_event.is_set():
         if options.duration >= 0 and  datetime.datetime.utcnow() >= end_ts:
+            stop_args.update({'message': 'duration', 'duration' : options.duration, 'end_ts' : end_ts})
             stop_event.set()
             break
         if sprocess.is_alive():            
             time.sleep(1)
         else:
+            stop_args.update({'message': 'Source process killed'})
             stop_event.set()
             break
     utils.get_logger().debug("Joining Source Process")
@@ -445,7 +464,7 @@
     
     if options.process_nb > 1:
         utils.get_logger().debug("Processing leftovers")
-        session = Session()
+        session = session_maker()
         try:
             process_leftovers(session, access_token, utils.get_logger())
             session.commit()
@@ -455,6 +474,61 @@
 
     for pengine in process_engines:
         pengine.dispose()
+
+    return stop_args
+
+
+if __name__ == '__main__':
+
+    (options, args) = get_options()
+    
+    set_logging(options)
+    
+    utils.get_logger().debug("OPTIONS : " + repr(options))    
+    
+    conn_str = options.conn_str.strip()
+    if not re.match("^\w+://.+", conn_str):
+        conn_str = 'sqlite:///' + options.conn_str
         
+    if conn_str.startswith("sqlite") and options.new:
+        filepath = conn_str[conn_str.find(":///") + 4:]
+        if os.path.exists(filepath):
+            i = 1
+            basename, extension = os.path.splitext(filepath)
+            new_path = '%s.%d%s' % (basename, i, extension)
+            while i < 1000000 and os.path.exists(new_path):
+                i += 1
+                new_path = '%s.%d%s' % (basename, i, extension)
+            if i >= 1000000:
+                raise Exception("Unable to find new filename for " + filepath)
+            else:
+                shutil.move(filepath, new_path)
+
+    Session, engine, metadata = get_sessionmaker(conn_str)
+    
+    if options.new:
+        check_metadata = sqlalchemy.schema.MetaData(bind=engine, reflect=True)
+        if len(check_metadata.sorted_tables) > 0:
+            message = "Database %s not empty exiting" % conn_str
+            utils.get_logger().error(message)
+            sys.exit(message)
+    
+    metadata.create_all(engine)
+    stop_args = {}
+    try:
+        add_process_event(type="start", args={'options':options.__dict__, 'args': args, 'command_line': sys.argv}, session_maker=Session)
+        stop_args = do_run(options, Session)
+    except Exception as e:
+        utils.get_logger().exception("Error in main thread")        
+        outfile = StringIO.StringIO()
+        try:
+            traceback.print_exc(file=outfile)
+            stop_args = {'error': repr(e), 'message': getattr(e, 'message', ''), 'stacktrace':outfile.getvalue()}
+        finally:
+            outfile.close()
+        raise
+    finally:    
+        add_process_event(type="shutdown", args=stop_args, session_maker=Session)
+
     utils.get_logger().debug("Done. Exiting.")