- debug multithread (still database lock problem)
authorYves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
Mon, 08 Aug 2011 09:01:40 +0200
changeset 243 9213a63fa34a
parent 242 cdd7d3c0549c
child 244 d4b7d6e2633f
- Debug multithreading (database lock problem still unresolved); add an objects buffer; add tests; attempt to improve logging.
script/lib/iri_tweet/export_tweet_db.py
script/lib/iri_tweet/export_twitter_alchemy.py
script/lib/iri_tweet/models.py
script/lib/iri_tweet/tests.py
script/lib/iri_tweet/tweet_twitter_user.py
script/lib/iri_tweet/utils.py
script/stream/recorder_tweetstream.py
--- a/script/lib/iri_tweet/export_tweet_db.py	Wed Jul 27 00:04:55 2011 +0200
+++ b/script/lib/iri_tweet/export_tweet_db.py	Mon Aug 08 09:01:40 2011 +0200
@@ -1,8 +1,7 @@
 from models import setup_database
 from optparse import OptionParser #@UnresolvedImport
 from sqlalchemy.orm import sessionmaker
-from utils import set_logging_options, set_logging, TwitterProcessor
-import logging
+from utils import set_logging_options, set_logging, TwitterProcessor, logger
 import sqlite3 #@UnresolvedImport
 
 
@@ -34,11 +33,11 @@
             curs_in = conn_in.cursor()
             fields_mapping = {}
             for i,res in enumerate(curs_in.execute("select json from tweet_tweet;")):
-                logging.debug("main loop %d : %s" % (i, res[0])) #@UndefinedVariable
+                logger.debug("main loop %d : %s" % (i, res[0])) #@UndefinedVariable
                 processor = TwitterProcessor(eval(res[0]), res[0], None, session, options.token_filename)
                 processor.process()
                 session.commit()
-            logging.debug("main : %d tweet processed" % (i+1)) #@UndefinedVariable
+            logger.debug("main : %d tweet processed" % (i+1)) #@UndefinedVariable
         except Exception, e:
             session.rollback()
             raise e
--- a/script/lib/iri_tweet/export_twitter_alchemy.py	Wed Jul 27 00:04:55 2011 +0200
+++ b/script/lib/iri_tweet/export_twitter_alchemy.py	Mon Aug 08 09:01:40 2011 +0200
@@ -5,10 +5,9 @@
 from optparse import OptionParser #@UnresolvedImport
 from sqlalchemy import Table, Column, BigInteger, MetaData
 from sqlalchemy.orm import sessionmaker
-from utils import parse_date, set_logging_options, set_logging, get_filter_query
+from utils import parse_date, set_logging_options, set_logging, get_filter_query, logger
 from models import setup_database
 import datetime
-import logging
 import os.path
 import re
 import sys
@@ -101,7 +100,7 @@
     
     set_logging(options)
         
-    logging.debug("OPTIONS : " + repr(options)) #@UndefinedVariable
+    logger.debug("OPTIONS : " + repr(options)) #@UndefinedVariable
     
     if len(sys.argv) == 1 or options.database is None:
         parser.print_help()
@@ -159,7 +158,7 @@
             
             for params in parameters:
                 
-                logging.debug("PARAMETERS " + repr(params)) #@UndefinedVariable
+                logger.debug("PARAMETERS " + repr(params)) #@UndefinedVariable
                 
                 start_date_str = params.get("start_date",None)
                 end_date_str = params.get("end_date", None)
@@ -192,12 +191,12 @@
                 
                 if content_file and content_file.find("http") == 0:
                     
-                    logging.debug("url : " + content_file) #@UndefinedVariable
+                    logger.debug("url : " + content_file) #@UndefinedVariable
                     
                     h = httplib2.Http()
                     resp, content = h.request(content_file)
                     
-                    logging.debug("url response " + repr(resp) + " content " + repr(content)) #@UndefinedVariable
+                    logger.debug("url response " + repr(resp) + " content " + repr(content)) #@UndefinedVariable
                     
                     project = anyjson.deserialize(content)
                     root = etree.fromstring(project["ldt"])
@@ -254,7 +253,7 @@
                     
                     
                 if ensemble_parent is None:
-                    logging.error("Can not process file") #@UndefinedVariable
+                    logger.error("Can not process file") #@UndefinedVariable
                     sys.exit()
             
                 if options.replace:
@@ -309,18 +308,18 @@
                     
                     project["ldt"] = output_data
                     body = anyjson.serialize(project)
-                    logging.debug("write http " + content_file) #@UndefinedVariable
-                    logging.debug("write http " + repr(body)) #@UndefinedVariable
+                    logger.debug("write http " + content_file) #@UndefinedVariable
+                    logger.debug("write http " + repr(body)) #@UndefinedVariable
                     h = httplib2.Http()
                     resp, content = h.request(content_file, "PUT", headers={'content-type':'application/json'}, body=body)
-                    logging.debug("write http " + repr(resp) + " content " + content) #@UndefinedVariable
+                    logger.debug("write http " + repr(resp) + " content " + content) #@UndefinedVariable
                 else:
                     if content_file and os.path.exists(content_file):
                         dest_file_name = content_file 
                     else:
                         dest_file_name = options.filename
             
-                    logging.debug("WRITE : " + dest_file_name) #@UndefinedVariable
+                    logger.debug("WRITE : " + dest_file_name) #@UndefinedVariable
                     output = open(dest_file_name, "w")
                     output.write(output_data)
                     output.flush()
--- a/script/lib/iri_tweet/models.py	Wed Jul 27 00:04:55 2011 +0200
+++ b/script/lib/iri_tweet/models.py	Mon Aug 08 09:01:40 2011 +0200
@@ -92,15 +92,12 @@
     text = Column(String)
     truncated = Column(Boolean)
     user_id = Column(Integer, ForeignKey('tweet_user.id'))
-    user = relationship("TweetUser", backref="tweets")
-#    original_json = Column(String)
+    user = relationship("User", backref="tweets")
     tweet_source_id = Column(Integer, ForeignKey('tweet_tweet_source.id'))
     tweet_source = relationship("TweetSource", backref="tweet")
     entity_list = relationship(Entity, backref='tweet')
     received_at = Column(DateTime, default=datetime.datetime.now())
-    
-    #user = relationship(User, primaryjoin=user_id == User.id)
-    
+        
     def __init__(self, **kwargs):
         for key, value in kwargs.items():
             if hasattr(self,key):
@@ -111,7 +108,7 @@
 
     id = Column(Integer, primary_key = True)
     user_id = Column(Integer, ForeignKey('tweet_user.id'))
-    user = relationship("TweetUser", backref="messages")
+    user = relationship("User", backref="messages")
     created_at = Column(DateTime, default=datetime.datetime.now())
     message_id = Column(Integer, ForeignKey('tweet_message.id'))
 
@@ -161,8 +158,8 @@
     url= Column(String)
     utc_offset = Column(Integer)
     verified= Column(Boolean)
-    tweets = relationship(Tweet, backref='user')
-    messages  = relationship(UserMessage, backref='user')
+    #tweets = relationship(Tweet, backref='user')
+    #messages  = relationship(UserMessage, backref='user')
 
     def __init__(self, **kwargs):
         for key, value in kwargs.items():
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/script/lib/iri_tweet/tests.py	Mon Aug 08 09:01:40 2011 +0200
@@ -0,0 +1,97 @@
+from sqlalchemy import Column, Integer, String, ForeignKey, create_engine
+from sqlalchemy.ext.declarative import declarative_base
+from sqlalchemy.orm import relationship, backref
+import unittest #@UnresolvedImport
+from sqlalchemy.orm import sessionmaker
+from iri_tweet.utils import ObjectsBuffer
+
+Base = declarative_base()
+
+class User(Base):
+    __tablename__ = 'users'
+    
+    id = Column(Integer, primary_key=True)
+    name = Column(String)
+    fullname = Column(String)
+    password = Column(String)
+    
+    def __init__(self, name, fullname, password):
+        self.name = name
+        self.fullname = fullname
+        self.password = password
+    
+    def __repr__(self):
+        return "<User('%s','%s', '%s')>" % (self.name, self.fullname, self.password)
+
+
+class Address(Base):
+    __tablename__ = 'addresses'
+    id = Column(Integer, primary_key=True)
+    email_address = Column(String, nullable=False)
+    user_id = Column(Integer, ForeignKey('users.id'))
+    
+    user = relationship("User", backref=backref('addresses', order_by=id))
+    
+    def __init__(self, user_id, email_address):
+        self.email_address = email_address
+        self.user_id = user_id
+    
+    def __repr__(self):
+        return "<Address('%s')>" % self.email_address
+
+
+
+class TestObjectBuffer(unittest.TestCase):
+    
+    def setUp(self):
+        self.engine = create_engine('sqlite:///:memory:', echo=False)
+        Base.metadata.create_all(self.engine)
+        sessionMaker = sessionmaker(bind=self.engine)
+        self.session = sessionMaker()
+        
+    def testCreateUser(self):
+        ed_user = User('ed', 'Ed Jones', 'edspassword')
+        self.session.add(ed_user)
+        self.assertTrue(ed_user.id is None)
+        self.session.commit()
+        self.assertTrue(ed_user.id is not None)
+        
+    def testSimpleBuffer(self):
+        obj_buffer = ObjectsBuffer()
+        obj_proxy = obj_buffer.add_object(User, ['ed1', 'Ed1 Jones', 'edspassword'], None, False)
+        self.assertTrue(obj_proxy.id() is None)
+        obj_buffer.persists(self.session)
+        self.assertTrue(obj_proxy.id() is None)
+        self.session.commit()
+        self.assertTrue(obj_proxy.id() is not None)
+        
+    def testSimpleBufferFlush(self):
+        obj_buffer = ObjectsBuffer()
+        obj_proxy = obj_buffer.add_object(User, ['ed2', 'Ed2 Jones', 'edspassword'], None, True)
+        self.assertTrue(obj_proxy.id() is None)
+        obj_buffer.persists(self.session)
+        self.assertTrue(obj_proxy.id() is not None)
+        self.session.commit()
+        self.assertTrue(obj_proxy.id() is not None)
+        
+    def testRelationBuffer(self):
+        obj_buffer = ObjectsBuffer()
+        user1_proxy = obj_buffer.add_object(User, ['ed3', 'Ed3 Jones', 'edspassword'], None, True)
+        obj_buffer.add_object(Address, [user1_proxy.id,'ed3@mail.com'], None, False)
+        obj_buffer.add_object(Address, [user1_proxy.id,'ed3@other.com'], None, False)
+        user2_proxy = obj_buffer.add_object(User, ['ed4', 'Ed3 Jones', 'edspassword'], None, True)
+        obj_buffer.add_object(Address, [user2_proxy.id,'ed4@mail.com'], None, False)
+        obj_buffer.persists(self.session)
+        self.session.commit()
+        ed_user = self.session.query(User).filter_by(name='ed3').first()
+        self.assertEquals(2, len(ed_user.addresses))
+        ed_user = self.session.query(User).filter_by(name='ed4').first()
+        self.assertEquals(1, len(ed_user.addresses))
+
+        
+    def tearDown(self):
+        self.session.close()
+
+
+if __name__ == '__main__':
+    unittest.main()
\ No newline at end of file
--- a/script/lib/iri_tweet/tweet_twitter_user.py	Wed Jul 27 00:04:55 2011 +0200
+++ b/script/lib/iri_tweet/tweet_twitter_user.py	Mon Aug 08 09:01:40 2011 +0200
@@ -1,13 +1,12 @@
 from iri_tweet.models import setup_database, Message, UserMessage, User
 from iri_tweet.utils import (get_oauth_token, get_user_query, set_logging_options, 
-    set_logging, parse_date)
+    set_logging, parse_date, logger)
 from optparse import OptionParser #@UnresolvedImport
 from sqlalchemy import BigInteger
 from sqlalchemy.orm import sessionmaker
 from sqlalchemy.schema import MetaData, Table, Column
 from sqlalchemy.sql import and_
 import datetime
-import logging #@UnresolvedImport
 import sys
 import time
 import twitter
@@ -54,7 +53,7 @@
     
     set_logging(options)
         
-    logging.debug("OPTIONS : " + repr(options)) #@UndefinedVariable
+    logger.debug("OPTIONS : " + repr(options)) #@UndefinedVariable
 
     if not options.message or len(options.message) == 0:
         sys.exit()
@@ -108,7 +107,7 @@
                 screen_name = user.screen_name
                 
                 message = u"@%s: %s" % (screen_name, base_message)
-                logging.debug("new status : " + message) #@UndefinedVariable
+                logger.debug("new status : " + message) #@UndefinedVariable
                 if not options.simulate:
                     t.statuses.update(status=message)
                     user_message = UserMessage(user_id=user.id, message_id=message_obj.id)
--- a/script/lib/iri_tweet/utils.py	Wed Jul 27 00:04:55 2011 +0200
+++ b/script/lib/iri_tweet/utils.py	Mon Aug 08 09:01:40 2011 +0200
@@ -77,6 +77,45 @@
     return dict([(str(k),adapt_one_field(k,v)) for k,v in fields_dict.items()])    
 
 
+class ObjectBufferProxy(object):
+    def __init__(self, klass, args, kwargs, must_flush):
+        self.klass= klass
+        self.args = args
+        self.kwargs = kwargs
+        self.must_flush = must_flush
+        self.instance = None
+        
+    def persists(self, session):
+        new_args = [arg() if callable(arg) else arg for arg in self.args] if self.args is not None else []
+        new_kwargs = dict([(k,v()) if callable(v) else (k,v) for k,v in self.kwargs.items()]) if self.kwargs is not None else {}
+        
+        self.instance = self.klass(*new_args, **new_kwargs)
+        session.add(self.instance)
+        if self.must_flush:
+            session.flush()
+            
+    def __getattr__(self, name):
+        return lambda : getattr(self.instance, name) if self.instance else None
+        
+        
+    
+
+class ObjectsBuffer(object):
+
+    def __init__(self):
+        self.__bufferlist = []
+        
+    def persists(self, session):
+        for object_proxy in self.__bufferlist:
+            object_proxy.persists(session)
+            
+    def add_object(self, klass, args, kwargs, must_flush):
+        new_proxy = ObjectBufferProxy(klass, args, kwargs, must_flush)
+        self.__bufferlist.append(new_proxy)
+        return new_proxy 
+    
+        
+
 
 class TwitterProcessorException(Exception):
     pass
@@ -104,9 +143,10 @@
         self.source_id = source_id
         self.session = session
         self.token_filename = token_filename
+        self.obj_buffer = ObjectsBuffer()
 
     def __get_user(self, user_dict):
-        logging.debug("Get user : " + repr(user_dict)) #@UndefinedVariable
+        logger.debug("Get user : " + repr(user_dict)) #@UndefinedVariable
     
         user_id = user_dict.get("id",None)    
         user_name = user_dict.get("screen_name", user_dict.get("name", None))
@@ -133,8 +173,8 @@
                 else:
                     user_dict = t.users.show(screen_name=user_name)            
             except Exception as e:
-                logging.info("get_user : TWITTER ERROR : " + repr(e)) #@UndefinedVariable
-                logging.info("get_user : TWITTER ERROR : " + str(e)) #@UndefinedVariable
+                logger.info("get_user : TWITTER ERROR : " + repr(e)) #@UndefinedVariable
+                logger.info("get_user : TWITTER ERROR : " + str(e)) #@UndefinedVariable
     
         user_dict = adapt_fields(user_dict, fields_adapter["stream"]["user"])
         if "id" not in user_dict:
@@ -148,7 +188,7 @@
         return user 
 
     def __process_entity(self, ind, ind_type):
-        logging.debug("Process_entity : " + repr(ind) + " : " + repr(ind_type)) #@UndefinedVariable
+        logger.debug("Process_entity : " + repr(ind) + " : " + repr(ind_type)) #@UndefinedVariable
         
         ind = clean_keys(ind)
         
@@ -203,7 +243,7 @@
             'urls' : process_urls
             }[ind_type]()
             
-        logging.debug("Process_entity entity_dict: " + repr(entity_dict)) #@UndefinedVariable
+        logger.debug("Process_entity entity_dict: " + repr(entity_dict)) #@UndefinedVariable
         if entity:
             self.session.add(entity)
             self.session.flush()
@@ -220,7 +260,7 @@
         # get or create user
         user = self.__get_user(self.json_dict["user"])
         if user is None:
-            logging.warning("USER not found " + repr(self.json_dict["user"])) #@UndefinedVariable
+            logger.warning("USER not found " + repr(self.json_dict["user"])) #@UndefinedVariable
             ts_copy["user"] = None
             ts_copy["user_id"] = None
         else:
@@ -270,7 +310,7 @@
         
         user = self.__get_user(user_fields)
         if user is None:
-            logging.warning("USER not found " + repr(user_fields)) #@UndefinedVariable
+            logger.warning("USER not found " + repr(user_fields)) #@UndefinedVariable
             tweet_fields["user"] = None
             tweet_fields["user_id"] = None
         else:
@@ -287,14 +327,13 @@
         
         for ind in extractor.extract_hashtags_with_indices():
             self.__process_entity(ind, "hashtags")
-            
-        for ind in extractor.extract_mentioned_screen_names_with_indices():
-            self.__process_entity(ind, "user_mentions")
-        
+                    
         for ind in extractor.extract_urls_with_indices():
             self.__process_entity(ind, "urls")
         
-        self.session.flush()
+        for ind in extractor.extract_mentioned_screen_names_with_indices():
+            self.__process_entity(ind, "user_mentions")
+
 
 
     def process(self):
@@ -305,21 +344,21 @@
             self.session.flush()
             self.source_id = tweet_source.id
         
-        try:
-            if "metadata" in self.json_dict:
-                self.__process_twitter_rest()
-            else:
-                self.__process_twitter_stream()
-                
-            tweet_log = TweetLog(tweet_source_id = self.source_id, status = TweetLog.TWEET_STATUS['OK'])
-        except:
-            
-            raise
+        if "metadata" in self.json_dict:
+            self.__process_twitter_rest()
+        else:
+            self.__process_twitter_stream()
+
+        tweet_log = TweetLog(tweet_source_id = self.source_id, status = TweetLog.TWEET_STATUS['OK'])            
+        self.session.add(tweet_log)
         
 
-def set_logging(options):
+def set_logging(options, plogger=None):
     
-    logging_config = {}
+    logging_config = {
+        "format" : '%(asctime)s %(levelname)s:%(name)s:%(message)s',
+        "level" : max(logging.NOTSET, min(logging.CRITICAL, logging.WARNING - 10 * options.verbose + 10 * options.quiet)), #@UndefinedVariable
+    }
     
     if options.logfile == "stdout":
         logging_config["stream"] = sys.stdout
@@ -327,9 +366,27 @@
         logging_config["stream"] = sys.stderr
     else:
         logging_config["filename"] = options.logfile
-        
-    logging_config["level"] = max(logging.NOTSET, min(logging.CRITICAL, logging.WARNING - 10 * options.verbose + 10 * options.quiet)) #@UndefinedVariable
-    logging.basicConfig(**logging_config) #@UndefinedVariable
+            
+    logger = plogger
+    if logger is None:
+        logger = logging.getLogger() #@UndefinedVariable
+    
+    if len(logger.handlers) == 0:    
+        filename = logging_config.get("filename")
+        if filename:
+            mode = logging_config.get("filemode", 'a')
+            hdlr = logging.FileHandler(filename, mode) #@UndefinedVariable
+        else:
+            stream = logging_config.get("stream")
+            hdlr = logging.StreamHandler(stream) #@UndefinedVariable
+        fs = logging_config.get("format", logging.BASIC_FORMAT) #@UndefinedVariable
+        dfs = logging_config.get("datefmt", None)
+        fmt = logging.Formatter(fs, dfs) #@UndefinedVariable
+        hdlr.setFormatter(fmt)
+        logger.addHandler(hdlr)
+        level = logging_config.get("level")
+        if level is not None:
+            logger.setLevel(level)
     
     options.debug = (options.verbose-options.quiet > 0)
 
@@ -384,4 +441,4 @@
     
     return query.distinct()
 
-    
+logger = logging.getLogger() #@UndefinedVariable
--- a/script/stream/recorder_tweetstream.py	Wed Jul 27 00:04:55 2011 +0200
+++ b/script/stream/recorder_tweetstream.py	Mon Aug 08 09:01:40 2011 +0200
@@ -1,14 +1,13 @@
 from getpass import getpass
 from iri_tweet import models, utils
 from iri_tweet.models import TweetSource, TweetLog
-from multiprocessing import Queue, JoinableQueue, Process, Event
+from multiprocessing import Queue, JoinableQueue, Process, Event, get_logger
 from optparse import OptionParser
 from sqlalchemy.orm import sessionmaker
-from sqlite3 import *
 import StringIO
+import logging
 import anyjson
 import datetime
-import logging
 import os
 import shutil
 import signal
@@ -18,6 +17,8 @@
 import traceback
 import tweepy.auth
 import tweetstream
+from iri_tweet.utils import logger
+from sqlalchemy.exc import OperationalError
 socket._fileobject.default_bufsize = 0
 
 
@@ -82,7 +83,6 @@
 class SourceProcess(Process):
     
     def __init__(self, session_maker, queue, auth, track, debug, reconnects, token_filename, stop_event):
-        super(SourceProcess, self).__init__()
         self.session_maker = session_maker
         self.queue = queue
         self.auth = auth
@@ -91,47 +91,93 @@
         self.reconnects = reconnects
         self.token_filename = token_filename
         self.stop_event = stop_event
+        super(SourceProcess, self).__init__()
 #        self.stop_event = 
     
     def run(self):
         
+        get_logger().debug("SourceProcess : run")
         track_list = self.track # or raw_input('Keywords to track (comma seperated): ').strip()
         track_list = [k for k in track_list.split(',')]
-                        
+
+        get_logger().debug("SourceProcess : before connecting to stream " + repr(track_list))                        
         stream = ReconnectingTweetStream(self.auth, track_list, reconnects=self.reconnects, as_text=True)
+        get_logger().debug("SourceProcess : after connecting to stream")
         stream.muststop = lambda: self.stop_event.is_set()
         
         session = self.session_maker()
         
         try:
             for tweet in stream:
+                get_logger().debug("tweet " + repr(tweet))
                 source = TweetSource(original_json=tweet)
-                session.add(source)
-                session.flush()
+                get_logger().debug("source created")
+                add_retries = 0
+                while add_retries < 10:
+                    try:
+                        add_retries += 1
+                        session.add(source)
+                        session.flush()
+                        break
+                    except OperationalError as e:
+                        session.rollback()
+                        get_logger().debug("Operational Error %s nb %d" % (repr(e), add_retries))
+                        if add_retries==10:
+                            raise e
+                     
                 source_id = source.id
-                queue.put((source_id, tweet), False)
+                get_logger().debug("before queue + source id " + repr(source_id))
+                self.queue.put((source_id, tweet), False)
                 #process_tweet(tweet, source_id, self.session, self.debug, self.token_filename)
-                logging.info("Tweet count: %d - current rate : %.2f - running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime)))
+                get_logger().info("Tweet count: %d - current rate : %.2f - running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime)))
                 session.commit()
 #                if duration >= 0 and  datetime.datetime.utcnow() >= end_ts:
 #                    print "Stop recording after %d seconds." % (duration)
 #                    break
-        except:
+        except Exception as e:
+            get_logger().error("Error when processing tweet " + repr(e))
+        finally:
             session.rollback()
-        finally:
             stream.close()
             session.close()
-            
+            self.queue.close()
+            self.stop_event.set()
+
+
+def process_tweet(tweet, source_id, session, token_filename):
+    try:
+        tweet_obj = anyjson.deserialize(tweet)
+        screen_name = ""
+        if 'user' in tweet_obj and 'screen_name' in tweet_obj['user']:
+            screen_name = tweet_obj['user']['screen_name']
+        get_logger().info(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text']))
+        get_logger().debug(u"Process_tweet :" + repr(tweet))
+        processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, token_filename)
+        processor.process()
+    except Exception as e:
+        message = u"Error %s processing tweet %s" % (repr(e), tweet)
+        get_logger().error(message)
+        output = StringIO.StringIO()
+        traceback.print_exception(Exception, e, None, None, output)
+        error_stack = output.getvalue()
+        output.close()
+        session.rollback()
+        tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['ERROR'], error=message, error_stack=error_stack)
+        session.add(tweet_log)
+        session.commit()
+
+    
         
 class TweetProcess(Process):
     
     def __init__(self, session_maker, queue, debug, token_filename, stop_event):
-        super(TweetProcess, self).__init__()
         self.session_maker = session_maker
         self.queue = queue
         self.debug = debug
         self.stop_event = stop_event
         self.token_filename = token_filename
+        super(TweetProcess, self).__init__()
+
 
     def run(self):
         
@@ -139,74 +185,33 @@
         try:
             while not self.stop_event.is_set():
                 try:
-                    source_id, tweet_txt = queue.get(True, 30)
-                except:
+                    source_id, tweet_txt = queue.get(True, 10)
+                    get_logger().debug("Processing source id " + repr(source_id))
+                except Exception as e:
+                    get_logger().debug('Process tweet exception in loop : ' + repr(e))
                     continue
-                process_tweet(tweet_txt, source_id, session)
+                process_tweet(tweet_txt, source_id, session, self.token_filename)
                 session.commit()
-                self.queue.task_done()
         except:
-            session.rollback()
             raise
         finally:
+            session.rollback()
+            self.stop_event.set()
             session.close()
             
-        
-    def process_tweet(tweet, source_id, session):
-        
-        try:
-            tweet_obj = anyjson.deserialize(tweet)
-            screen_name = ""
-            if 'user' in tweet_obj and 'screen_name' in tweet_obj['user']:
-                screen_name = tweet_obj['user']['screen_name']
-            logging.info(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text']))
-            logging.debug(u"Process_tweet :" + repr(tweet))
-            processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, self.token_filename)
-            processor.process()
-        except Exception, e:
-            message = u"Error %e processing tweet %s" % (unicode(e), tweet)
-            logging.error(message)
-            output = StringIO.StringIO()
-            traceback.print_exception(Exception, e, None, None, output)
-            tweet_log = TweetLog(status=TweetLog.TWEET_STATUS['ERROR'], error=message, error_stack=output.getvalue())
-            output.close()
-
-
-
-#def main_source(username, password, track, session, debug, reconnects, token_filename, duration):
-
-    #username = username or raw_input('Twitter username: ')
-    #password = password or getpass('Twitter password: ')
+def process_leftovers(session, token_filename):
+    
+    sources = session.query(TweetSource).outerjoin(TweetLog).filter(TweetLog.id == None)
+    
+    for src in sources:
+        tweet_txt = src.original_json
+        process_tweet(tweet_txt, src.id, session, token_filename)
 
-#    track_list = track or raw_input('Keywords to track (comma seperated): ').strip()
-#    track_list = [k for k in track_list.split(',')]
-    
-#    if username and password:
-#        auth = tweepy.auth.BasicAuthHandler(username, password)        
-#    else:
-#        consumer_key = models.CONSUMER_KEY
-#        consumer_secret = models.CONSUMER_SECRET
-#        auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=False)
-#        auth.set_access_token(*(utils.get_oauth_token(token_filename)))
-    
-#    if duration >= 0:
-#        end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=duration)
+        
     
-#    stream = ReconnectingTweetStream(auth, track_list, reconnects=reconnects, as_text=True)
-#    try:
-#        for tweet in stream:
-#            source = TweetSource(original_json=tweet)
-#            session.add(source)
-#            session.flush()            
-#            source_id = source.id
-#            process_tweet(tweet, source_id, session, debug, token_filename)
-#            logging.info("Tweet count: %d - current rate : %.2f - running : %s"%(stream.count, stream.rate, int(time.time()-stream.starttime)))
-#            session.commit()
-#            if duration >= 0 and  datetime.datetime.utcnow() >= end_ts:
-#                print "Stop recording after %d seconds." % (duration)
-#                break
-#    finally:
-#        stream.close()
+    #get tweet source that do not match any message
+    #select * from tweet_tweet_source ts left join tweet_tweet_log tl on ts.id = tl.tweet_source_id where tl.id isnull;
+
         
 def get_options():
     parser = OptionParser()
@@ -240,7 +245,7 @@
     
     (options, args) = get_options()
     
-    utils.set_logging(options)
+    utils.set_logging(options, get_logger())
         
     if options.debug:
         print "OPTIONS : "
@@ -249,18 +254,16 @@
     if options.new and os.path.exists(options.filename):
         i = 1
         basename, extension = os.path.splitext(options.filename)
-        new_path = '$s.%d.%s' % (basename, i, extension)
+        new_path = '%s.%d%s' % (basename, i, extension)
         while i < 1000000 and os.path.exists(new_path):
             i += 1
-            new_path = '$s.%d.%s' % (basename, i, extension)
+            new_path = '%s.%d%s' % (basename, i, extension)
         if i >= 1000000:
             raise Exception("Unable to find new filename for " + options.filename)
         else:
             shutil.move(options.filename, new_path)
 
     
-    engine, metadata = models.setup_database('sqlite:///' + options.filename, echo=(options.debug >= 2))
-    Session = sessionmaker(bind=engine)
     queue = JoinableQueue()
     stop_event = Event()
 
@@ -272,12 +275,22 @@
         auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=False)
         auth.set_access_token(*(utils.get_oauth_token(options.token_filename)))
 
-     
+
+    engine, metadata = models.setup_database('sqlite:///' + options.filename, echo=(options.debug >= 2))
+    Session = sessionmaker(bind=engine)
+    
+    session = Session()
+    process_leftovers(session, options.token_filename)
+    session.commit()
+    session.close()
+         
     sprocess = SourceProcess(Session, queue, auth, options.track, options.debug, options.reconnects, options.token_filename, stop_event)    
     
     tweet_processes = []
     
     for i in range(options.consumer_nb):
+        engine, metadata = models.setup_database('sqlite:///' + options.filename, echo=(options.debug >= 2))
+        Session = sessionmaker(bind=engine)
         cprocess = TweetProcess(Session, queue, options.debug, options.token_filename, stop_event)
         tweet_processes.append(cprocess)
 
@@ -302,7 +315,19 @@
         else:
             break
     
+    get_logger().debug("Joining Source Process")
     sprocess.join()
-    queue.join()
-    for cprocess in tweet_processes:
+    get_logger().debug("Joining Queue")
+    #queue.join()
+    for i,cprocess in enumerate(tweet_processes):
+        get_logger().debug("Joining consumer process Nb %d" % (i+1))
         cprocess.join()
+    
+    get_logger().debug("Processing leftovers")
+    session = Session()
+    process_leftovers(session, options.token_filename)
+    session.commit()
+    session.close()
+
+    get_logger().debug("Done. Exiting.")
+        
\ No newline at end of file