Merge with 9f24acbe66fb87308778a41a18d19bb918bdb50f
authorYves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
Tue, 09 Aug 2011 13:07:23 +0200
changeset 253 e9335ee3cf71
parent 252 2ebf22c65168 (current diff)
parent 245 4c953ca2aa1d (diff)
child 254 2209e66bb50b
Merge with 9f24acbe66fb87308778a41a18d19bb918bdb50f
script/lib/iri_tweet/models.py
script/lib/iri_tweet/utils.py
--- a/.project	Wed Jul 27 18:32:56 2011 +0200
+++ b/.project	Tue Aug 09 13:07:23 2011 +0200
@@ -25,4 +25,15 @@
 		<nature>org.python.pydev.pythonNature</nature>
 		<nature>org.eclipse.wst.jsdt.core.jsNature</nature>
 	</natures>
+	<filteredResources>
+		<filter>
+			<id>1312812919641</id>
+			<name></name>
+			<type>6</type>
+			<matcher>
+				<id>org.eclipse.ui.ide.multiFilter</id>
+				<arguments>1.0-name-matches-false-false-.DS_Store</arguments>
+			</matcher>
+		</filter>
+	</filteredResources>
 </projectDescription>
--- a/script/lib/iri_tweet/export_tweet_db.py	Wed Jul 27 18:32:56 2011 +0200
+++ b/script/lib/iri_tweet/export_tweet_db.py	Tue Aug 09 13:07:23 2011 +0200
@@ -1,8 +1,7 @@
 from models import setup_database
 from optparse import OptionParser #@UnresolvedImport
 from sqlalchemy.orm import sessionmaker
-from utils import set_logging_options, set_logging, TwitterProcessor
-import logging
+from utils import set_logging_options, set_logging, TwitterProcessor, logger
 import sqlite3 #@UnresolvedImport
 
 
@@ -34,11 +33,11 @@
             curs_in = conn_in.cursor()
             fields_mapping = {}
             for i,res in enumerate(curs_in.execute("select json from tweet_tweet;")):
-                logging.debug("main loop %d : %s" % (i, res[0])) #@UndefinedVariable
-                processor = TwitterProcessor(eval(res[0]), res[0], session, options.token_filename)
+                logger.debug("main loop %d : %s" % (i, res[0])) #@UndefinedVariable
+                processor = TwitterProcessor(eval(res[0]), res[0], None, session, options.token_filename)
                 processor.process()
                 session.commit()
-            logging.debug("main : %d tweet processed" % (i+1)) #@UndefinedVariable
+            logger.debug("main : %d tweet processed" % (i+1)) #@UndefinedVariable
         except Exception, e:
             session.rollback()
             raise e
--- a/script/lib/iri_tweet/export_twitter_alchemy.py	Wed Jul 27 18:32:56 2011 +0200
+++ b/script/lib/iri_tweet/export_twitter_alchemy.py	Tue Aug 09 13:07:23 2011 +0200
@@ -5,10 +5,9 @@
 from optparse import OptionParser #@UnresolvedImport
 from sqlalchemy import Table, Column, BigInteger, MetaData
 from sqlalchemy.orm import sessionmaker
-from utils import parse_date, set_logging_options, set_logging, get_filter_query
+from utils import parse_date, set_logging_options, set_logging, get_filter_query, logger
 from models import setup_database
 import datetime
-import logging
 import os.path
 import re
 import sys
@@ -101,7 +100,7 @@
     
     set_logging(options)
         
-    logging.debug("OPTIONS : " + repr(options)) #@UndefinedVariable
+    logger.debug("OPTIONS : " + repr(options)) #@UndefinedVariable
     
     if len(sys.argv) == 1 or options.database is None:
         parser.print_help()
@@ -159,7 +158,7 @@
             
             for params in parameters:
                 
-                logging.debug("PARAMETERS " + repr(params)) #@UndefinedVariable
+                logger.debug("PARAMETERS " + repr(params)) #@UndefinedVariable
                 
                 start_date_str = params.get("start_date",None)
                 end_date_str = params.get("end_date", None)
@@ -192,12 +191,12 @@
                 
                 if content_file and content_file.find("http") == 0:
                     
-                    logging.debug("url : " + content_file) #@UndefinedVariable
+                    logger.debug("url : " + content_file) #@UndefinedVariable
                     
                     h = httplib2.Http()
                     resp, content = h.request(content_file)
                     
-                    logging.debug("url response " + repr(resp) + " content " + repr(content)) #@UndefinedVariable
+                    logger.debug("url response " + repr(resp) + " content " + repr(content)) #@UndefinedVariable
                     
                     project = anyjson.deserialize(content)
                     root = etree.fromstring(project["ldt"])
@@ -254,7 +253,7 @@
                     
                     
                 if ensemble_parent is None:
-                    logging.error("Can not process file") #@UndefinedVariable
+                    logger.error("Can not process file") #@UndefinedVariable
                     sys.exit()
             
                 if options.replace:
@@ -309,18 +308,18 @@
                     
                     project["ldt"] = output_data
                     body = anyjson.serialize(project)
-                    logging.debug("write http " + content_file) #@UndefinedVariable
-                    logging.debug("write http " + repr(body)) #@UndefinedVariable
+                    logger.debug("write http " + content_file) #@UndefinedVariable
+                    logger.debug("write http " + repr(body)) #@UndefinedVariable
                     h = httplib2.Http()
                     resp, content = h.request(content_file, "PUT", headers={'content-type':'application/json'}, body=body)
-                    logging.debug("write http " + repr(resp) + " content " + content) #@UndefinedVariable
+                    logger.debug("write http " + repr(resp) + " content " + content) #@UndefinedVariable
                 else:
                     if content_file and os.path.exists(content_file):
                         dest_file_name = content_file 
                     else:
                         dest_file_name = options.filename
             
-                    logging.debug("WRITE : " + dest_file_name) #@UndefinedVariable
+                    logger.debug("WRITE : " + dest_file_name) #@UndefinedVariable
                     output = open(dest_file_name, "w")
                     output.write(output_data)
                     output.flush()
--- a/script/lib/iri_tweet/models.py	Wed Jul 27 18:32:56 2011 +0200
+++ b/script/lib/iri_tweet/models.py	Tue Aug 09 13:07:23 2011 +0200
@@ -42,7 +42,34 @@
             if hasattr(self,key):
                 setattr(self,key,value)
 
+class TweetSource(Base):
+    __tablename__ = 'tweet_tweet_source'
+    id = Column(Integer, primary_key = True, autoincrement=True)
+    original_json = Column(String)
+    received_at = Column(DateTime, default=datetime.datetime.now())
+    
+    def __init__(self, **kwargs):
+        for key, value in kwargs.items():
+            if hasattr(self,key):
+                setattr(self,key,value)
 
+
+class TweetLog(Base):
+    
+    TWEET_STATUS = {
+        'OK' : 1,
+        'ERROR' : 2,
+    }
+    
+    __tablename__ = 'tweet_tweet_log'
+    id = Column(Integer, primary_key = True, autoincrement=True)
+    tweet_source_id = Column(Integer, ForeignKey('tweet_tweet_source.id'))
+    tweet_source = relationship("TweetSource", backref="logs")
+    status = Column(Integer)
+    error = Column(String)
+    error_stack = Column(String)
+ 
+    
 class Tweet(Base):
     __tablename__ = 'tweet_tweet'
 
@@ -65,12 +92,12 @@
     text = Column(String)
     truncated = Column(Boolean)
     user_id = Column(Integer, ForeignKey('tweet_user.id'))
-    original_json = Column(String)
+    user = relationship("User", backref="tweets")
+    tweet_source_id = Column(Integer, ForeignKey('tweet_tweet_source.id'))
+    tweet_source = relationship("TweetSource", backref="tweet")
     entity_list = relationship(Entity, backref='tweet')
     received_at = Column(DateTime, default=datetime.datetime.now())
-    
-    #user = relationship(User, primaryjoin=user_id == User.id)
-    
+        
     def __init__(self, **kwargs):
         for key, value in kwargs.items():
             if hasattr(self,key):
@@ -81,11 +108,11 @@
 
     id = Column(Integer, primary_key = True)
     user_id = Column(Integer, ForeignKey('tweet_user.id'))
+    user = relationship("User", backref="messages")
     created_at = Column(DateTime, default=datetime.datetime.now())
     message_id = Column(Integer, ForeignKey('tweet_message.id'))
 
 class Message(Base):
-    
     __tablename__ = "tweet_message"
     
     id = Column(Integer, primary_key = True)
@@ -131,8 +158,6 @@
     url= Column(String)
     utc_offset = Column(Integer)
     verified= Column(Boolean)
-    tweets = relationship(Tweet, backref='user')
-    messages  = relationship(UserMessage, backref='user')
 
     def __init__(self, **kwargs):
         for key, value in kwargs.items():
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/script/lib/iri_tweet/tests.py	Tue Aug 09 13:07:23 2011 +0200
@@ -0,0 +1,148 @@
+from sqlalchemy import Column, Integer, String, ForeignKey, create_engine
+from sqlalchemy.ext.declarative import declarative_base
+from sqlalchemy.orm import relationship, backref
+import unittest #@UnresolvedImport
+from sqlalchemy.orm import sessionmaker
+from iri_tweet.utils import ObjectsBuffer, TwitterProcessor
+from iri_tweet import models
+import tempfile #@UnresolvedImport
+import os
+
+Base = declarative_base()
+
+class User(Base):
+    __tablename__ = 'users'
+    
+    id = Column(Integer, primary_key=True)
+    name = Column(String)
+    fullname = Column(String)
+    password = Column(String)
+    
+    def __init__(self, name, fullname, password):
+        self.name = name
+        self.fullname = fullname
+        self.password = password
+    
+    def __repr__(self):
+        return "<User('%s','%s', '%s')>" % (self.name, self.fullname, self.password)
+
+
+class Address(Base):
+    __tablename__ = 'addresses'
+    id = Column(Integer, primary_key=True)
+    email_address = Column(String, nullable=False)
+    user_id = Column(Integer, ForeignKey('users.id'))
+    
+    user = relationship("User", backref=backref('addresses', order_by=id))
+    
+    def __init__(self, user_id, email_address):
+        self.email_address = email_address
+        self.user_id = user_id
+    
+    def __repr__(self):
+        return "<Address('%s')>" % self.email_address
+
+
+
+class TestObjectBuffer(unittest.TestCase):
+    
+    def setUp(self):
+        self.engine = create_engine('sqlite:///:memory:', echo=False)
+        Base.metadata.create_all(self.engine)
+        sessionMaker = sessionmaker(bind=self.engine)
+        self.session = sessionMaker()
+
+    def tearDown(self):
+        self.session.close()
+        self.engine.dispose()
+
+        
+    def testCreateUser(self):
+        ed_user = User('ed', 'Ed Jones', 'edspassword')
+        self.session.add(ed_user)
+        self.assertTrue(ed_user.id is None)
+        self.session.commit()
+        self.assertTrue(ed_user.id is not None)
+
+        
+    def testSimpleBuffer(self):
+        obj_buffer = ObjectsBuffer()
+        obj_proxy = obj_buffer.add_object(User, ['ed1', 'Ed1 Jones', 'edspassword'], None, False)
+        self.assertTrue(obj_proxy.id() is None)
+        obj_buffer.persists(self.session)
+        self.assertTrue(obj_proxy.id() is None)
+        self.session.commit()
+        self.assertTrue(obj_proxy.id() is not None)
+
+
+    def testSimpleBufferKwargs(self):
+        obj_buffer = ObjectsBuffer()
+        obj_proxy = obj_buffer.add_object(User, None, {'name':'ed1b', 'fullname':'Ed1b Jones', 'password':'edspassword'}, False)
+        self.assertTrue(obj_proxy.id() is None)
+        obj_buffer.persists(self.session)
+        self.assertTrue(obj_proxy.id() is None)
+        self.session.commit()
+        self.assertTrue(obj_proxy.id() is not None)
+
+        
+    def testSimpleBufferFlush(self):
+        obj_buffer = ObjectsBuffer()
+        obj_proxy = obj_buffer.add_object(User, ['ed2', 'Ed2 Jones', 'edspassword'], None, True)
+        self.assertTrue(obj_proxy.id() is None)
+        obj_buffer.persists(self.session)
+        self.assertTrue(obj_proxy.id() is not None)
+        self.session.commit()
+        self.assertTrue(obj_proxy.id() is not None)
+        
+    def testRelationBuffer(self):
+        obj_buffer = ObjectsBuffer()
+        user1_proxy = obj_buffer.add_object(User, ['ed3', 'Ed3 Jones', 'edspassword'], None, True)
+        obj_buffer.add_object(Address, [user1_proxy.id,'ed3@mail.com'], None, False)
+        obj_buffer.add_object(Address, [user1_proxy.id,'ed3@other.com'], None, False)
+        user2_proxy = obj_buffer.add_object(User, ['ed4', 'Ed3 Jones', 'edspassword'], None, True)
+        obj_buffer.add_object(Address, [user2_proxy.id,'ed4@mail.com'], None, False)
+        obj_buffer.persists(self.session)
+        self.session.commit()
+        ed_user = self.session.query(User).filter_by(name='ed3').first()
+        self.assertEquals(2, len(ed_user.addresses))
+        ed_user = self.session.query(User).filter_by(name='ed4').first()
+        self.assertEquals(1, len(ed_user.addresses))
+
+        
+
+original_json = u'{"in_reply_to_user_id_str":null,"text":"RT @BieberEagle: \\"I love my haters. They spend so much time thinking about me. Aren\u2019t they sweet?\\" - Justin Bieber","contributors":null,"retweeted":false,"coordinates":null,"retweeted_status":{"in_reply_to_user_id_str":null,"text":"\\"I love my haters. They spend so much time thinking about me. Aren\u2019t they sweet?\\" - Justin Bieber","contributors":null,"retweeted":false,"coordinates":null,"retweet_count":"100+","source":"web","entities":{"user_mentions":[],"hashtags":[],"urls":[]},"truncated":false,"place":null,"id_str":"96638597737889792","in_reply_to_user_id":null,"in_reply_to_status_id":null,"favorited":false,"in_reply_to_status_id_str":null,"user":{"is_translator":false,"profile_background_tile":true,"profile_background_image_url_https":"https:\/\/si0.twimg.com\/profile_background_images\/298443445\/355584171.jpg","listed_count":5040,"friends_count":8477,"profile_link_color":"ff0000","profile_sidebar_border_color":"000000","url":"http:\/\/twitpic.com\/photos\/BieberEagle","profile_image_url_https":"https:\/\/si0.twimg.com\/profile_images\/1465491672\/355584171_normal.jpg","profile_image_url":"http:\/\/a2.twimg.com\/profile_images\/1465491672\/355584171_normal.jpg","description":"1 name, 1 inspiration, 1 hero, 1 smile, 1 singer, 1 boy who changed my life.    B.\u0130.E.B.E.R-Believe In Everything Because Everything\'s Reachable. 
#NEVERSAYNEVER","default_profile":false,"notifications":null,"time_zone":"Paris","followers_count":14506,"default_profile_image":false,"lang":"en","profile_use_background_image":true,"screen_name":"BieberEagle","show_all_inline_media":false,"geo_enabled":false,"profile_background_color":"ffffff","location":"\u2665 Albania \u2665 ","id_str":"229067923","profile_background_image_url":"http:\/\/a2.twimg.com\/profile_background_images\/298443445\/355584171.jpg","favourites_count":89,"protected":false,"follow_request_sent":null,"following":null,"name":"truebelieber","statuses_count":24279,"verified":false,"created_at":"Tue Dec 21 12:35:18 +0000 2010","profile_text_color":"000000","id":229067923,"contributors_enabled":false,"utc_offset":3600,"profile_sidebar_fill_color":""},"id":96638597737889792,"created_at":"Thu Jul 28 17:50:11 +0000 2011","geo":null,"in_reply_to_screen_name":null},"retweet_count":"100+","source":"web","entities":{"user_mentions":[{"indices":[3,15],"screen_name":"BieberEagle","id_str":"229067923","name":"truebelieber","id":229067923}],"hashtags":[],"urls":[]},"truncated":false,"place":null,"id_str":"96965037637382145","in_reply_to_user_id":null,"in_reply_to_status_id":null,"favorited":false,"in_reply_to_status_id_str":null,"user":{"is_translator":false,"profile_background_tile":true,"profile_background_image_url_https":"https:\/\/si0.twimg.com\/profile_background_images\/300419382\/ipod.7.14_054.JPG","listed_count":3,"friends_count":1150,"profile_link_color":"00cccc","profile_sidebar_border_color":"c8ff00","url":"http:\/\/www.facebook.com\/blovedbecca180","profile_image_url_https":"https:\/\/si0.twimg.com\/profile_images\/1466752962\/block_party_7.27.11_015_normal.JPG","profile_image_url":"http:\/\/a3.twimg.com\/profile_images\/1466752962\/block_party_7.27.11_015_normal.JPG","description":"if ya wanna know something about me, then get to know me. \\n\\r\\n\\ri promise, you wont regret it. 
(:\\r\\ni love justin bieber with an extreme burning passion!","default_profile":false,"notifications":null,"time_zone":"Central Time (US & Canada)","followers_count":361,"default_profile_image":false,"lang":"en","profile_use_background_image":true,"screen_name":"beccaxannxx","show_all_inline_media":false,"geo_enabled":false,"profile_background_color":"ff0066","location":"","id_str":"65624607","profile_background_image_url":"http:\/\/a3.twimg.com\/profile_background_images\/300419382\/ipod.7.14_054.JPG","favourites_count":266,"protected":false,"follow_request_sent":null,"following":null,"name":"beccaxannxx","statuses_count":2512,"verified":false,"created_at":"Fri Aug 14 12:36:35 +0000 2009","profile_text_color":"6a39d4","id":65624607,"contributors_enabled":false,"utc_offset":-21600,"profile_sidebar_fill_color":"ff00bb"},"id":96965037637382145,"created_at":"Fri Jul 29 15:27:21 +0000 2011","geo":null,"in_reply_to_screen_name":null}'
+
+
+class TestTwitterProcessor(unittest.TestCase):
+    
+    def setUp(self):
+        self.engine, self.metadata = models.setup_database('sqlite:///:memory:', echo=True)
+        sessionMaker = sessionmaker(bind=self.engine)
+        self.session = sessionMaker()
+        file, self.tmpfilepath = tempfile.mkstemp()
+        os.close(file)
+
+
+    def testTwitterProcessor(self):
+        tp = TwitterProcessor(None, original_json, None, self.session, self.tmpfilepath)
+        tp.process()
+        self.session.commit()
+        
+        self.assertEquals(1, self.session.query(models.TweetSource).count())
+        self.assertEquals(1, self.session.query(models.Tweet).count())
+        self.assertEquals(2, self.session.query(models.User).count())
+        tweet = self.session.query(models.Tweet).first()
+        self.assertFalse(tweet.user is None)
+        self.assertEqual(u"beccaxannxx",tweet.user.name)
+        self.assertEqual(65624607,tweet.user.id)
+        self.assertEqual(1,len(tweet.entity_list))
+        self.assertEqual(u"BieberEagle", tweet.entity_list[0].user.screen_name)
+
+
+    def tearDown(self):
+        self.session.close()
+        self.engine.dispose()
+        os.remove(self.tmpfilepath)
+
+if __name__ == '__main__':
+    unittest.main()
\ No newline at end of file
--- a/script/lib/iri_tweet/tweet_twitter_user.py	Wed Jul 27 18:32:56 2011 +0200
+++ b/script/lib/iri_tweet/tweet_twitter_user.py	Tue Aug 09 13:07:23 2011 +0200
@@ -1,13 +1,12 @@
 from iri_tweet.models import setup_database, Message, UserMessage, User
 from iri_tweet.utils import (get_oauth_token, get_user_query, set_logging_options, 
-    set_logging, parse_date)
+    set_logging, parse_date, logger)
 from optparse import OptionParser #@UnresolvedImport
 from sqlalchemy import BigInteger
 from sqlalchemy.orm import sessionmaker
 from sqlalchemy.schema import MetaData, Table, Column
 from sqlalchemy.sql import and_
 import datetime
-import logging #@UnresolvedImport
 import sys
 import time
 import twitter
@@ -54,7 +53,7 @@
     
     set_logging(options)
         
-    logging.debug("OPTIONS : " + repr(options)) #@UndefinedVariable
+    logger.debug("OPTIONS : " + repr(options)) #@UndefinedVariable
 
     if not options.message or len(options.message) == 0:
         sys.exit()
@@ -108,7 +107,7 @@
                 screen_name = user.screen_name
                 
                 message = u"@%s: %s" % (screen_name, base_message)
-                logging.debug("new status : " + message) #@UndefinedVariable
+                logger.debug("new status : " + message) #@UndefinedVariable
                 if not options.simulate:
                     t.statuses.update(status=message)
                     user_message = UserMessage(user_id=user.id, message_id=message_obj.id)
--- a/script/lib/iri_tweet/utils.py	Wed Jul 27 18:32:56 2011 +0200
+++ b/script/lib/iri_tweet/utils.py	Tue Aug 09 13:07:23 2011 +0200
@@ -1,6 +1,6 @@
-from models import Tweet, User, Hashtag, EntityHashtag, EntityUser, Url, \
-    EntityUrl, CONSUMER_KEY, CONSUMER_SECRET, APPLICATION_NAME, ACCESS_TOKEN_KEY, \
-    ACCESS_TOKEN_SECRET, adapt_date, adapt_json
+from models import (Tweet, User, Hashtag, EntityHashtag, EntityUser, Url, 
+    EntityUrl, CONSUMER_KEY, CONSUMER_SECRET, APPLICATION_NAME, ACCESS_TOKEN_KEY, 
+    ACCESS_TOKEN_SECRET, adapt_date, adapt_json, TweetSource, TweetLog)
 from sqlalchemy.sql import select, or_ #@UnresolvedImport
 import anyjson #@UnresolvedImport
 import datetime
@@ -77,13 +77,67 @@
     return dict([(str(k),adapt_one_field(k,v)) for k,v in fields_dict.items()])    
 
 
+class ObjectBufferProxy(object):
+    def __init__(self, klass, args, kwargs, must_flush, instance=None):
+        self.klass= klass
+        self.args = args
+        self.kwargs = kwargs
+        self.must_flush = must_flush
+        self.instance = instance
+        
+    def persists(self, session):
+        new_args = [arg() if callable(arg) else arg for arg in self.args] if self.args is not None else []
+        new_kwargs = dict([(k,v()) if callable(v) else (k,v) for k,v in self.kwargs.items()]) if self.kwargs is not None else {}
+        
+        self.instance = self.klass(*new_args, **new_kwargs)
+        session.add(self.instance)
+        if self.must_flush:
+            session.flush()
+            
+    def __getattr__(self, name):
+        return lambda : getattr(self.instance, name) if self.instance else None
+        
+        
+    
+
+class ObjectsBuffer(object):
+
+    def __init__(self):
+        self.__bufferlist = []
+        
+    def persists(self, session):
+        for object_proxy in self.__bufferlist:
+            object_proxy.persists(session)
+            
+    def add_object(self, klass, args, kwargs, must_flush):
+        new_proxy = ObjectBufferProxy(klass, args, kwargs, must_flush)
+        self.__bufferlist.append(new_proxy)
+        return new_proxy 
+    
+    def get(self, klass, **kwargs):
+        for proxy in self.__bufferlist:
+            if proxy.kwargs is None or len(proxy.kwargs) == 0 or proxy.klass != klass:
+                continue
+            found = True
+            for k,v in kwargs.items():
+                if (k not in proxy.kwargs) or v != proxy.kwargs[k]:
+                    found = False
+                    break
+            if found:
+                return proxy
+        
+        return None
+                
+                    
+        
+
 
 class TwitterProcessorException(Exception):
     pass
 
 class TwitterProcessor(object):
     
-    def __init__(self, json_dict, json_txt, session, token_filename=None):
+    def __init__(self, json_dict, json_txt, source_id, session, token_filename=None):
 
         if json_dict is None and json_txt is None:
             raise TwitterProcessorException("No json")
@@ -101,24 +155,39 @@
         if "id" not in self.json_dict:
             raise TwitterProcessorException("No id in json")
         
+        self.source_id = source_id
         self.session = session
         self.token_filename = token_filename
+        self.obj_buffer = ObjectsBuffer()
+
 
     def __get_user(self, user_dict):
-        logging.debug("Get user : " + repr(user_dict)) #@UndefinedVariable
+        logger.debug("Get user : " + repr(user_dict)) #@UndefinedVariable
     
         user_id = user_dict.get("id",None)    
         user_name = user_dict.get("screen_name", user_dict.get("name", None))
         
         if user_id is None and user_name is None:
             return None
-    
+
+        user = None
         if user_id:
-            user = self.session.query(User).filter(User.id == user_id).first()
+            user = self.obj_buffer.get(User, id=user_id)
         else:
-            user = self.session.query(User).filter(User.screen_name.ilike(user_name)).first()
+            user = self.obj_buffer.get(User, screen_name=user_name)
+            
+        if user is not None:
+            return user
+
+        #todo : add methods to objectbuffer to get buffered user
+        user_obj = None
+        if user_id:
+            user_obj = self.session.query(User).filter(User.id == user_id).first()
+        else:
+            user_obj = self.session.query(User).filter(User.screen_name.ilike(user_name)).first()
     
-        if user is not None:
+        if user_obj is not None:
+            user = ObjectBufferProxy(User, None, None, False, user_obj)
             return user
     
         user_created_at = user_dict.get("created_at", None)
@@ -132,28 +201,27 @@
                 else:
                     user_dict = t.users.show(screen_name=user_name)            
             except Exception as e:
-                logging.info("get_user : TWITTER ERROR : " + repr(e)) #@UndefinedVariable
-                logging.info("get_user : TWITTER ERROR : " + str(e)) #@UndefinedVariable
+                logger.info("get_user : TWITTER ERROR : " + repr(e)) #@UndefinedVariable
+                logger.info("get_user : TWITTER ERROR : " + str(e)) #@UndefinedVariable
                 return None
     
         user_dict = adapt_fields(user_dict, fields_adapter["stream"]["user"])
         if "id" not in user_dict:
             return None
         
+        #TODO filter get, wrap in proxy
         user = self.session.query(User).filter(User.id == user_dict["id"]).first()
         
         if user is not None:
             return user
         
-        user = User(**user_dict)
+        user = self.obj_buffer.add_object(User, None, user_dict, True)
         
-        self.session.add(user)
-        self.session.flush()
-        
-        return user 
+        return user
+
 
     def __process_entity(self, ind, ind_type):
-        logging.debug("Process_entity : " + repr(ind) + " : " + repr(ind_type)) #@UndefinedVariable
+        logger.debug("Process_entity : " + repr(ind) + " : " + repr(ind_type)) #@UndefinedVariable
         
         ind = clean_keys(ind)
         
@@ -161,57 +229,53 @@
            "indice_start": ind["indices"][0],
            "indice_end"  : ind["indices"][1],
            "tweet_id"    : self.tweet.id,
-           "tweet"       : self.tweet
         }
     
         def process_hashtags():
             text = ind.get("text", ind.get("hashtag", None))
             if text is None:
-                return None 
-            hashtag = self.session.query(Hashtag).filter(Hashtag.text.ilike(text)).first()
+                return None
+            hashtag = self.obj_buffer.get(Hashtag, text=text)
+            if hashtag is None: 
+                hashtag_obj = self.session.query(Hashtag).filter(Hashtag.text.ilike(text)).first()
+                if hashtag_obj is not None:
+                    hashtag = ObjectBufferProxy(Hashtag, None, None, False, hashtag_obj)
+                    
             if hashtag is None:
                 ind["text"] = text
-                hashtag = Hashtag(**ind)
-                self.session.add(hashtag)
-                self.session.flush()
-            entity_dict['hashtag'] = hashtag
+                hashtag = self.obj_buffer.add_object(Hashtag, None, ind, True)
             entity_dict['hashtag_id'] = hashtag.id
-            entity = EntityHashtag(**entity_dict)
-            return entity
+            return EntityHashtag, entity_dict             
         
         def process_user_mentions():
             user_mention = self.__get_user(ind)
             if user_mention is None:
-                entity_dict['user'] = None
                 entity_dict['user_id'] = None
             else:
-                entity_dict['user'] = user_mention
                 entity_dict['user_id'] = user_mention.id
-            entity = EntityUser(**entity_dict)
-            return entity
+            return EntityUser, entity_dict
         
         def process_urls():
-            url = self.session.query(Url).filter(Url.url == ind["url"]).first()
+            url = self.obj_buffer.get(Url, url=ind["url"])
             if url is None:
-                url = Url(**ind)
-                self.session.add(url)
-                self.session.flush()
-            entity_dict['url'] = url
+                url_obj = self.session.query(Url).filter(Url.url == ind["url"]).first()
+                if url_obj is not None:
+                    url = ObjectBufferProxy(Url, None, None, False, url_obj)
+            if url is None:
+                url = self.obj_buffer.add_object(Url, None, ind, True)
             entity_dict['url_id'] = url.id
-            entity = EntityUrl(**entity_dict)
-            return entity
+            return EntityUrl, entity_dict
         
         #{'': lambda }
-        entity =  { 
+        entity_klass, entity_dict =  { 
             'hashtags': process_hashtags,
             'user_mentions' : process_user_mentions,
             'urls' : process_urls
             }[ind_type]()
             
-        logging.debug("Process_entity entity_dict: " + repr(entity_dict)) #@UndefinedVariable
-        if entity:
-            self.session.add(entity)
-            self.session.flush()
+        logger.debug("Process_entity entity_dict: " + repr(entity_dict)) #@UndefinedVariable
+        if entity_klass:
+            self.obj_buffer.add_object(entity_klass, None, entity_dict, False)
 
 
     def __process_twitter_stream(self):
@@ -225,16 +289,15 @@
         # get or create user
         user = self.__get_user(self.json_dict["user"])
         if user is None:
-            logging.warning("USER not found " + repr(self.json_dict["user"])) #@UndefinedVariable
-            ts_copy["user"] = None
+            logger.warning("USER not found " + repr(self.json_dict["user"])) #@UndefinedVariable
             ts_copy["user_id"] = None
         else:
-            ts_copy["user"] = user
-            ts_copy["user_id"] = ts_copy["user"].id
-        ts_copy["original_json"] = self.json_txt
+            ts_copy["user_id"] = user.id
+            
+        del(ts_copy['user'])
+        ts_copy["tweet_source_id"] = self.source_id
         
-        self.tweet = Tweet(**ts_copy)
-        self.session.add(self.tweet)
+        self.tweet = self.obj_buffer.add_object(Tweet, None, ts_copy, True)
             
         # get entities
         if "entities" in self.json_dict:
@@ -260,7 +323,8 @@
         tweet_nb = self.session.query(Tweet).filter(Tweet.id == self.json_dict["id"]).count()
         if tweet_nb > 0:
             return
-            
+        
+        
         tweet_fields = {
             'created_at': self.json_dict["created_at"], 
             'favorited': False,
@@ -272,8 +336,8 @@
             #'place': ts["place"],
             'source': self.json_dict["source"],
             'text': self.json_dict["text"],
-            'truncated': False,
-            'original_json' : self.json_txt,
+            'truncated': False,            
+            'tweet_source_id' : self.source_id,
         }
         
         #user
@@ -286,16 +350,13 @@
         
         user = self.__get_user(user_fields)
         if user is None:
-            logging.warning("USER not found " + repr(user_fields)) #@UndefinedVariable
-            tweet_fields["user"] = None
+            logger.warning("USER not found " + repr(user_fields)) #@UndefinedVariable
             tweet_fields["user_id"] = None
         else:
-            tweet_fields["user"] = user
             tweet_fields["user_id"] = user.id
         
         tweet_fields = adapt_fields(tweet_fields, fields_adapter["rest"]["tweet"])
-        self.tweet = Tweet(**tweet_fields)
-        self.session.add(self.tweet)
+        self.tweet = self.obj_buffer.add_object(Tweet, None, tweet_fields, True)
         
         text = self.tweet.text
         
@@ -303,26 +364,37 @@
         
         for ind in extractor.extract_hashtags_with_indices():
             self.__process_entity(ind, "hashtags")
-            
-        for ind in extractor.extract_mentioned_screen_names_with_indices():
-            self.__process_entity(ind, "user_mentions")
-        
+                    
         for ind in extractor.extract_urls_with_indices():
             self.__process_entity(ind, "urls")
         
-        self.session.flush()
+        for ind in extractor.extract_mentioned_screen_names_with_indices():
+            self.__process_entity(ind, "user_mentions")
+
 
 
     def process(self):
+        
+        if self.source_id is None:
+            tweet_source = self.obj_buffer.add_object(TweetSource, None, {'original_json':self.json_txt}, True)
+            self.source_id = tweet_source.id
+        
         if "metadata" in self.json_dict:
             self.__process_twitter_rest()
         else:
             self.__process_twitter_stream()
+
+        self.obj_buffer.add_object(TweetLog, None, {'tweet_source_id':self.source_id, 'status':TweetLog.TWEET_STATUS['OK']}, False)
         
+        self.obj_buffer.persists(self.session)
+
 
-def set_logging(options):
+def set_logging(options, plogger=None):
     
-    logging_config = {}
+    logging_config = {
+        "format" : '%(asctime)s %(levelname)s:%(name)s:%(message)s',
+        "level" : max(logging.NOTSET, min(logging.CRITICAL, logging.WARNING - 10 * options.verbose + 10 * options.quiet)), #@UndefinedVariable
+    }
     
     if options.logfile == "stdout":
         logging_config["stream"] = sys.stdout
@@ -330,9 +402,27 @@
         logging_config["stream"] = sys.stderr
     else:
         logging_config["filename"] = options.logfile
-        
-    logging_config["level"] = max(logging.NOTSET, min(logging.CRITICAL, logging.WARNING - 10 * options.verbose + 10 * options.quiet)) #@UndefinedVariable
-    logging.basicConfig(**logging_config) #@UndefinedVariable
+            
+    logger = plogger
+    if logger is None:
+        logger = logging.getLogger() #@UndefinedVariable
+    
+    if len(logger.handlers) == 0:    
+        filename = logging_config.get("filename")
+        if filename:
+            mode = logging_config.get("filemode", 'a')
+            hdlr = logging.FileHandler(filename, mode) #@UndefinedVariable
+        else:
+            stream = logging_config.get("stream")
+            hdlr = logging.StreamHandler(stream) #@UndefinedVariable
+        fs = logging_config.get("format", logging.BASIC_FORMAT) #@UndefinedVariable
+        dfs = logging_config.get("datefmt", None)
+        fmt = logging.Formatter(fs, dfs) #@UndefinedVariable
+        hdlr.setFormatter(fmt)
+        logger.addHandler(hdlr)
+        level = logging_config.get("level")
+        if level is not None:
+            logger.setLevel(level)
     
     options.debug = (options.verbose-options.quiet > 0)
 
@@ -387,4 +477,4 @@
     
     return query.distinct()
 
-    
+logger = logging.getLogger() #@UndefinedVariable
--- a/script/lib/tweetstream/setup.py	Wed Jul 27 18:32:56 2011 +0200
+++ b/script/lib/tweetstream/setup.py	Tue Aug 09 13:07:23 2011 +0200
@@ -1,3 +1,4 @@
+#@PydevCodeAnalysisIgnore
 from setuptools import setup, find_packages
 import sys, os
 
--- a/script/lib/tweetstream/tweetstream/streamclasses.py	Wed Jul 27 18:32:56 2011 +0200
+++ b/script/lib/tweetstream/tweetstream/streamclasses.py	Tue Aug 09 13:07:23 2011 +0200
@@ -54,7 +54,7 @@
         :attr: `USER_AGENT`.
     """
 
-    def __init__(self, auth, catchup=None, url=None):
+    def __init__(self, auth, catchup=None, url=None, as_text=False):
         self._conn = None
         self._rate_ts = None
         self._rate_cnt = 0
@@ -68,6 +68,7 @@
         self.rate = 0
         self.user_agent = USER_AGENT
         if url: self.url = url
+        self._as_text = as_text
         
         self.muststop = False
 
@@ -119,12 +120,18 @@
         this method and return post data. The data should be in the format
         returned by urllib.urlencode."""
         return None
+    
+    def __muststop(self):
+        if callable(self.muststop):
+            return self.muststop()
+        else:
+            return self.muststop
 
     def next(self):
         """Return the next available tweet. This call is blocking!"""
         while True:
             try:
-                if self.muststop:
+                if self.__muststop():
                     raise StopIteration()
                 
                 if not self.connected:
@@ -143,10 +150,15 @@
                 elif data.isspace():
                     continue
 
-                data = anyjson.deserialize(data)
-                if 'text' in data:
+                if not self._as_text: 
+                    data = anyjson.deserialize(data)
+                    if 'text' in data:
+                        self.count += 1
+                        self._rate_cnt += 1
+                else: # count and rate may be off, but we count everything
                     self.count += 1
                     self._rate_cnt += 1
+                    
                 return data
 
             except ValueError, e:
@@ -175,12 +187,12 @@
     url = "http://stream.twitter.com/1/statuses/filter.json"
 
     def __init__(self, auth, follow=None, locations=None,
-                 track=None, catchup=None, url=None):
+                 track=None, catchup=None, url=None, as_text=False):
         self._follow = follow
         self._locations = locations
         self._track = track
         # remove follow, locations, track
-        BaseStream.__init__(self, auth, url=url)
+        BaseStream.__init__(self, auth, url=url, as_text=as_text)
 
     def _get_post_data(self):
         postdata = {}
--- a/script/rest/search_twitter.py	Wed Jul 27 18:32:56 2011 +0200
+++ b/script/rest/search_twitter.py	Tue Aug 09 13:07:23 2011 +0200
@@ -49,12 +49,12 @@
         page = 1
         
         while page <= int(1500/int(options.rpp)) and  ( results is None  or len(results) > 0):
-            results = twitter. search(q=options.query, rpp=options.rpp, page=page)
+            results = twitter.search(q=options.query, rpp=options.rpp, page=page)
             for tweet in results["results"]:
                 print tweet
                 tweet_str = anyjson.serialize(tweet)
                 #invalidate user id
-                processor = utils.TwitterProcessor(tweet, tweet_str, session, options.token_filename)
+                processor = utils.TwitterProcessor(tweet, tweet_str, None, session, options.token_filename)
                 processor.process()
                 session.flush()
                 session.commit()
--- a/script/stream/recorder_tweetstream.py	Wed Jul 27 18:32:56 2011 +0200
+++ b/script/stream/recorder_tweetstream.py	Tue Aug 09 13:07:23 2011 +0200
@@ -1,16 +1,24 @@
 from getpass import getpass
 from iri_tweet import models, utils
+from iri_tweet.models import TweetSource, TweetLog
+from multiprocessing import Queue, JoinableQueue, Process, Event, get_logger
 from optparse import OptionParser
 from sqlalchemy.orm import sessionmaker
-from sqlite3 import *
+import StringIO
+import logging
+import anyjson
 import datetime
-import logging
 import os
+import shutil
+import signal
 import socket
 import sys
 import time
+import traceback
+import tweepy.auth
 import tweetstream
-import tweepy.auth
+from iri_tweet.utils import logger
+from sqlalchemy.exc import OperationalError
 socket._fileobject.default_bufsize = 0
 
 
@@ -44,12 +52,12 @@
 
     """
 
-    def __init__(self, auth, keywords, reconnects=3, error_cb=None, retry_wait=5, **kwargs):
+    def __init__(self, auth, keywords, reconnects=3, error_cb=None, retry_wait=5, as_text=False, **kwargs):
         self.max_reconnects = reconnects
         self.retry_wait = retry_wait
         self._reconnects = 0
         self._error_cb = error_cb
-        super(ReconnectingTweetStream, self).__init__(auth=auth, track=keywords, **kwargs)
+        super(ReconnectingTweetStream, self).__init__(auth=auth, track=keywords, as_text=as_text, **kwargs)
 
     def next(self):
         while True:
@@ -72,45 +80,138 @@
 
 
 
-def process_tweet(tweet, session, debug, token_filename):
-    screen_name = ""
-    if 'user' in tweet and 'screen_name' in tweet['user']:
-        screen_name = tweet['user']['screen_name']
-    logging.info("Process_tweet from %s : %s" % (screen_name, tweet['text']))
-    logging.debug("Process_tweet :" + repr(tweet))
-    processor = utils.TwitterProcessor(tweet, None, session, token_filename)
-    processor.process()
-
-def main(username, password, track, session, debug, reconnects, token_filename, duration):
-
-    #username = username or raw_input('Twitter username: ')
-    #password = password or getpass('Twitter password: ')
-
-    track_list = track or raw_input('Keywords to track (comma seperated): ').strip()
-    track_list = [k for k in track_list.split(',')]
+class SourceProcess(Process):
+    
+    def __init__(self, session_maker, queue, auth, track, debug, reconnects, token_filename, stop_event):
+        self.session_maker = session_maker
+        self.queue = queue
+        self.auth = auth
+        self.track = track
+        self.debug = debug
+        self.reconnects = reconnects
+        self.token_filename = token_filename
+        self.stop_event = stop_event
+        super(SourceProcess, self).__init__()
+#        self.stop_event = 
     
-    if username and password:
-        auth = tweepy.auth.BasicAuthHandler(username, password)        
-    else:
-        consumer_key = models.CONSUMER_KEY
-        consumer_secret = models.CONSUMER_SECRET
-        auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=False)
-        auth.set_access_token(*(utils.get_oauth_token(token_filename)))
-    
-    if duration >= 0:
-        end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=duration)
+    def run(self):
+        
+        get_logger().debug("SourceProcess : run")
+        track_list = self.track # or raw_input('Keywords to track (comma separated): ').strip()
+        track_list = [k for k in track_list.split(',')]
+
+        get_logger().debug("SourceProcess : before connecting to stream " + repr(track_list))                        
+        stream = ReconnectingTweetStream(self.auth, track_list, reconnects=self.reconnects, as_text=True)
+        get_logger().debug("SourceProcess : after connecting to stream")
+        stream.muststop = lambda: self.stop_event.is_set()
+        
+        session = self.session_maker()
+        
+        try:
+            for tweet in stream:
+                get_logger().debug("tweet " + repr(tweet))
+                source = TweetSource(original_json=tweet)
+                get_logger().debug("source created")
+                add_retries = 0
+                while add_retries < 10:
+                    try:
+                        add_retries += 1
+                        session.add(source)
+                        session.flush()
+                        break
+                    except OperationalError as e:
+                        session.rollback()
+                        get_logger().debug("Operational Error %s nb %d" % (repr(e), add_retries))
+                        if add_retries==10:
+                            raise e
+                     
+                source_id = source.id
+                get_logger().debug("before queue + source id " + repr(source_id))
+                self.queue.put((source_id, tweet), False)
+                #process_tweet(tweet, source_id, self.session, self.debug, self.token_filename)
+                get_logger().info("Tweet count: %d - current rate : %.2f - running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime)))
+                session.commit()
+#                if duration >= 0 and  datetime.datetime.utcnow() >= end_ts:
+#                    print "Stop recording after %d seconds." % (duration)
+#                    break
+        except Exception as e:
+            get_logger().error("Error when processing tweet " + repr(e))
+        finally:
+            session.rollback()
+            stream.close()
+            session.close()
+            self.queue.close()
+            self.stop_event.set()
+
+
+def process_tweet(tweet, source_id, session, token_filename):
+    try:
+        tweet_obj = anyjson.deserialize(tweet)
+        screen_name = ""
+        if 'user' in tweet_obj and 'screen_name' in tweet_obj['user']:
+            screen_name = tweet_obj['user']['screen_name']
+        get_logger().info(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text']))
+        get_logger().debug(u"Process_tweet :" + repr(tweet))
+        processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, token_filename)
+        processor.process()
+    except Exception as e:
+        message = u"Error %s processing tweet %s" % (repr(e), tweet)
+        get_logger().error(message)
+        output = StringIO.StringIO()
+        traceback.print_exception(Exception, e, None, None, output)
+        error_stack = output.getvalue()
+        output.close()
+        session.rollback()
+        tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['ERROR'], error=message, error_stack=error_stack)
+        session.add(tweet_log)
+        session.commit()
+
     
-    stream = ReconnectingTweetStream(auth, track_list, reconnects=reconnects)
-    try:
-        for tweet in stream:            
-            if duration >= 0 and  datetime.datetime.utcnow() >= end_ts:
-                print "Stop recording after %d seconds." % (duration)
-                break
-            process_tweet(tweet, session, debug, token_filename)
-            logging.info("Tweet count: %d - current rate : %.2f - running : %s"%(stream.count, stream.rate, int(time.time()-stream.starttime)))
-            session.commit()
-    finally:
-        stream.close()
+        
+class TweetProcess(Process):
+    
+    def __init__(self, session_maker, queue, debug, token_filename, stop_event):
+        self.session_maker = session_maker
+        self.queue = queue
+        self.debug = debug
+        self.stop_event = stop_event
+        self.token_filename = token_filename
+        super(TweetProcess, self).__init__()
+
+
+    def run(self):
+        
+        session = self.session_maker()
+        try:
+            while not self.stop_event.is_set():
+                try:
+                    source_id, tweet_txt = queue.get(True, 10)
+                    get_logger().debug("Processing source id " + repr(source_id))
+                except Exception as e:
+                    get_logger().debug('Process tweet exception in loop : ' + repr(e))
+                    continue
+                process_tweet(tweet_txt, source_id, session, self.token_filename)
+                session.commit()
+        except:
+            raise
+        finally:
+            session.rollback()
+            self.stop_event.set()
+            session.close()
+            
+def process_leftovers(session, token_filename):
+    
+    sources = session.query(TweetSource).outerjoin(TweetLog).filter(TweetLog.id == None)
+    
+    for src in sources:
+        tweet_txt = src.original_json
+        process_tweet(tweet_txt, src.id, session, token_filename)
+
+        
+    
+    #get tweet source that do not match any message
+    #select * from tweet_tweet_source ts left join tweet_tweet_log tl on ts.id = tl.tweet_source_id where tl.id isnull;
+
         
 def get_options():
     parser = OptionParser()
@@ -130,6 +231,9 @@
                       help="Token file name")
     parser.add_option("-d", "--duration", dest="duration",
                       help="Duration of recording in seconds", metavar="DURATION", default= -1, type='int')
+    parser.add_option("-N", "--consumer", dest="consumer_nb",
+                      help="number of consumer", metavar="CONSUMER", default=1, type='int')
+
 
 
     utils.set_logging_options(parser)
@@ -139,27 +243,91 @@
 
 if __name__ == '__main__':
     
-
     (options, args) = get_options()
     
-    utils.set_logging(options)
+    utils.set_logging(options, get_logger())
         
     if options.debug:
         print "OPTIONS : "
         print repr(options)
     
     if options.new and os.path.exists(options.filename):
-        os.remove(options.filename)
+        i = 1
+        basename, extension = os.path.splitext(options.filename)
+        new_path = '%s.%d%s' % (basename, i, extension)
+        while i < 1000000 and os.path.exists(new_path):
+            i += 1
+            new_path = '%s.%d%s' % (basename, i, extension)
+        if i >= 1000000:
+            raise Exception("Unable to find new filename for " + options.filename)
+        else:
+            shutil.move(options.filename, new_path)
+
     
-    engine, metadata = models.setup_database('sqlite:///' + options.filename, echo=(options.debug>=2))
-    Session = sessionmaker(bind=engine)
-    session = Session()
+    queue = JoinableQueue()
+    stop_event = Event()
+
+    if options.username and options.password:
+        auth = tweepy.auth.BasicAuthHandler(options.username, options.password)        
+    else:
+        consumer_key = models.CONSUMER_KEY
+        consumer_secret = models.CONSUMER_SECRET
+        auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=False)
+        auth.set_access_token(*(utils.get_oauth_token(options.token_filename)))
+
 
-    try:
-        try:
-            main(options.username, options.password, options.track, session, options.debug, options.reconnects, options.token_filename, options.duration)
-        except KeyboardInterrupt:
-            print '\nGoodbye!'
-        session.commit()
-    finally:
-        session.close()
+    engine, metadata = models.setup_database('sqlite:///' + options.filename, echo=(options.debug >= 2))
+    Session = sessionmaker(bind=engine)
+    
+    session = Session()
+    process_leftovers(session, options.token_filename)
+    session.commit()
+    session.close()
+         
+    sprocess = SourceProcess(Session, queue, auth, options.track, options.debug, options.reconnects, options.token_filename, stop_event)    
+    
+    tweet_processes = []
+    
+    for i in range(options.consumer_nb):
+        engine, metadata = models.setup_database('sqlite:///' + options.filename, echo=(options.debug >= 2))
+        Session = sessionmaker(bind=engine)
+        cprocess = TweetProcess(Session, queue, options.debug, options.token_filename, stop_event)
+        tweet_processes.append(cprocess)
+
+    def interupt_handler(signum, frame):
+        stop_event.set()
+        
+    signal.signal(signal.SIGINT, interupt_handler)
+
+    sprocess.start()
+    for cprocess in tweet_processes:
+        cprocess.start()
+
+    if options.duration >= 0:
+        end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration)
+
+    while not stop_event.is_set():
+        if options.duration >= 0 and  datetime.datetime.utcnow() >= end_ts:
+            stop_event.set()
+            break
+        if sprocess.is_alive():
+            time.sleep(0.1)
+        else:
+            break
+    
+    get_logger().debug("Joining Source Process")
+    sprocess.join()
+    get_logger().debug("Joining Queue")
+    #queue.join()
+    for i,cprocess in enumerate(tweet_processes):
+        get_logger().debug("Joining consumer process Nb %d" % (i+1))
+        cprocess.join()
+    
+    get_logger().debug("Processing leftovers")
+    session = Session()
+    process_leftovers(session, options.token_filename)
+    session.commit()
+    session.close()
+
+    get_logger().debug("Done. Exiting.")
+        
\ No newline at end of file