Update listener; add support for Twitter regulation messages; update virtualenv
author Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
Tue, 07 May 2013 18:57:54 +0200
changeset 888 6fc6637d8403
parent 887 503f9a7b7d6c
child 889 c774bdf7d3dd
Update listener; add support for Twitter regulation messages; update virtualenv
.pydevproject
script/lib/iri_tweet/iri_tweet/models.py
script/lib/iri_tweet/iri_tweet/processor.py
script/lib/iri_tweet/iri_tweet/stream.py
script/lib/iri_tweet/iri_tweet/tests.py
script/lib/iri_tweet/iri_tweet/utils.py
script/rest/export_twitter.py
script/stream/recorder_stream.py
script/utils/export_twitter_alchemy.py
script/utils/merge_tweets.py
script/utils/search_topsy.py
script/utils/tweet_twitter_user.py
script/virtualenv/res/lib/lib_create_env.py
script/virtualenv/res/src/SQLAlchemy-0.8.0.tar.gz
script/virtualenv/res/src/SQLAlchemy-0.8.1.tar.gz
script/virtualenv/res/src/blessings-1.5.tar.gz
script/virtualenv/script/res/res_create_env.py
--- a/.pydevproject	Sun Apr 21 21:55:06 2013 +0200
+++ b/.pydevproject	Tue May 07 18:57:54 2013 +0200
@@ -3,6 +3,9 @@
 <pydev_property name="org.python.pydev.PYTHON_PROJECT_INTERPRETER">python_tl</pydev_property>
 <pydev_property name="org.python.pydev.PYTHON_PROJECT_VERSION">python 2.7</pydev_property>
 <pydev_pathproperty name="org.python.pydev.PROJECT_SOURCE_PATH">
-<path>/tweet_live/script/lib</path>
+<path>/tweet_live/script/lib/iri_tweet</path>
+<path>/tweet_live/script/stream</path>
+<path>/tweet_live/script/rest</path>
+<path>/tweet_live/script/utils</path>
 </pydev_pathproperty>
 </pydev_project>
--- a/script/lib/iri_tweet/iri_tweet/models.py	Sun Apr 21 21:55:06 2013 +0200
+++ b/script/lib/iri_tweet/iri_tweet/models.py	Tue May 07 18:57:54 2013 +0200
@@ -1,5 +1,6 @@
 from sqlalchemy import (Boolean, Column, Enum, BigInteger, Integer, String, 
-    ForeignKey, DateTime, create_engine)
+    ForeignKey, DateTime, create_engine, event)
+from sqlalchemy.engine import Engine
 from sqlalchemy.ext.declarative import declarative_base
 from sqlalchemy.orm import relationship, sessionmaker
 import anyjson
@@ -11,12 +12,10 @@
 Base = declarative_base()
 
 APPLICATION_NAME = "IRI_TWITTER" 
-CONSUMER_KEY = "54ThDZhpEjokcMgHJOMnQA"
-CONSUMER_SECRET = "wUoL9UL2T87tfc97R0Dff2EaqRzpJ5XGdmaN2XK3udA"
+#CONSUMER_KEY = "54ThDZhpEjokcMgHJOMnQA"
+#CONSUMER_SECRET = "wUoL9UL2T87tfc97R0Dff2EaqRzpJ5XGdmaN2XK3udA"
 ACCESS_TOKEN_KEY = None
 ACCESS_TOKEN_SECRET = None
-#ACCESS_TOKEN_KEY= "47312923-LiNTtz0I18YXMVIrFeTuhmH7bOvYsK6p3Ln2Dc"
-#ACCESS_TOKEN_SECRET = "r3LoXVcjImNAElUpWqTu2SG2xCdWFHkva7xeQoncA"
 
 def adapt_date(date_str):
     ts = email.utils.parsedate_tz(date_str) #@UndefinedVariable
@@ -82,6 +81,14 @@
         'OK' : 1,
         'ERROR' : 2,
         'NOT_TWEET': 3,
+        'DELETE': 4,
+        'SCRUB_GEO': 5,
+        'LIMIT': 6,
+        'STATUS_WITHHELD': 7,
+        'USER_WITHHELD': 8,
+        'DISCONNECT': 9,
+        'STALL_WARNING': 10,
+        'DELETE_PENDING': 4  # same code as DELETE; pending rows keep a non-null status_id
     }
     __metaclass__ = TweetMeta
     
@@ -90,6 +97,7 @@
     ts = Column(DateTime, default=datetime.datetime.utcnow, index=True)
     tweet_source_id = Column(Integer, ForeignKey('tweet_tweet_source.id'))
     tweet_source = relationship("TweetSource", backref="logs")
+    status_id = Column(BigInteger, index=True, nullable=True, default=None)
     status = Column(Integer)
     error = Column(String)
     error_stack = Column(String)
@@ -121,7 +129,7 @@
     user = relationship("User", backref="tweets")
     tweet_source_id = Column(Integer, ForeignKey('tweet_tweet_source.id'))
     tweet_source = relationship("TweetSource", backref="tweet")
-    entity_list = relationship(Entity, backref='tweet')
+    entity_list = relationship(Entity, backref='tweet', cascade="all, delete-orphan")
     received_at = Column(DateTime, default=datetime.datetime.utcnow, index=True)
         
 
@@ -269,6 +277,14 @@
     kwargs_ce = dict((k, v) for k,v in kwargs.items() if (k not in session_argname and k != "create_all"))
 
     engine = create_engine(*args, **kwargs_ce)
+    
+    if engine.name == "sqlite":
+        @event.listens_for(engine, "connect")  # listen on this engine only, not every Engine
+        def set_sqlite_pragma(dbapi_connection, connection_record):
+            cursor = dbapi_connection.cursor()
+            cursor.execute("PRAGMA foreign_keys=ON")
+            cursor.close()
+    
     metadata = Base.metadata        
                 
     kwargs_sm = {'bind': engine}
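
Note: together with the new delete-orphan cascade on Tweet.entity_list, the
pragma listener makes SQLite actually enforce the schema's foreign keys, so
deleting a tweet also removes its entity rows. A minimal sketch, assuming the
setup_database() call used in tests.py (the tweet id is hypothetical):

    from iri_tweet import models

    engine, metadata, session_maker = models.setup_database('sqlite:///:memory:', echo=True)
    session = session_maker()
    # tweet = session.query(models.Tweet).get(some_tweet_id)  # hypothetical id
    # session.delete(tweet)
    # session.commit()  # the tweet's Entity rows are deleted as orphans
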
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/script/lib/iri_tweet/iri_tweet/processor.py	Tue May 07 18:57:54 2013 +0200
@@ -0,0 +1,516 @@
+# -*- coding: utf-8 -*-
+'''
+Created on Apr 29, 2013
+
+@author: ymh
+'''
+from iri_tweet.models import (User, EntityType, adapt_json, MediaType, Media, 
+    EntityMedia, Hashtag, EntityHashtag, EntityUser, EntityUrl, Url, Entity, Tweet, 
+    TweetSource, TweetLog)
+from iri_tweet.utils import (ObjectsBuffer, adapt_fields, fields_adapter, 
+    ObjectBufferProxy, get_oauth_token, clean_keys)
+from sqlalchemy.orm import joinedload
+import anyjson
+import logging
+import twitter
+import twitter_text
+
+
+class TwitterProcessorException(Exception):
+    pass
+
+class TwitterProcessor(object):
+    def __init__(self, json_dict, json_txt, source_id, session, consumer_token, access_token=None, token_filename=None, user_query_twitter=False, logger=None):
+
+        if json_dict is None and json_txt is None:
+            raise TwitterProcessorException("No json")
+        
+        if json_dict is None:
+            self.json_dict = anyjson.deserialize(json_txt)
+        else:
+            self.json_dict = json_dict
+        
+        if not json_txt:
+            self.json_txt = anyjson.serialize(json_dict)
+        else:
+            self.json_txt = json_txt
+        
+        if "id" not in self.json_dict:
+            raise TwitterProcessorException("No id in json")
+        
+        self.source_id = source_id
+        self.session = session
+        self.consumer_key = consumer_token[0]
+        self.consumer_secret = consumer_token[1]
+        self.token_filename = token_filename
+        self.access_token = access_token
+        self.obj_buffer = ObjectsBuffer()
+        self.user_query_twitter = user_query_twitter
+        if not logger:
+            self.logger = logging.getLogger(__name__)
+        else:
+            self.logger = logger
+
+    def process(self):
+        if self.source_id is None:
+            tweet_source = self.obj_buffer.add_object(TweetSource, None, {'original_json':self.json_txt}, True)
+            self.source_id = tweet_source.id
+        self.process_source()
+        self.obj_buffer.persists(self.session)
+        
+    def process_source(self):
+        raise NotImplementedError()
+    
+    def log_info(self):
+        return "Process tweet %s" %  repr(self.__class__)    
+
+
+class TwitterProcessorStatus(TwitterProcessor):
+    
+    def __get_user(self, user_dict, do_merge):
+        self.logger.debug("Get user : " + repr(user_dict)) #@UndefinedVariable
+        
+        user_dict = adapt_fields(user_dict, fields_adapter["stream"]["user"])
+    
+        user_id = user_dict.get("id",None)    
+        user_name = user_dict.get("screen_name", user_dict.get("name", None))
+        
+        if user_id is None and user_name is None:
+            return None
+
+        user = None
+        if user_id:
+            user = self.obj_buffer.get(User, id=user_id)
+        else:
+            user = self.obj_buffer.get(User, screen_name=user_name)
+
+        # TODO: update the user if needed
+        if user is not None:
+            user_created_at = None
+            if user.args is not None:
+                user_created_at = user.args.get('created_at', None)
+            if user_created_at is None and user_dict.get('created_at', None) is not None and do_merge:
+                if user.args is None:
+                    user.args = user_dict
+                else:
+                    user.args.update(user_dict)
+            return user
+
+        # TODO: add methods to ObjectsBuffer to get a buffered user
+        user_obj = None
+        if user_id:
+            user_obj = self.session.query(User).filter(User.id == user_id).first()
+        else:
+            user_obj = self.session.query(User).filter(User.screen_name.ilike(user_name)).first()
+    
+        # TODO: update the user if needed
+        if user_obj is not None:            
+            if user_obj.created_at is not None or user_dict.get('created_at', None) is None or not do_merge :
+                user = ObjectBufferProxy(User, None, None, False, user_obj)
+            else:
+                user = self.obj_buffer.add_object(User, None, user_dict, True, user_obj)
+            return user
+    
+        user_created_at = user_dict.get("created_at", None)
+        
+        if user_created_at is None and self.user_query_twitter:
+            
+            if self.access_token is not None:
+                access_token_key, access_token_secret = self.access_token
+            else:
+                access_token_key, access_token_secret = get_oauth_token(consumer_key=self.consumer_key, consumer_secret=self.consumer_secret, token_file_path=self.token_filename)
+            # TODO: pass the client as an argument
+            t = twitter.Twitter(auth=twitter.OAuth(access_token_key, access_token_secret, self.consumer_key, self.consumer_secret))
+            try:
+                if user_id:
+                    user_dict = t.users.show(user_id=user_id)
+                else:
+                    user_dict = t.users.show(screen_name=user_name)
+            except Exception as e:
+                self.logger.info("get_user : TWITTER ERROR : " + repr(e)) #@UndefinedVariable
+                self.logger.info("get_user : TWITTER ERROR : " + str(e)) #@UndefinedVariable
+                return None
+            
+        if "id" not in user_dict:
+            return None
+        
+        #TODO filter get, wrap in proxy
+        user_obj = self.session.query(User).filter(User.id == user_dict["id"]).first()
+        
+        if user_obj is not None and not do_merge:
+            return ObjectBufferProxy(User, None, None, False, user_obj)
+        else:        
+            return self.obj_buffer.add_object(User, None, user_dict, True)        
+
+    def __get_or_create_object(self, klass, filter_by_kwargs, filter_arg, creation_kwargs, must_flush, do_merge):
+        
+        obj_proxy = self.obj_buffer.get(klass, **filter_by_kwargs)
+        if obj_proxy is None:
+            query = self.session.query(klass)
+            if filter_arg is not None:
+                query = query.filter(filter_arg)
+            else:
+                query = query.filter_by(**filter_by_kwargs)
+            obj_instance = query.first()
+            if obj_instance is not None:
+                if not do_merge:
+                    obj_proxy = ObjectBufferProxy(klass, None, None, False, obj_instance)
+                else:
+                    obj_proxy = self.obj_buffer.add_object(klass, None, creation_kwargs, must_flush, obj_instance)
+        if obj_proxy is None:
+            obj_proxy = self.obj_buffer.add_object(klass, None, creation_kwargs, must_flush)
+        return obj_proxy
+
+
+    def __process_entity(self, ind, ind_type):
+        self.logger.debug("Process_entity : " + repr(ind) + " : " + repr(ind_type)) #@UndefinedVariable
+        
+        ind = clean_keys(ind)
+        
+        entity_type = self.__get_or_create_object(EntityType, {'label':ind_type}, None, {'label':ind_type}, True, False)
+        
+        entity_dict = {
+           "indice_start"   : ind["indices"][0],
+           "indice_end"     : ind["indices"][1],
+           "tweet_id"       : self.tweet.id,
+           "entity_type_id" : entity_type.id,
+           "source"         : adapt_json(ind)
+        }
+
+        def process_medias():
+            
+            media_id = ind.get('id', None)
+            if media_id is None:
+                return None, None
+            
+            type_str = ind.get("type", "photo")
+            media_type = self.__get_or_create_object(MediaType, {'label': type_str}, None, {'label':type_str}, True, False)
+            media_ind = adapt_fields(ind, fields_adapter["entities"]["medias"])
+            if "type" in media_ind:
+                del(media_ind["type"])
+            media_ind['type_id'] = media_type.id            
+            media = self.__get_or_create_object(Media, {'id':media_id}, None, media_ind, True, False)
+            
+            entity_dict['media_id'] = media.id
+            return EntityMedia, entity_dict
+
+        def process_hashtags():
+            text = ind.get("text", ind.get("hashtag", None))
+            if text is None:
+                return None, None
+            ind['text'] = text
+            hashtag = self.__get_or_create_object(Hashtag, {'text':text}, Hashtag.text.ilike(text), ind, True, False)
+            entity_dict['hashtag_id'] = hashtag.id
+            return EntityHashtag, entity_dict             
+        
+        def process_user_mentions():
+            user_mention = self.__get_user(ind, False)
+            if user_mention is None:
+                entity_dict['user_id'] = None
+            else:
+                entity_dict['user_id'] = user_mention.id
+            return EntityUser, entity_dict
+        
+        def process_urls():
+            url = self.__get_or_create_object(Url, {'url':ind["url"]}, None, ind, True, False)
+            entity_dict['url_id'] = url.id
+            return EntityUrl, entity_dict
+                
+        #{'': lambda }
+        entity_klass, entity_dict =  { 
+            'hashtags': process_hashtags,
+            'user_mentions' : process_user_mentions,
+            'urls' : process_urls,
+            'media': process_medias,
+            }.get(ind_type, lambda: (Entity, entity_dict))()
+            
+        self.logger.debug("Process_entity entity_dict: " + repr(entity_dict)) #@UndefinedVariable
+        if entity_klass:
+            self.obj_buffer.add_object(entity_klass, None, entity_dict, False)
+
+
+    def __process_twitter_stream(self):
+        
+        tweet_nb = self.session.query(Tweet).filter(Tweet.id == self.json_dict["id"]).count()
+        if tweet_nb > 0:
+            return
+        
+        ts_copy = adapt_fields(self.json_dict, fields_adapter["stream"]["tweet"])
+        
+        # get or create user
+        user = self.__get_user(self.json_dict["user"], True)
+        if user is None:
+            self.logger.warning("USER not found " + repr(self.json_dict["user"])) #@UndefinedVariable
+            ts_copy["user_id"] = None
+        else:
+            ts_copy["user_id"] = user.id
+            
+        del(ts_copy['user'])
+        ts_copy["tweet_source_id"] = self.source_id
+        
+        self.tweet = self.obj_buffer.add_object(Tweet, None, ts_copy, True)
+            
+        self.__process_entities()
+
+
+    def __process_entities(self):
+        if "entities" in self.json_dict:
+            for ind_type, entity_list in self.json_dict["entities"].items():
+                for ind in entity_list:
+                    self.__process_entity(ind, ind_type)
+        else:
+            
+            text = self.tweet.text
+            extractor = twitter_text.Extractor(text)
+            for ind in extractor.extract_hashtags_with_indices():
+                self.__process_entity(ind, "hashtags")
+            
+            for ind in extractor.extract_urls_with_indices():
+                self.__process_entity(ind, "urls")
+            
+            for ind in extractor.extract_mentioned_screen_names_with_indices():
+                self.__process_entity(ind, "user_mentions")
+
+    def __process_twitter_rest(self):
+        tweet_nb = self.session.query(Tweet).filter(Tweet.id == self.json_dict["id"]).count()
+        if tweet_nb > 0:
+            return
+        
+        
+        tweet_fields = {
+            'created_at': self.json_dict["created_at"], 
+            'favorited': False,
+            'id': self.json_dict["id"],
+            'id_str': self.json_dict["id_str"],
+            #'in_reply_to_screen_name': ts["to_user"], 
+            'in_reply_to_user_id': self.json_dict.get("in_reply_to_user_id",None),
+            'in_reply_to_user_id_str': self.json_dict.get("in_reply_to_user_id_str", None),
+            #'place': ts["place"],
+            'source': self.json_dict["source"],
+            'text': self.json_dict["text"],
+            'truncated': False,            
+            'tweet_source_id' : self.source_id,
+        }
+        
+        #user
+    
+        user_fields = {
+            'lang' : self.json_dict.get('iso_language_code',None),
+            'profile_image_url' : self.json_dict["profile_image_url"],
+            'screen_name' : self.json_dict["from_user"],
+            'id' : self.json_dict["from_user_id"],
+            'id_str' : self.json_dict["from_user_id_str"],
+            'name' : self.json_dict['from_user_name'],
+        }
+        
+        user = self.__get_user(user_fields, do_merge=False)
+        if user is None:
+            self.logger.warning("USER not found " + repr(user_fields)) #@UndefinedVariable
+            tweet_fields["user_id"] = None
+        else:
+            tweet_fields["user_id"] = user.id
+        
+        tweet_fields = adapt_fields(tweet_fields, fields_adapter["rest"]["tweet"])
+        self.tweet = self.obj_buffer.add_object(Tweet, None, tweet_fields, True)
+                
+        self.__process_entities()
+
+
+
+    def process_source(self):
+                
+        status_id = self.json_dict["id"]
+        log = self.session.query(TweetLog).filter(TweetLog.status_id==status_id).first()
+        if log:
+            self.obj_buffer.add_object(TweetLog, log, {'status': TweetLog.TWEET_STATUS['DELETE'], 'status_id': None})
+            self.session.query(TweetSource).filter(TweetSource.id==self.source_id).delete()
+        else:
+            if "metadata" in self.json_dict:
+                self.__process_twitter_rest()
+            else:
+                self.__process_twitter_stream()
+
+            self.obj_buffer.add_object(TweetLog, None, {'tweet_source_id':self.source_id, 'status':TweetLog.TWEET_STATUS['OK']}, True)
+
+    def log_info(self):
+        screen_name = self.json_dict.get("user",{}).get("screen_name","")
+        return u"Process Tweet from %s : %s" % (screen_name, self.json_dict.get('text',u""))
+
+
+
+class TwitterProcessorDelete(TwitterProcessor):
+    """
+    {
+      "delete":{
+        "status":{
+            "id":1234,
+            "id_str":"1234",
+            "user_id":3,
+            "user_id_str":"3"
+        }
+      }
+    }
+    """
+
+    def process(self):
+                   
+        #find tweet
+        tweet_id = self.json_dict.get('delete',{}).get('status',{}).get('id',None)
+        if tweet_id:
+            t = self.session.query(Tweet).options(joinedload(Tweet.tweet_source)).filter(Tweet.id == tweet_id).first()
+            if t:
+                tsource = t.tweet_source                
+                self.session.delete(t)
+                self.session.query(TweetLog).filter(TweetLog.tweet_source_id == tsource.id).delete()
+                self.session.delete(tsource)
+                self.obj_buffer.add_object(TweetLog, None, {'tweet_source_id':self.source_id, 'status':TweetLog.TWEET_STATUS['DELETE']}, True)
+            else:
+                self.obj_buffer.add_object(TweetLog, None, {'tweet_source_id':self.source_id, 'status_id': tweet_id,'status':TweetLog.TWEET_STATUS['DELETE_PENDING']}, True)
+        self.obj_buffer.persists(self.session)
+    def log_info(self):
+        status_del = self.json_dict.get('delete', {}).get("status",{})
+        return u"Process delete for %s : %s" % (status_del.get('user_id_str',u""), status_del.get('id_str',u""))
+
+class TwitterProcessorScrubGeo(TwitterProcessor):
+    """
+    {
+        "scrub_geo":{
+        "user_id":14090452,
+        "user_id_str":"14090452",
+        "up_to_status_id":23260136625,
+        "up_to_status_id_str":"23260136625"
+      }
+    }
+    """
+    
+    def process_source(self):        
+        up_to_status_id = self.json_dict.get("scrub_geo", {}).get("up_to_status_id", None)
+        if not up_to_status_id:
+            return
+        tweets = self.session.query(Tweet).options(joinedload(Tweet.tweet_source)).filter(Tweet.user_id == self.json_dict["scrub_geo"].get("user_id")).filter(Tweet.id <= up_to_status_id)
+        for t in tweets:
+            self.obj_buffer.add_object(Tweet, t, {'geo': None})
+            tsource = t.tweet_source
+            tsource_dict = anyjson.deserialize(tsource.original_json)
+            if tsource_dict.get("geo", None):
+                tsource_dict["geo"] = None
+                self.obj_buffer.add_object(TweetSource, tsource, {'original_json': anyjson.serialize(tsource_dict)})
+        self.obj_buffer.add_object(TweetLog, None, {'tweet_source_id':self.source_id, 'status':TweetLog.TWEET_STATUS['SCRUB_GEO']}, True)
+    
+    def log_info(self):
+        return u"Process scrub geo for %s : %s" % (self.json_dict["scrub_geo"].get('user_id_str',u""), self.json_dict["scrub_geo"].get('id_str',u""))
+
+
+class TwitterProcessorLimit(TwitterProcessor):
+    """
+    {
+      "limit":{
+        "track":1234
+      }
+    }
+    """
+    def process_source(self):
+        """
+        do nothing, just log the information
+        """    
+        self.obj_buffer.add_object(TweetLog, None, {'tweet_source_id':self.source_id, 'status':TweetLog.TWEET_STATUS['LIMIT'], 'error':self.json_txt}, True)
+        
+    def log_info(self):
+        return u"Process limit %d " % self.json_dict.get("limit", {}).get('track', 0)
+        
+class TwitterProcessorStatusWithheld(TwitterProcessor):
+    """
+    {
+      "status_withheld":{
+      "id":1234567890,
+      "user_id":123456,
+      "withheld_in_countries":["DE", "AR"]
+      }
+    }
+    """
+    def process_source(self):
+        """
+        do nothing, just log the information
+        """
+        self.obj_buffer.add_object(TweetLog, None, {'tweet_source_id':self.source_id, 'status':TweetLog.TWEET_STATUS['STATUS_WITHHELD'], 'error':self.json_txt}, True)
+        
+    def log_info(self):
+        status_withheld = self.json_dict.get("status_withheld",{})
+        return u"Process status withheld status id %d from user %d in countries %s" %(status_withheld.get("id",0), status_withheld.get("user_id",0), u",".join(status_withheld.get("withheld_in_countries",[])))
+
+class TwitterProcessorUserWithheld(TwitterProcessor):
+    """
+    {  
+      "user_withheld":{
+        "id":123456,
+        "withheld_in_countries":["DE","AR"]
+      }
+    }
+    """
+    def process_source(self):
+        """
+        do nothing, just log the information
+        """
+        self.obj_buffer.add_object(TweetLog, None, {'tweet_source_id':self.source_id, 'status':TweetLog.TWEET_STATUS['USER_WITHHELD'], 'error':self.json_txt}, True)
+
+
+    def log_info(self):
+        user_withheld = self.json_dict.get("user_withheld", {})
+        return u"Process user withheld %d in countries %s" % (user_withheld.get("id",0), u"".join(user_withheld.get("withheld_in_countries",[])))
+
+class TwitterProcessorDisconnect(TwitterProcessor):
+    """
+    {
+      "disconnect":{
+        "code": 4,
+        "stream_name":"< A stream identifier >",
+        "reason":"< Human readable status message >"
+      }
+    }
+    """
+    def process_source(self):
+        """
+        do nothing, just log the information
+        """
+        self.obj_buffer.add_object(TweetLog, None, {'tweet_source_id':self.source_id, 'status':TweetLog.TWEET_STATUS['DISCONNECT'], 'error':self.json_txt}, True)
+
+    def log_info(self):
+        disconnect = self.json_dict.get("disconnect",{})
+        return u"Process disconnect stream %s code %d reason %s" % (disconnect.get("stream_name",""), disconnect.get("code",0), disconnect.get("reason",""))
+
+class TwitterProcessorStallWarning(TwitterProcessor):
+    """
+    {
+      "warning":{
+        "code":"FALLING_BEHIND",
+        "message":"Your connection is falling behind and messages are being queued for delivery to you. Your queue is now over 60% full. You will be disconnected when the queue is full.",
+        "percent_full": 60
+      }
+    }
+    """
+    def process_source(self):
+        """
+        do nothing, just log the information
+        """
+        self.obj_buffer.add_object(TweetLog, None, {'tweet_source_id':self.source_id, 'status':TweetLog.TWEET_STATUS['STALL_WARNING'], 'error':self.json_txt}, True)
+
+    def log_info(self):
+        warning = self.json_dict.get("warning",{})
+        return u"Process stall warning %d%% code %s, message %s" % (warning.get("percent_full",0),warning.get("code",u""), warning.get("message", u""))
+
+TWEET_PROCESSOR_MAP = {
+    'text': TwitterProcessorStatus,
+    'delete': TwitterProcessorDelete,
+    'scrub_geo': TwitterProcessorScrubGeo,
+    'limit': TwitterProcessorLimit,
+    'status_withheld': TwitterProcessorStatusWithheld,
+    'user_withheld': TwitterProcessorUserWithheld,
+    'disconnect': TwitterProcessorDisconnect,
+    'warning': TwitterProcessorStallWarning 
+}
+
+def get_processor(tweet_dict):
+    for processor_key,processor_klass in TWEET_PROCESSOR_MAP.iteritems():
+        if processor_key in tweet_dict:
+            return processor_klass
+    return None
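
Note: the processors are keyed on the top-level attribute of each stream
message. A minimal dispatch sketch mirroring what recorder_stream.py does
(the session, token pair and logger are assumed to be set up by the caller):

    import anyjson
    from iri_tweet.processor import get_processor

    def handle_stream_line(raw_line, source_id, session, consumer_token,
                           access_token, logger):
        msg = anyjson.deserialize(raw_line)
        processor_klass = get_processor(msg)  # None for unrecognized messages
        if processor_klass is None:
            return
        processor = processor_klass(json_dict=msg, json_txt=raw_line,
                                    source_id=source_id, session=session,
                                    consumer_token=consumer_token,
                                    access_token=access_token, logger=logger)
        logger.info(processor.log_info())
        processor.process()
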
--- a/script/lib/iri_tweet/iri_tweet/stream.py	Sun Apr 21 21:55:06 2013 +0200
+++ b/script/lib/iri_tweet/iri_tweet/stream.py	Tue May 07 18:57:54 2013 +0200
@@ -121,12 +121,11 @@
     """
 
     def __init__(self, auth,
-                 catchup=None, raw=False, timeout=-1, url=None, compressed=False, chunk_size=4096, logger=None):
+                  raw=False, timeout=-1, url=None, compressed=False, chunk_size=4096, logger=None):
         self._conn = None
         self._rate_ts = None
         self._rate_cnt = 0
         self._auth = auth
-        self._catchup_count = catchup
         self.raw_mode = raw
         self.timeout = timeout
         self._compressed = compressed
@@ -165,8 +164,6 @@
 
         postdata = self._get_post_data() or {}
         postdata['stall_warnings'] = 'true'
-        if self._catchup_count:
-            postdata["count"] = self._catchup_count
         
         if self._logger : self._logger.debug("BaseStream init connection url " + repr(self.url))
         if self._logger : self._logger.debug("BaseStream init connection headers " + repr(headers))
@@ -204,37 +201,58 @@
             self._rate_ts = time.time()
             
     def _iter_object(self):
+   
+#        for line in self._resp.iter_lines():
+#            yield line     
+#         pending = None
+# 
+#         for chunk in self._resp.iter_content(chunk_size=self.chunk_size, decode_unicode=None):
+# 
+#             if pending is not None:
+#                 chunk = pending + chunk 
+#             lines = chunk.splitlines()
+# 
+#             if lines and lines[-1] and chunk and lines[-1][-1] == chunk[-1]:
+#                 pending = lines.pop()
+#             else:
+#                 pending = None
+# 
+#             for line in lines:
+#                 yield line
+# 
+#         if pending is not None:
+#             yield pending
+            
         pending = None
         has_stopped = False
-
-#        for chunk in iter_content_non_blocking(self._resp,
-#            max_chunk_size=self.chunk_size,
-#            decode_unicode=False,
-#            timeout=self.timeout):
+         
+        if self._logger : self._logger.debug("BaseStream _iter_object")
+ 
         for chunk in self._resp.iter_content(
             chunk_size=self.chunk_size,
-            decode_unicode=False):
-
+            decode_unicode=None):
+ 
+            if self._logger : self._logger.debug("BaseStream _iter_object loop")
             if self.testmuststop():
                 has_stopped = True
                 break
-
+ 
             if pending is not None:
                 chunk = pending + chunk
             lines = chunk.split('\r')
-
-            if chunk and lines[-1] and lines[-1][-1] == chunk[-1]:
+ 
+            if lines and lines[-1] and chunk and lines[-1][-1] == chunk[-1]:
                 pending = lines.pop()
             else:
                 pending = None
-
+ 
             for line in lines:
                 yield line.strip('\n')
-
+ 
             if self.testmuststop():
                 has_stopped = True
                 break
-
+ 
         if pending is not None:
             yield pending
         if has_stopped:
@@ -248,11 +266,14 @@
             self._init_conn()
 
         if self._logger : self._logger.debug("BaseStream __iter__ connected")
+        
         for line in self._iter_object():
 
+            if self._logger : self._logger.debug("BaseStream __iter__ line %s " % repr(line))
+            
             if not line:
                 continue
-                            
+
             if (self.raw_mode):
                 tweet = line
             else:
@@ -285,12 +306,12 @@
     url = "https://stream.twitter.com/1.1/statuses/filter.json"
 
     def __init__(self, auth, follow=None, locations=None,
-                 track=None, catchup=None, url=None, raw=False, timeout=None, compressed=False, chunk_size=4096, logger=None):
+                 track=None, url=None, raw=False, timeout=None, compressed=False, chunk_size=requests.models.ITER_CHUNK_SIZE, logger=None):
         self._follow = follow
         self._locations = locations
         self._track = track
         # remove follow, locations, track
-        BaseStream.__init__(self, auth, url=url, raw=raw, catchup=catchup, timeout=timeout, compressed=compressed, chunk_size=chunk_size, logger=logger)
+        BaseStream.__init__(self, auth, url=url, raw=raw, timeout=timeout, compressed=compressed, chunk_size=chunk_size, logger=logger)
 
     def _get_post_data(self):
         postdata = {}
@@ -358,16 +379,14 @@
                     if self._logger: self._logger.debug("tweet : " + repr(tweet))
                     self._reconnects = 0
                     self._retry_wait = 0
-                    if "warning" in tweet:
-                        if self._logger: self._logger.warning("Tweet warning received : %s" % repr(tweet))
-                        continue
                     if not tweet.strip():
                         if self._logger: self._logger.debug("Empty Tweet received : PING")
                         continue
                     yield tweet
             except requests.exceptions.HTTPError as e:
                 if e.response.status_code == 401:
-                    raise AuthenticationError("Error connecting to %s : %s" % (e.response.url,e.message))
+                    if self._logger: self._logger.debug("SafeStreamWrapper Connection Error http error on %s : %s" % (e.response.url,e.message))
+                    raise AuthenticationError("Error connecting to %s : %s : %s - %s" % (e.response.url,e.message, repr(e.response.headers),repr(e.response.text)))
                 if e.response.status_code > 200:
                     self.__process_http_error(e)
                 else:
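
Note: with catchup gone and chunk_size defaulting to
requests.models.ITER_CHUNK_SIZE, a minimal consumer of FilterStream looks
like the sketch below (the credentials are placeholders):

    import requests_oauthlib
    from iri_tweet.stream import FilterStream

    # placeholder credentials; replace with a real consumer/access token pair
    auth = requests_oauthlib.OAuth1(client_key="ckey", client_secret="csecret",
                                    resource_owner_key="tkey",
                                    resource_owner_secret="tsecret",
                                    signature_type='auth_header')
    stream = FilterStream(auth, track=["example"], raw=True, timeout=60)
    for message in stream:
        # raw mode yields each '\r'-delimited JSON payload unparsed, including
        # the regulation messages (delete, limit, warning, ...) handled above
        print message
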
--- a/script/lib/iri_tweet/iri_tweet/tests.py	Sun Apr 21 21:55:06 2013 +0200
+++ b/script/lib/iri_tweet/iri_tweet/tests.py	Tue May 07 18:57:54 2013 +0200
@@ -3,10 +3,14 @@
 from sqlalchemy.orm import relationship, backref
 import unittest #@UnresolvedImport
 from sqlalchemy.orm import sessionmaker
-from iri_tweet.utils import ObjectsBuffer, TwitterProcessor
+from iri_tweet.utils import ObjectsBuffer
+from iri_tweet.processor import TwitterProcessorStatus
 from iri_tweet import models
 import tempfile #@UnresolvedImport
 import os
+import logging
+
+logger = logging.getLogger(__name__)
 
 Base = declarative_base()
 
@@ -129,12 +133,12 @@
     def setUp(self):
         self.engine, self.metadata, sessionMaker = models.setup_database('sqlite:///:memory:', echo=True)
         self.session = sessionMaker()
-        file, self.tmpfilepath = tempfile.mkstemp()
-        os.close(file)
+        tmpfile, self.tmpfilepath = tempfile.mkstemp()
+        os.close(tmpfile)
 
 
     def testTwitterProcessor(self):
-        tp = TwitterProcessor(None, original_json, None, self.session, self.tmpfilepath)
+        tp = TwitterProcessorStatus(None, original_json, None, self.session, (None, None), token_filename=self.tmpfilepath, logger=logger)
         tp.process()
         self.session.commit()
         
@@ -154,7 +158,7 @@
         
 
     def testTwitterProcessorMedia(self):
-        tp = TwitterProcessor(None, original_json_media, None, self.session, self.tmpfilepath)
+        tp = TwitterProcessorStatus(None, original_json_media, None, self.session, (None, None), token_filename=self.tmpfilepath, logger=logger)
         tp.process()
         self.session.commit()
         
@@ -174,7 +178,7 @@
 
 
     def testTwitterProcessorMediaOthers(self):
-        tp = TwitterProcessor(None, original_json_media_others, None, self.session, self.tmpfilepath)
+        tp = TwitterProcessorStatus(None, original_json_media_others, None, self.session, (None, None), token_filename=self.tmpfilepath, logger=logger)
         tp.process()
         self.session.commit()
         
--- a/script/lib/iri_tweet/iri_tweet/utils.py	Sun Apr 21 21:55:06 2013 +0200
+++ b/script/lib/iri_tweet/iri_tweet/utils.py	Tue May 07 18:57:54 2013 +0200
@@ -1,25 +1,22 @@
-from models import (Tweet, User, Hashtag, EntityHashtag, EntityUser, Url, #@UnresolvedImport
-    EntityUrl, CONSUMER_KEY, CONSUMER_SECRET, APPLICATION_NAME, ACCESS_TOKEN_KEY,#@UnresolvedImport
-    ACCESS_TOKEN_SECRET, adapt_date, adapt_json, TweetSource, TweetLog, MediaType, #@UnresolvedImport 
-    Media, EntityMedia, Entity, EntityType) #@UnresolvedImport
-from sqlalchemy.sql import select, or_ #@UnresolvedImport
-import Queue #@UnresolvedImport
-import anyjson #@UnresolvedImport
+from models import (Tweet, User, Hashtag, EntityHashtag, APPLICATION_NAME, ACCESS_TOKEN_SECRET, adapt_date, adapt_json, 
+    ACCESS_TOKEN_KEY)
+from sqlalchemy.sql import select, or_
+import Queue
 import codecs
 import datetime
 import email.utils
 import logging
 import math
 import os.path
+import socket
 import sys
-import twitter.oauth #@UnresolvedImport
-import twitter.oauth_dance #@UnresolvedImport
-import twitter_text #@UnresolvedImport
+import twitter.oauth
+import twitter.oauth_dance
 
 
 CACHE_ACCESS_TOKEN = {}
 
-def get_oauth_token(token_file_path=None, check_access_token=True, application_name=APPLICATION_NAME, consumer_key=CONSUMER_KEY, consumer_secret=CONSUMER_SECRET):
+def get_oauth_token(consumer_key, consumer_secret, token_file_path=None, check_access_token=True, application_name=APPLICATION_NAME):
     
     global CACHE_ACCESS_TOKEN
 
@@ -34,25 +31,27 @@
     
     if res is not None and check_access_token:
         get_logger().debug("get_oauth_token : Check oauth tokens") #@UndefinedVariable
-        t = twitter.Twitter(auth=twitter.OAuth(res[0], res[1], CONSUMER_KEY, CONSUMER_SECRET))
+        t = twitter.Twitter(auth=twitter.OAuth(res[0], res[1], consumer_key, consumer_secret))
         status = None
         try:
-            status = t.account.rate_limit_status()
+            status = t.application.rate_limit_status(resources="account")
         except Exception as e:
             get_logger().debug("get_oauth_token : error getting rate limit status %s " % repr(e))            
             get_logger().debug("get_oauth_token : error getting rate limit status %s " % str(e))
             status = None
         get_logger().debug("get_oauth_token : Check oauth tokens : status %s" % repr(status)) #@UndefinedVariable
-        if status is None or status['remaining_hits'] == 0:
+        if status is None or status.get("resources",{}).get("account",{}).get('/account/verify_credentials',{}).get('remaining',0) == 0:
             get_logger().debug("get_oauth_token : Problem with status %s" % repr(status))
             res = None
 
     if res is None:
         get_logger().debug("get_oauth_token : doing the oauth dance")
         res = twitter.oauth_dance(application_name, consumer_key, consumer_secret, token_file_path)
+        
     
     CACHE_ACCESS_TOKEN[application_name] = res
     
+    get_logger().debug("get_oauth_token : done got %s" % repr(res))
     return res
 
 def parse_date(date_str):
@@ -169,301 +168,6 @@
                     return proxy        
         return None
                 
-class TwitterProcessorException(Exception):
-    pass
-
-class TwitterProcessor(object):
-    
-    def __init__(self, json_dict, json_txt, source_id, session, access_token=None, token_filename=None, user_query_twitter=False):
-
-        if json_dict is None and json_txt is None:
-            raise TwitterProcessorException("No json")
-        
-        if json_dict is None:
-            self.json_dict = anyjson.deserialize(json_txt)
-        else:
-            self.json_dict = json_dict
-        
-        if not json_txt:
-            self.json_txt = anyjson.serialize(json_dict)
-        else:
-            self.json_txt = json_txt
-        
-        if "id" not in self.json_dict:
-            raise TwitterProcessorException("No id in json")
-        
-        self.source_id = source_id
-        self.session = session
-        self.token_filename = token_filename
-        self.access_token = access_token
-        self.obj_buffer = ObjectsBuffer()
-        self.user_query_twitter = user_query_twitter  
-        
-
-
-    def __get_user(self, user_dict, do_merge):
-        get_logger().debug("Get user : " + repr(user_dict)) #@UndefinedVariable
-        
-        user_dict = adapt_fields(user_dict, fields_adapter["stream"]["user"])
-    
-        user_id = user_dict.get("id",None)    
-        user_name = user_dict.get("screen_name", user_dict.get("name", None))
-        
-        if user_id is None and user_name is None:
-            return None
-
-        user = None
-        if user_id:
-            user = self.obj_buffer.get(User, id=user_id)
-        else:
-            user = self.obj_buffer.get(User, screen_name=user_name)
-
-        #to do update user id needed            
-        if user is not None:
-            user_created_at = None
-            if user.args is not None:
-                user_created_at = user.args.get('created_at', None)
-            if user_created_at is None and user_dict.get('created_at', None) is not None and do_merge:
-                if user.args is None:
-                    user.args = user_dict
-                else:
-                    user.args.update(user_dict)
-            return user
-
-        #todo : add methpds to objectbuffer to get buffer user
-        user_obj = None
-        if user_id:
-            user_obj = self.session.query(User).filter(User.id == user_id).first()
-        else:
-            user_obj = self.session.query(User).filter(User.screen_name.ilike(user_name)).first()
-    
-        #todo update user if needed
-        if user_obj is not None:            
-            if user_obj.created_at is not None or user_dict.get('created_at', None) is None or not do_merge :
-                user = ObjectBufferProxy(User, None, None, False, user_obj)
-            else:
-                user = self.obj_buffer.add_object(User, None, user_dict, True, user_obj)
-            return user
-    
-        user_created_at = user_dict.get("created_at", None)
-        
-        if user_created_at is None and self.user_query_twitter:
-            
-            if self.access_token is not None:
-                acess_token_key, access_token_secret = self.access_token
-            else:
-                acess_token_key, access_token_secret = get_oauth_token(self.token_filename)
-            t = twitter.Twitter(auth=twitter.OAuth(acess_token_key, access_token_secret, CONSUMER_KEY, CONSUMER_SECRET))
-            try:
-                if user_id:
-                    user_dict = t.users.show(user_id=user_id)
-                else:
-                    user_dict = t.users.show(screen_name=user_name)            
-            except Exception as e:
-                get_logger().info("get_user : TWITTER ERROR : " + repr(e)) #@UndefinedVariable
-                get_logger().info("get_user : TWITTER ERROR : " + str(e)) #@UndefinedVariable
-                return None
-            
-        if "id" not in user_dict:
-            return None
-        
-        #TODO filter get, wrap in proxy
-        user_obj = self.session.query(User).filter(User.id == user_dict["id"]).first()
-        
-        if user_obj is not None and not do_merge:
-            return ObjectBufferProxy(User, None, None, False, user_obj)
-        else:        
-            return self.obj_buffer.add_object(User, None, user_dict, True)        
-
-    def __get_or_create_object(self, klass, filter_by_kwargs, filter_arg, creation_kwargs, must_flush, do_merge):
-        
-        obj_proxy = self.obj_buffer.get(klass, **filter_by_kwargs)
-        if obj_proxy is None:
-            query = self.session.query(klass)
-            if filter_arg is not None:
-                query = query.filter(filter_arg)
-            else:
-                query = query.filter_by(**filter_by_kwargs)
-            obj_instance = query.first()
-            if obj_instance is not None:
-                if not do_merge:
-                    obj_proxy = ObjectBufferProxy(klass, None, None, False, obj_instance)
-                else:
-                    obj_proxy = self.obj_buffer.add_object(klass, None, creation_kwargs, must_flush, obj_instance)
-        if obj_proxy is None:
-            obj_proxy = self.obj_buffer.add_object(klass, None, creation_kwargs, must_flush)
-        return obj_proxy
-
-
-    def __process_entity(self, ind, ind_type):
-        get_logger().debug("Process_entity : " + repr(ind) + " : " + repr(ind_type)) #@UndefinedVariable
-        
-        ind = clean_keys(ind)
-        
-        entity_type = self.__get_or_create_object(EntityType, {'label':ind_type}, None, {'label':ind_type}, True, False)
-        
-        entity_dict = {
-           "indice_start"   : ind["indices"][0],
-           "indice_end"     : ind["indices"][1],
-           "tweet_id"       : self.tweet.id,
-           "entity_type_id" : entity_type.id,
-           "source"         : adapt_json(ind)
-        }
-
-        def process_medias():
-            
-            media_id = ind.get('id', None)
-            if media_id is None:
-                return None, None
-            
-            type_str = ind.get("type", "photo")
-            media_type = self.__get_or_create_object(MediaType, {'label': type_str}, None, {'label':type_str}, True, False)
-            media_ind = adapt_fields(ind, fields_adapter["entities"]["medias"])
-            if "type" in media_ind:
-                del(media_ind["type"])
-            media_ind['type_id'] = media_type.id            
-            media = self.__get_or_create_object(Media, {'id':media_id}, None, media_ind, True, False)
-            
-            entity_dict['media_id'] = media.id
-            return EntityMedia, entity_dict
-
-        def process_hashtags():
-            text = ind.get("text", ind.get("hashtag", None))
-            if text is None:
-                return None, None
-            ind['text'] = text
-            hashtag = self.__get_or_create_object(Hashtag, {'text':text}, Hashtag.text.ilike(text), ind, True, False)
-            entity_dict['hashtag_id'] = hashtag.id
-            return EntityHashtag, entity_dict             
-        
-        def process_user_mentions():
-            user_mention = self.__get_user(ind, False)
-            if user_mention is None:
-                entity_dict['user_id'] = None
-            else:
-                entity_dict['user_id'] = user_mention.id
-            return EntityUser, entity_dict
-        
-        def process_urls():
-            url = self.__get_or_create_object(Url, {'url':ind["url"]}, None, ind, True, False)
-            entity_dict['url_id'] = url.id
-            return EntityUrl, entity_dict
-                
-        #{'': lambda }
-        entity_klass, entity_dict =  { 
-            'hashtags': process_hashtags,
-            'user_mentions' : process_user_mentions,
-            'urls' : process_urls,
-            'media': process_medias,
-            }.get(ind_type, lambda: (Entity, entity_dict))()
-            
-        get_logger().debug("Process_entity entity_dict: " + repr(entity_dict)) #@UndefinedVariable
-        if entity_klass:
-            self.obj_buffer.add_object(entity_klass, None, entity_dict, False)
-
-
-    def __process_twitter_stream(self):
-        
-        tweet_nb = self.session.query(Tweet).filter(Tweet.id == self.json_dict["id"]).count()
-        if tweet_nb > 0:
-            return
-        
-        ts_copy = adapt_fields(self.json_dict, fields_adapter["stream"]["tweet"])
-        
-        # get or create user
-        user = self.__get_user(self.json_dict["user"], True)
-        if user is None:
-            get_logger().warning("USER not found " + repr(self.json_dict["user"])) #@UndefinedVariable
-            ts_copy["user_id"] = None
-        else:
-            ts_copy["user_id"] = user.id
-            
-        del(ts_copy['user'])
-        ts_copy["tweet_source_id"] = self.source_id
-        
-        self.tweet = self.obj_buffer.add_object(Tweet, None, ts_copy, True)
-            
-        self.__process_entities()
-
-
-    def __process_entities(self):
-        if "entities" in self.json_dict:
-            for ind_type, entity_list in self.json_dict["entities"].items():
-                for ind in entity_list:
-                    self.__process_entity(ind, ind_type)
-        else:
-            
-            text = self.tweet.text
-            extractor = twitter_text.Extractor(text)
-            for ind in extractor.extract_hashtags_with_indices():
-                self.__process_entity(ind, "hashtags")
-            
-            for ind in extractor.extract_urls_with_indices():
-                self.__process_entity(ind, "urls")
-            
-            for ind in extractor.extract_mentioned_screen_names_with_indices():
-                self.__process_entity(ind, "user_mentions")
-
-    def __process_twitter_rest(self):
-        tweet_nb = self.session.query(Tweet).filter(Tweet.id == self.json_dict["id"]).count()
-        if tweet_nb > 0:
-            return
-        
-        
-        tweet_fields = {
-            'created_at': self.json_dict["created_at"], 
-            'favorited': False,
-            'id': self.json_dict["id"],
-            'id_str': self.json_dict["id_str"],
-            #'in_reply_to_screen_name': ts["to_user"], 
-            'in_reply_to_user_id': self.json_dict.get("in_reply_to_user_id",None),
-            'in_reply_to_user_id_str': self.json_dict.get("in_reply_to_user_id_str", None),
-            #'place': ts["place"],
-            'source': self.json_dict["source"],
-            'text': self.json_dict["text"],
-            'truncated': False,            
-            'tweet_source_id' : self.source_id,
-        }
-        
-        #user
-    
-        user_fields = {
-            'lang' : self.json_dict.get('iso_language_code',None),
-            'profile_image_url' : self.json_dict["profile_image_url"],
-            'screen_name' : self.json_dict["from_user"],
-            'id' : self.json_dict["from_user_id"],
-            'id_str' : self.json_dict["from_user_id_str"],
-            'name' : self.json_dict['from_user_name'],
-        }
-        
-        user = self.__get_user(user_fields, do_merge=False)
-        if user is None:
-            get_logger().warning("USER not found " + repr(user_fields)) #@UndefinedVariable
-            tweet_fields["user_id"] = None
-        else:
-            tweet_fields["user_id"] = user.id
-        
-        tweet_fields = adapt_fields(tweet_fields, fields_adapter["rest"]["tweet"])
-        self.tweet = self.obj_buffer.add_object(Tweet, None, tweet_fields, True)
-                
-        self.__process_entities()
-
-
-
-    def process(self):
-        
-        if self.source_id is None:
-            tweet_source = self.obj_buffer.add_object(TweetSource, None, {'original_json':self.json_txt}, True)
-            self.source_id = tweet_source.id
-        
-        if "metadata" in self.json_dict:
-            self.__process_twitter_rest()
-        else:
-            self.__process_twitter_stream()
-
-        self.obj_buffer.add_object(TweetLog, None, {'tweet_source_id':self.source_id, 'status':TweetLog.TWEET_STATUS['OK']}, True)
-        
-        self.obj_buffer.persists(self.session)
 
 
 def set_logging(options, plogger=None, queue=None):
@@ -508,12 +212,12 @@
     return logger
 
 def set_logging_options(parser):
-    parser.add_option("-l", "--log", dest="logfile",
+    parser.add_argument("-l", "--log", dest="logfile",
                       help="log to file", metavar="LOG", default="stderr")
-    parser.add_option("-v", dest="verbose", action="count",
-                      help="verbose", metavar="VERBOSE", default=0)
-    parser.add_option("-q", dest="quiet", action="count",
-                      help="quiet", metavar="QUIET", default=0)
+    parser.add_argument("-v", dest="verbose", action="count",
+                      help="verbose", default=0)
+    parser.add_argument("-q", dest="quiet", action="count",
+                      help="quiet", default=0)
 
 def get_base_query(session, query, start_date, end_date, hashtags, tweet_exclude_table, user_whitelist):
     
@@ -591,8 +295,10 @@
             if ei:
                 dummy = self.format(record) # just to get traceback text into record.exc_text
                 record.exc_info = None  # not needed any more
-            if not self.ignore_full or not self.queue.full():
+            if not self.ignore_full or (not self.queue.full()):
                 self.queue.put_nowait(record)
+        except AssertionError:
+            pass
         except Queue.Full:
             if self.ignore_full:
                 pass
@@ -626,3 +332,10 @@
     
     return writer
 
+def get_unused_port():
+    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+    s.bind(('localhost', 0))
+    _, port = s.getsockname()
+    s.close()
+    return port
+
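
Note: get_oauth_token() now requires the consumer token explicitly instead of
reading the removed module-level CONSUMER_KEY/CONSUMER_SECRET, and
get_unused_port() picks a free local TCP port. A sketch with placeholder
values:

    from iri_tweet.utils import get_oauth_token, get_unused_port

    # placeholder consumer token; the real one must now come from configuration
    token_key, token_secret = get_oauth_token(consumer_key="ckey",
                                              consumer_secret="csecret",
                                              token_file_path="/tmp/twitter.tok")
    port = get_unused_port()  # e.g. for the recorder's HTTP handler below
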
--- a/script/rest/export_twitter.py	Sun Apr 21 21:55:06 2013 +0200
+++ b/script/rest/export_twitter.py	Tue May 07 18:57:54 2013 +0200
@@ -1,16 +1,15 @@
 #!/usr/bin/env python
 # coding=utf-8
 
-from sqlite3 import *
+from sqlite3 import register_adapter, register_converter, connect, Row
 import datetime, time
 import email.utils
 from optparse import OptionParser
 import os.path
-import os
-import sys
 from lxml import etree
 import uuid
 import re
+import simplejson
 
 def parse_date(date_str):
     ts = email.utils.parsedate_tz(date_str)
@@ -20,10 +19,10 @@
     return time.mktime(ts.timetuple())
     
 def adapt_geo(geo):
-	return simplejson.dumps(geo)
-	
+    return simplejson.dumps(geo)
+
 def convert_geo(s):
-	return simplejson.loads(s)
+    return simplejson.loads(s)
 
 
 register_adapter(datetime.datetime, adapt_datetime)
@@ -73,7 +72,7 @@
     ts = int(parse_date(options.start_date))
 
     if options.end_date:
-    	te = int(parse_date(options.end_date))
+        te = int(parse_date(options.end_date))
     else:
         te = ts + options.duration
     
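
Note: the adapt_geo/convert_geo pair above only takes effect once registered
with sqlite3 and the connection is opened with declared-type parsing. A
self-contained sketch of that mechanism (the table and values are made up):

    import sqlite3
    import simplejson

    sqlite3.register_adapter(dict, simplejson.dumps)     # adapt_geo equivalent
    sqlite3.register_converter("geo", simplejson.loads)  # convert_geo equivalent
    conn = sqlite3.connect(":memory:", detect_types=sqlite3.PARSE_DECLTYPES)
    conn.execute("create table tweet (id integer, geo geo)")
    conn.execute("insert into tweet values (?, ?)", (1, {"coordinates": [2.35, 48.85]}))
    print conn.execute("select geo from tweet").fetchone()[0]  # parsed back to a dict
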
--- a/script/stream/recorder_stream.py	Sun Apr 21 21:55:06 2013 +0200
+++ b/script/stream/recorder_stream.py	Tue May 07 18:57:54 2013 +0200
@@ -1,14 +1,14 @@
-from getpass import getpass
+from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
 from iri_tweet import models, utils
 from iri_tweet.models import TweetSource, TweetLog, ProcessEvent
-from multiprocessing import (Queue as mQueue, JoinableQueue, Process, Event, 
-    get_logger)
-from optparse import OptionParser
+from iri_tweet.processor import get_processor
+from multiprocessing import Queue as mQueue, Process, Event
 from sqlalchemy.exc import OperationalError
 from sqlalchemy.orm import scoped_session
 import Queue
 import StringIO
 import anyjson
+import argparse
 import datetime
 import inspect
 import iri_tweet.stream
@@ -21,6 +21,7 @@
 import socket
 import sqlalchemy.schema
 import sys
+import thread
 import threading
 import time
 import traceback
@@ -35,7 +36,20 @@
 columns_user = [u'follow_request_sent', u'profile_use_background_image', u'id', u'verified', u'profile_sidebar_fill_color', u'profile_text_color', u'followers_count', u'protected', u'location', u'profile_background_color', u'id_str', u'utc_offset', u'statuses_count', u'description', u'friends_count', u'profile_link_color', u'profile_image_url', u'notifications', u'show_all_inline_media', u'geo_enabled', u'profile_background_image_url', u'name', u'lang', u'following', u'profile_background_tile', u'favourites_count', u'screen_name', u'url', u'created_at', u'contributors_enabled', u'time_zone', u'profile_sidebar_border_color', u'is_translator', u'listed_count']
 # just put it in a sqlite3 table
 
-DEFAULT_TIMEOUT = 5
+DEFAULT_TIMEOUT = 3
+
+class Requesthandler(BaseHTTPRequestHandler):
+
+    def __init__(self, request, client_address, server):
+        BaseHTTPRequestHandler.__init__(self, request, client_address, server)
+        
+    def do_GET(self):
+        self.send_response(200)
+        self.end_headers()
+    
+    def log_message(self, format, *args):        # @ReservedAssignment
+        pass
+
 
 def set_logging(options):
     loggers = []
@@ -55,19 +69,16 @@
     return qlogger
 
 def get_auth(options, access_token):
-    if options.username and options.password:
-        auth = requests.auth.BasicAuthHandler(options.username, options.password)        
-    else:
-        consumer_key = models.CONSUMER_KEY
-        consumer_secret = models.CONSUMER_SECRET
-        auth = requests_oauthlib.OAuth1(access_token[0], access_token[1], consumer_key, consumer_secret, signature_type='auth_header')
+    consumer_key = options.consumer_key
+    consumer_secret = options.consumer_secret
+    auth = requests_oauthlib.OAuth1(client_key=consumer_key, client_secret=consumer_secret, resource_owner_key=access_token[0], resource_owner_secret=access_token[1], signature_type='auth_header')
     return auth
 
 
-def add_process_event(type, args, session_maker):
+def add_process_event(event_type, args, session_maker):
     session = session_maker()
     try:
-        evt = ProcessEvent(args=None if args is None else anyjson.serialize(args), type=type)
+        evt = ProcessEvent(args=None if args is None else anyjson.serialize(args), type=event_type)
         session.add(evt)
         session.commit()
     finally:
@@ -83,6 +94,7 @@
         self.options = options
         self.logger_queue = logger_queue
         self.stop_event = stop_event
+        self.consumer_token = (options.consumer_key, options.consumer_secret)
         self.access_token = access_token
 
         super(BaseProcess, self).__init__()
@@ -122,16 +134,15 @@
     def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
         self.track = options.track
         self.token_filename = options.token_filename
-        self.catchup = options.catchup
         self.timeout = options.timeout
         self.stream = None
         super(SourceProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid)
                     
     def __source_stream_iter(self):
-        
-        self.logger = set_logging_process(self.options, self.logger_queue)
+                
         self.logger.debug("SourceProcess : run ")
         
+        self.logger.debug("SourceProcess : get_auth auth with option %s and token %s " %(self.options, self.access_token))
         self.auth = get_auth(self.options, self.access_token) 
         self.logger.debug("SourceProcess : auth set ")
         
@@ -140,8 +151,8 @@
         
         track_list = [k.strip() for k in track_list.split(',')]
 
-        self.logger.debug("SourceProcess : before connecting to stream " + repr(track_list))                        
-        self.stream = iri_tweet.stream.FilterStream(self.auth, track=track_list, raw=True, url=self.options.url, catchup=self.catchup, timeout=self.timeout, chunk_size=1, logger=self.logger)
+        self.logger.debug("SourceProcess : before connecting to stream %s, url : %s, auth : %s" % (repr(track_list), self.options.url, repr(self.auth)))                        
+        self.stream = iri_tweet.stream.FilterStream(self.auth, track=track_list, raw=True, url=self.options.url, timeout=self.timeout, chunk_size=512, logger=self.logger)
         self.logger.debug("SourceProcess : after connecting to stream")
         self.stream.muststop = lambda: self.stop_event.is_set()        
         
@@ -149,11 +160,14 @@
         
         session = self.session_maker()
         
+        #import pydevd
+        #pydevd.settrace(suspend=False)
+
         try:
             for tweet in stream_wrapper:
                 if not self.parent_is_alive():
                     self.stop_event.set()
-                    stop_thread.join(5)
                     sys.exit()
                 self.logger.debug("SourceProcess : tweet " + repr(tweet))
                 source = TweetSource(original_json=tweet)
@@ -193,41 +207,52 @@
 
     def do_run(self):
         
-        # import pydevd
-        # pydevd.settrace(suspend=False)
+        self.logger = set_logging_process(self.options, self.logger_queue)                
         
         source_stream_iter_thread = threading.Thread(target=self.__source_stream_iter , name="SourceStreamIterThread")
         
         source_stream_iter_thread.start()
         
-        while not self.stop_event.is_set():
-            self.logger.debug("SourceProcess : In while after start")
-            self.stop_event.wait(DEFAULT_TIMEOUT)
-            if self.stop_event.is_set() and self.stream:
-                self.stream.close()
-            elif not self.stop_event.is_set() and not source_stream_iter_thread.is_alive:
-                self.stop_event.set()
+        try:
+            while not self.stop_event.is_set():
+                self.logger.debug("SourceProcess : In while after start")
+                self.stop_event.wait(DEFAULT_TIMEOUT)
+        except KeyboardInterrupt:
+            self.stop_event.set()
 
+        if self.stop_event.is_set() and self.stream:
+            self.stream.close()
+        elif not self.stop_event.is_set() and not source_stream_iter_thread.is_alive():
+            self.stop_event.set()
+    
         self.logger.info("SourceProcess : join")
         source_stream_iter_thread.join(30)
 
 
-def process_tweet(tweet, source_id, session, access_token, twitter_query_user, logger):
+def process_tweet(tweet, source_id, session, consumer_token, access_token, twitter_query_user, token_filename, logger):
     try:
         if not tweet.strip():
             return
         tweet_obj = anyjson.deserialize(tweet)
-        if 'text' not in tweet_obj:
+        processor_klass = get_processor(tweet_obj)
+        if not processor_klass:
             tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET'])
             session.add(tweet_log)
             return
-        screen_name = ""
-        if 'user' in tweet_obj and 'screen_name' in tweet_obj['user']:
-            screen_name = tweet_obj['user']['screen_name']
-        logger.info(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text']))
-        logger.debug(u"Process_tweet :" + repr(tweet))
-        processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, access_token, None, twitter_query_user)
+        processor = processor_klass(json_dict=tweet_obj,
+                                    json_txt=tweet,
+                                    source_id=source_id,
+                                    session=session,
+                                    consumer_token=consumer_token,
+                                    access_token=access_token,
+                                    token_filename=token_filename,
+                                    user_query_twitter=twitter_query_user,
+                                    logger=logger)
+        logger.info(processor.log_info())                        
+        logger.debug(u"Process_tweet :" + repr(tweet))                
         processor.process()
+        
     except ValueError as e:
         message = u"Value Error %s processing tweet %s" % (repr(e), tweet)
         output = StringIO.StringIO()
@@ -274,8 +299,10 @@
                 except Exception as e:
                     self.logger.debug('Process tweet exception in loop : ' + repr(e))
                     continue
-                process_tweet(tweet_txt, source_id, session, self.access_token, self.twitter_query_user, self.logger)
+                process_tweet(tweet_txt, source_id, session, self.consumer_token, self.access_token, self.twitter_query_user, self.options.token_filename, self.logger)
                 session.commit()
+        except KeyboardInterrupt:
+            self.stop_event.set()
         finally:
             session.rollback()
             session.close()
@@ -287,15 +314,20 @@
     return Session, engine, metadata
 
             
-def process_leftovers(session, access_token, twitter_query_user, logger):
+def process_leftovers(session, consumer_token, access_token, twitter_query_user, token_filename, ask_process_leftovers, logger):
     
     sources = session.query(TweetSource).outerjoin(TweetLog).filter(TweetLog.id == None)
+    sources_count = sources.count()
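+    # for a large backlog, let the user skip the catch-up pass (see prompt below)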
     
+    if sources_count > 10 and ask_process_leftovers:
+        resp = raw_input("Do you want to process leftovers (Y/n)? (%d tweets to process) " % sources_count)
+        if resp and resp.strip().lower() == "n":
+            return
+    logger.info("Process leftovers, %d tweets to process" % (sources_count))
     for src in sources:
         tweet_txt = src.original_json
-        process_tweet(tweet_txt, src.id, session, access_token, twitter_query_user, logger)
+        process_tweet(tweet_txt, src.id, session, consumer_token, access_token, twitter_query_user, token_filename, logger)
         session.commit()
-
         
     
     # get tweet source that do not match any message
@@ -315,38 +347,36 @@
         
 def get_options():
 
-    usage = "usage: %prog [options]"
+    usage = "usage: %(prog)s [options]"
 
-    parser = OptionParser(usage=usage)
+    parser = argparse.ArgumentParser(usage=usage)
 
-    parser.add_option("-f", "--file", dest="conn_str",
-                      help="write tweet to DATABASE. This is a connection string", metavar="CONNECTION_STR", default="enmi2010_twitter.db")
-    parser.add_option("-u", "--user", dest="username",
-                      help="Twitter user", metavar="USER", default=None)
-    parser.add_option("-w", "--password", dest="password",
-                      help="Twitter password", metavar="PASSWORD", default=None)
-    parser.add_option("-T", "--track", dest="track",
-                      help="Twitter track", metavar="TRACK")
-    parser.add_option("-n", "--new", dest="new", action="store_true",
-                      help="new database", default=False)
-    parser.add_option("-D", "--daemon", dest="daemon", action="store_true",
-                      help="launch daemon", default=False)
-    parser.add_option("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token",
-                      help="Token file name")
-    parser.add_option("-d", "--duration", dest="duration",
-                      help="Duration of recording in seconds", metavar="DURATION", default= -1, type='int')
-    parser.add_option("-N", "--nb-process", dest="process_nb",
-                      help="number of process.\nIf 0, only the lefovers of the database are processed.\nIf 1, no postprocessing is done on the tweets.", metavar="PROCESS_NB", default=2, type='int')
-    parser.add_option("--url", dest="url",
-                      help="The twitter url to connect to.", metavar="URL", default=iri_tweet.stream.FilterStream.url)
-    parser.add_option("--query-user", dest="twitter_query_user", action="store_true",
-                      help="Query twitter for users", default=False, metavar="QUERY_USER")
-    parser.add_option("--catchup", dest="catchup",
-                      help="catchup count for tweets", default=None, metavar="CATCHUP", type='int')
-    parser.add_option("--timeout", dest="timeout",
-                      help="timeout for connecting in seconds", default=60, metavar="TIMEOUT", type='int')
-    
-
+    parser.add_argument("-f", "--file", dest="conn_str",
+                        help="write tweet to DATABASE. This is a connection string", metavar="CONNECTION_STR", default="enmi2010_twitter.db")
+    parser.add_argument("-T", "--track", dest="track",
+                        help="Twitter track", metavar="TRACK")
+    parser.add_argument("-k", "--key", dest="consumer_key",
+                        help="Twitter consumer key", metavar="CONSUMER_KEY", required=True)
+    parser.add_argument("-s", "--secret", dest="consumer_secret",
+                        help="Twitter consumer secret", metavar="CONSUMER_SECRET", required=True)
+    parser.add_argument("-n", "--new", dest="new", action="store_true",
+                        help="new database", default=False)
+    parser.add_argument("-D", "--daemon", dest="daemon", action="store_true",
+                        help="launch daemon", default=False)
+    parser.add_argument("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token",
+                        help="Token file name")
+    parser.add_argument("-d", "--duration", dest="duration",
+                        help="Duration of recording in seconds", metavar="DURATION", default= -1, type=int)
+    parser.add_argument("-N", "--nb-process", dest="process_nb",
+                        help="number of process.\nIf 0, only the lefovers of the database are processed.\nIf 1, no postprocessing is done on the tweets.", metavar="PROCESS_NB", default=2, type=int)
+    parser.add_argument("--url", dest="url",
+                        help="The twitter url to connect to.", metavar="URL", default=iri_tweet.stream.FilterStream.url)
+    parser.add_argument("--query-user", dest="twitter_query_user", action="store_true",
+                        help="Query twitter for users", default=False)
+    parser.add_argument("--timeout", dest="timeout",
+                        help="timeout for connecting in seconds", default=60, metavar="TIMEOUT", type=int)
+    parser.add_argument("--ask-process-leftovers", dest="ask_process_leftovers", action="store_false",
+                        help="ask process leftover", default=True)
 
 
     utils.set_logging_options(parser)
@@ -357,14 +387,14 @@
 def do_run(options, session_maker):
 
     stop_args = {}
-
-    access_token = None
-    if not options.username or not options.password:
-        access_token = utils.get_oauth_token(options.token_filename)
+    
+    consumer_token = (options.consumer_key, options.consumer_secret)
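+    # resolve the user's access token; get_oauth_token is assumed to read it from
+    # the token file, obtaining and caching one there if the file is missing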
+    access_token = utils.get_oauth_token(consumer_key=consumer_token[0], consumer_secret=consumer_token[1], token_file_path=options.token_filename)
+    
     
     session = session_maker()
     try:
-        process_leftovers(session, access_token, options.twitter_query_user, utils.get_logger())
+        process_leftovers(session, consumer_token, access_token, options.twitter_query_user, options.token_filename, options.ask_process_leftovers, utils.get_logger())
         session.commit()
     finally:
         session.rollback()
@@ -378,7 +408,10 @@
     stop_event = Event()
     
     # workaround for bug on using urllib2 and multiprocessing
-    req = urllib2.Request('http://localhost')
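+    # bind a throwaway server on an ephemeral port and serve a single request
+    # in a background thread, so the warm-up urlopen below has a live endpoint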
+    httpd = HTTPServer(('127.0.0.1',0), Requesthandler)
+    thread.start_new_thread(httpd.handle_request, ())
+    
+    req = urllib2.Request('http://localhost:%d' % httpd.server_port)
     conn = None
     try:
         conn = urllib2.urlopen(req)
@@ -392,7 +425,7 @@
     process_engines = []
     logger_queues = []
     
-    SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str)
+    SessionProcess, engine_process, _ = get_sessionmaker(conn_str)
     process_engines.append(engine_process)
     lqueue = mQueue(50)
     logger_queues.append(lqueue)
@@ -402,7 +435,7 @@
     tweet_processes = []
     
     for i in range(options.process_nb - 1):
-        SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str)
+        SessionProcess, engine_process, _ = get_sessionmaker(conn_str)
         process_engines.append(engine_process)
         lqueue = mQueue(50)
         logger_queues.append(lqueue)
@@ -462,21 +495,13 @@
             cprocess.terminate()
 
     
-    utils.get_logger().debug("Close queues")
-    try:
-        queue.close()
-        for lqueue in logger_queues:
-            lqueue.close()
-    except exception as e:
-        utils.get_logger().error("error when closing queues %s", repr(e))
-        # do nothing
-        
+    utils.get_logger().debug("Close queues")        
     
     if options.process_nb > 1:
         utils.get_logger().debug("Processing leftovers")
         session = session_maker()
         try:
-            process_leftovers(session, access_token, options.twitter_query_user, utils.get_logger())
+            process_leftovers(session, consumer_token, access_token, options.twitter_query_user, options.token_filename, options.ask_process_leftovers, utils.get_logger())
             session.commit()
         finally:
             session.rollback()
@@ -484,11 +509,19 @@
 
     for pengine in process_engines:
         pengine.dispose()
+    
+    try:
+        queue.close()
+        for lqueue in logger_queues:
+            lqueue.close()
+    except Exception as e:
+        utils.get_logger().error("error when closing queues %s", repr(e))
+        # do nothing
 
     return stop_args
 
 
-def main(options, args):
+def main(options):
     
     global conn_str
     
@@ -513,7 +546,8 @@
     Session, engine, metadata = get_sessionmaker(conn_str)
     
     if options.new:
-        check_metadata = sqlalchemy.schema.MetaData(bind=engine, reflect=True)
+        check_metadata = sqlalchemy.schema.MetaData(bind=engine)
+        check_metadata.reflect()
         if len(check_metadata.sorted_tables) > 0:
             message = "Database %s not empty exiting" % conn_str
             utils.get_logger().error(message)
@@ -528,7 +562,7 @@
     
     stop_args = {}
     try:
-        add_process_event(type="start", args={'options':options.__dict__, 'args': args, 'command_line': sys.argv}, session_maker=Session)
+        add_process_event(event_type="start", args={'options':options.__dict__, 'args': [], 'command_line': sys.argv}, session_maker=Session)
         stop_args = do_run(options, Session)
     except Exception as e:
         utils.get_logger().exception("Error in main thread")        
@@ -540,7 +574,7 @@
             outfile.close()
         raise
     finally:    
-        add_process_event(type="shutdown", args=stop_args, session_maker=Session)
+        add_process_event(event_type="shutdown", args=stop_args, session_maker=Session)
 
     utils.get_logger().debug("Done. Exiting. " + repr(stop_args))
 
@@ -548,15 +582,15 @@
 
 if __name__ == '__main__':
 
-    (options, args) = get_options()
+    options = get_options()
     
     loggers = set_logging(options)
     
     utils.get_logger().debug("OPTIONS : " + repr(options))
     
     if options.daemon:
+        options.ask_process_leftovers = False
         import daemon
-        import lockfile
         
         hdlr_preserve = []
         for logger in loggers:
@@ -564,7 +598,7 @@
             
         context = daemon.DaemonContext(working_directory=os.getcwd(), files_preserve=hdlr_preserve) 
         with context:
-            main(options, args)
+            main(options)
     else:
-        main(options, args)
-
+        main(options)
+    
--- a/script/utils/export_twitter_alchemy.py	Sun Apr 21 21:55:06 2013 +0200
+++ b/script/utils/export_twitter_alchemy.py	Tue May 07 18:57:54 2013 +0200
@@ -416,7 +416,7 @@
                     get_logger().debug("write http " + repr(project)) #@UndefinedVariable
                     r = requests.put(content_file_write, data=anyjson.dumps(project), headers={'content-type':'application/json'}, params=post_param);
                     get_logger().debug("write http " + repr(r) + " content " + r.text) #@UndefinedVariable
-                    if r.status_code != requests.codes.ok:
+                    if r.status_code != requests.codes.ok:  # @UndefinedVariable
                         r.raise_for_status()
                 else:
                     if content_file_write and os.path.exists(content_file_write):
--- a/script/utils/merge_tweets.py	Sun Apr 21 21:55:06 2013 +0200
+++ b/script/utils/merge_tweets.py	Tue May 07 18:57:54 2013 +0200
@@ -1,12 +1,15 @@
 #from models import setup_database
 from iri_tweet.models import setup_database, TweetSource, Tweet, TweetLog
-from iri_tweet.utils import TwitterProcessor, get_oauth_token, show_progress
+from iri_tweet.processor import TwitterProcessorStatus
+from iri_tweet.utils import get_oauth_token, show_progress
+import anyjson
 import argparse
-import sys
+import codecs
+import logging
 import re
-import anyjson
-import math
-import codecs
+import sys
+
+logger = logging.getLogger(__name__)
 
 def get_option():
     
@@ -16,6 +19,10 @@
                         help="log to file", metavar="LOG", default="stderr")
     parser.add_argument("-v", dest="verbose", action="count",
                         help="verbose", default=0)
+    parser.add_option("-k", "--key", dest="consumer_key",
+                      help="Twitter consumer key", metavar="CONSUMER_KEY")
+    parser.add_option("-s", "--secret", dest="consumer_secret",
+                      help="Twitter consumer secret", metavar="CONSUMER_SECRET")
     parser.add_argument("-q", dest="quiet", action="count",
                         help="quiet", default=0)
     parser.add_argument("--query-user", dest="query_user", action="store_true",
@@ -38,7 +45,7 @@
     
     access_token = None
     if options.query_user:
-        access_token = get_oauth_token(options.token_filename)
+        access_token = get_oauth_token(consumer_key=options.consumer_key, consumer_secret=options.consumer_secret, token_file_path=options.token_filename)
     
     #open source
     src_conn_str = options.source[0].strip()
@@ -60,7 +67,7 @@
         session_src = Session_src()
         session_tgt = Session_tgt()
         
-        count_tw_query = Tweet.__table__.count()
+        count_tw_query = Tweet.__table__.count()  # @UndefinedVariable
         
         count_tw = engine_src.scalar(count_tw_query)
         
@@ -83,10 +90,10 @@
                                 
                 tweet_obj = anyjson.deserialize(tweet_source)
                 if 'text' not in tweet_obj:
-                    tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET'])
+                    tweet_log = TweetLog(tweet_source_id=tweet.tweet_source.id, status=TweetLog.TWEET_STATUS['NOT_TWEET'])
                     session_tgt.add(tweet_log)
                 else:                
-                    tp = TwitterProcessor(None, tweet_source, None, session_tgt, access_token, options.token_filename, user_query_twitter=options.query_user)
+                    tp = TwitterProcessorStatus(json_dict=None, json_txt=tweet_source, source_id=None, session=session_tgt, access_token=access_token, token_filename=options.token_filename, user_query_twitter=options.query_user, logger=logger)
                     tp.process()
                 
                 session_tgt.flush()
@@ -98,9 +105,13 @@
         print u"%d new tweet added" % (added)
         
     finally:
-        session_tgt.close() if session_tgt is not None else None
-        session_src.close() if session_src is not None else None
-        conn_tgt.close() if conn_tgt is not None else None
-        conn_src.close() if conn_src is not None else None
+        if session_tgt is not None:
+            session_tgt.close()
+        if session_src is not None:
+            session_src.close()
+        if conn_tgt is not None:
+            conn_tgt.close()
+        if conn_src is not None:
+            conn_src.close()
         
         
\ No newline at end of file
--- a/script/utils/search_topsy.py	Sun Apr 21 21:55:06 2013 +0200
+++ b/script/utils/search_topsy.py	Tue May 07 18:57:54 2013 +0200
@@ -1,17 +1,15 @@
+from blessings import Terminal
 from iri_tweet import models, utils
-from sqlalchemy.orm import sessionmaker
-import anyjson
-import sqlite3
-import twitter
+from iri_tweet.processor import TwitterProcessorStatus
+from optparse import OptionParser
+import logging
+import math
 import re
 import requests
-from optparse import OptionParser
-import simplejson
 import time
-from blessings import Terminal
-import sys
-import math
-from symbol import except_clause
+import twitter
+
+logger = logging.getLogger(__name__)
 
 APPLICATION_NAME = "Tweet recorder user"
 CONSUMER_KEY = "Vdr5ZcsjI1G3esTPI8yDg"
@@ -86,7 +84,7 @@
     utils.set_logging(options);
 
 
-    acess_token_key, access_token_secret = utils.get_oauth_token(options.token_filename, application_name=APPLICATION_NAME, consumer_key=CONSUMER_KEY, consumer_secret=CONSUMER_SECRET)
+    acess_token_key, access_token_secret = utils.get_oauth_token(consumer_key=CONSUMER_KEY, consumer_secret=CONSUMER_SECRET, token_file_path=options.token_filename, application_name=APPLICATION_NAME)
 
     t = twitter.Twitter(domain="api.twitter.com", auth=twitter.OAuth(acess_token_key, access_token_secret, CONSUMER_KEY, CONSUMER_SECRET), secure=True)
     t.secure = True
@@ -144,7 +142,7 @@
                 else:
                     raise
             
-            processor = utils.TwitterProcessor(tweet, None, None, session, None, options.token_filename)
+            processor = TwitterProcessorStatus(json_dict=tweet, json_txt=None, source_id=None, session=session, access_token=None, token_filename=options.token_filename, logger=logger)
             processor.process()
             session.flush()
             session.commit()
--- a/script/utils/tweet_twitter_user.py	Sun Apr 21 21:55:06 2013 +0200
+++ b/script/utils/tweet_twitter_user.py	Tue May 07 18:57:54 2013 +0200
@@ -104,7 +104,7 @@
                 
             query_res = query.all()
             
-            acess_token_key, access_token_secret = get_oauth_token(options.token_filename, application_name=APPLICATION_NAME, consumer_key=CONSUMER_KEY, consumer_secret=CONSUMER_SECRET)
+            acess_token_key, access_token_secret = get_oauth_token(consumer_key=CONSUMER_KEY, consumer_secret=CONSUMER_SECRET, token_file_path=options.token_filename, application_name=APPLICATION_NAME)
             t = twitter.Twitter(auth=twitter.OAuth(acess_token_key, access_token_secret, CONSUMER_KEY, CONSUMER_SECRET))
 
             for user in query_res:
--- a/script/virtualenv/res/lib/lib_create_env.py	Sun Apr 21 21:55:06 2013 +0200
+++ b/script/virtualenv/res/lib/lib_create_env.py	Tue May 07 18:57:54 2013 +0200
@@ -25,12 +25,13 @@
     'DATEUTIL': {'setup': 'python-dateutil', 'url':'http://pypi.python.org/packages/source/p/python-dateutil/python-dateutil-2.1.tar.gz', 'local':"python-dateutil-2.1.tar.gz", 'install': {'method': 'pip', 'option_str': None, 'dict_extra_env': None}},
     'PYTZ': {'setup': 'pytz', 'url':'http://pypi.python.org/packages/source/p/pytz/pytz-2013b.tar.bz2', 'local':"pytz-2013b.tar.bz2", 'install': {'method': 'pip', 'option_str': None, 'dict_extra_env': None}},
     'SIMPLEJSON': {'setup': 'simplejson', 'url':'http://pypi.python.org/packages/source/s/simplejson/simplejson-3.1.3.tar.gz', 'local':"simplejson-3.1.3.tar.gz", 'install': {'method': 'pip', 'option_str': None, 'dict_extra_env': None}},
-    'SQLALCHEMY': {'setup': 'sqlalchemy', 'url':'http://www.python.org/pypi/SQLAlchemy/0.8.0', 'local':"SQLAlchemy-0.8.0.tar.gz", 'install': {'method': 'pip', 'option_str': None, 'dict_extra_env': None}},
+    'SQLALCHEMY': {'setup': 'sqlalchemy', 'url':'http://www.python.org/pypi/SQLAlchemy/0.8.1', 'local':"SQLAlchemy-0.8.1.tar.gz", 'install': {'method': 'pip', 'option_str': None, 'dict_extra_env': None}},
     'TWITTER': {'setup': 'twitter', 'url':'http://pypi.python.org/packages/source/t/twitter/twitter-1.9.2.tar.gz', 'local':"twitter-1.9.2.tar.gz", 'install': {'method': 'pip', 'option_str': None, 'dict_extra_env': None}},
     'TWITTER-TEXT': {'setup': 'twitter-text', 'url':'https://github.com/dryan/twitter-text-py/archive/master.tar.gz', 'local':"twitter-text-1.0.4.tar.gz", 'install': {'method': 'pip', 'option_str': None, 'dict_extra_env': None}},
     'REQUESTS': {'setup': 'requests', 'url':'https://github.com/kennethreitz/requests/archive/v1.2.0.tar.gz', 'local':'requests-v1.2.0.tar.gz', 'install' : {'method':'pip', 'option_str': None, 'dict_extra_env': None}},
     'OAUTHLIB': {'setup': 'oauthlib', 'url':'https://github.com/idan/oauthlib/archive/0.4.0.tar.gz', 'local':'oauthlib-0.4.0.tar.gz', 'install' : {'method':'pip', 'option_str': None, 'dict_extra_env': None}},
     'REQUESTS-OAUTHLIB': {'setup': 'requests-oauthlib', 'url':'https://github.com/requests/requests-oauthlib/archive/master.tar.gz', 'local':'requests-oauthlib-0.3.0.tar.gz', 'install' : {'method':'pip', 'option_str': None, 'dict_extra_env': None}},
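+    # blessings provides the Terminal used by script/utils/search_topsy.py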
+    'BLESSINGS': {'setup': 'blessings', 'url':'https://github.com/erikrose/blessings/archive/1.5.tar.gz', 'local':'blessings-1.5.tar.gz', 'install' :  {'method':'pip', 'option_str': None, 'dict_extra_env': None}}
 }
 
 if system_str == 'Windows':
Binary file script/virtualenv/res/src/SQLAlchemy-0.8.0.tar.gz has changed
Binary file script/virtualenv/res/src/SQLAlchemy-0.8.1.tar.gz has changed
Binary file script/virtualenv/res/src/blessings-1.5.tar.gz has changed
--- a/script/virtualenv/script/res/res_create_env.py	Sun Apr 21 21:55:06 2013 +0200
+++ b/script/virtualenv/script/res/res_create_env.py	Tue May 07 18:57:54 2013 +0200
@@ -21,7 +21,8 @@
     'TWITTER-TEXT',
     'REQUESTS',
     'OAUTHLIB',
-    'REQUESTS-OAUTHLIB'
+    'REQUESTS-OAUTHLIB',
+    'BLESSINGS'
 ]
 
 if system_str == "Linux":