Adapt recorder_stream to Python 3
authorymh <ymh.work@gmail.com>
Thu, 10 Jan 2019 18:36:36 +0100
changeset 1497 14a9bed2e3cd
parent 1496 184372ec27e2
child 1498 e0b3ef3c07d0
Adapt recorder_stream to Python 3. Improve Twitter authentication management. Use OAuth2 where possible. Delete old script.
script/lib/iri_tweet/iri_tweet/processor.py
script/lib/iri_tweet/iri_tweet/stream.py
script/lib/iri_tweet/iri_tweet/utils.py
script/stream/recorder_stream.py
script/utils/merge_tweets.py
script/utils/search_topsy.py
script/utils/search_topsy_scrap.py
script/utils/search_twitter_api.py
script/utils/search_twitter_json.py
script/virtualenv/script/res/requirement.txt
web/images/polemictweet_square.png
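
The thread running through these files is the authentication change: instead of passing a (consumer_key, consumer_secret) pair, an access token and a token file name through every constructor, callers now build a single auth object from the sixohsix twitter library and hand it around as twitter_auth. A minimal sketch of the new convention, with placeholder credentials:

    import twitter

    CONSUMER_KEY, CONSUMER_SECRET = "my-consumer-key", "my-consumer-secret"  # placeholders
    TOKEN_KEY, TOKEN_SECRET = "my-token-key", "my-token-secret"              # placeholders

    # One reusable auth object replaces the consumer_token/access_token/token_filename
    # triple that each processor previously rebuilt internally.
    twitter_auth = twitter.OAuth(TOKEN_KEY, TOKEN_SECRET, CONSUMER_KEY, CONSUMER_SECRET)
    t = twitter.Twitter(auth=twitter_auth)  # the same object serves REST lookups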
--- a/script/lib/iri_tweet/iri_tweet/processor.py	Wed Jan 02 17:49:19 2019 +0100
+++ b/script/lib/iri_tweet/iri_tweet/processor.py	Thu Jan 10 18:36:36 2019 +0100
@@ -4,10 +4,10 @@
 
 @author: ymh
 '''
-from iri_tweet.models import (User, EntityType, adapt_json, MediaType, Media, 
-    EntityMedia, Hashtag, EntityHashtag, EntityUser, EntityUrl, Url, Entity, Tweet, 
+from iri_tweet.models import (User, EntityType, adapt_json, MediaType, Media,
+    EntityMedia, Hashtag, EntityHashtag, EntityUser, EntityUrl, Url, Entity, Tweet,
     TweetSource, TweetLog)
-from iri_tweet.utils import (ObjectsBuffer, adapt_fields, fields_adapter, 
+from iri_tweet.utils import (ObjectsBuffer, adapt_fields, fields_adapter,
     ObjectBufferProxy, get_oauth_token, clean_keys)
 from sqlalchemy.orm import joinedload
 import logging
@@ -20,30 +20,27 @@
     pass
 
 class TwitterProcessor(object):
-    def __init__(self, json_dict, json_txt, source_id, session, consumer_token, access_token=None, token_filename=None, user_query_twitter=False, logger=None):
+    def __init__(self, json_dict, json_txt, source_id, session, twitter_auth=None, user_query_twitter=False, logger=None):
 
         if json_dict is None and json_txt is None:
             raise TwitterProcessorException("No json")
-        
+
         if json_dict is None:
             self.json_dict = json.loads(json_txt)
         else:
             self.json_dict = json_dict
-        
+
         if not json_txt:
             self.json_txt = json.dumps(json_dict)
         else:
             self.json_txt = json_txt
-        
+
         if "id" not in self.json_dict:
             raise TwitterProcessorException("No id in json")
-        
+
         self.source_id = source_id
         self.session = session
-        self.consumer_key = consumer_token[0] if consumer_token else None
-        self.consumer_secret = consumer_token[1] if consumer_token else None
-        self.token_filename = token_filename
-        self.access_token = access_token
+        self.twitter_auth = twitter_auth
         self.obj_buffer = ObjectsBuffer()
         self.user_query_twitter = user_query_twitter
         if not logger:
@@ -57,27 +54,26 @@
             self.source_id = tweet_source.id
         self.process_source()
         self.obj_buffer.persists(self.session)
-        
+
     def process_source(self):
         raise NotImplementedError()
-    
+
     def log_info(self):
-        return "Process tweet %s" %  repr(self.__class__)    
+        return "Process tweet %s" %  repr(self.__class__)
 
 
 class TwitterProcessorStatus(TwitterProcessor):
-    
-    def __init__(self, json_dict, json_txt, source_id, session, consumer_token, access_token=None, token_filename=None, user_query_twitter=False, logger=None):
-        TwitterProcessor.__init__(self, json_dict, json_txt, source_id, session, consumer_token, access_token=access_token, token_filename=token_filename, user_query_twitter=user_query_twitter, logger=logger)
-        
+
     def __get_user(self, user_dict, do_merge):
         self.logger.debug("Get user : " + repr(user_dict)) #@UndefinedVariable
-        
+
         user_dict = adapt_fields(user_dict, fields_adapter["stream"]["user"])
-    
-        user_id = user_dict.get("id",None)    
+
+        user_id = user_dict.get("id", user_dict.get("id_str",None))
+        if user_id is not None:
+            user_id = int(user_id)
         user_name = user_dict.get("screen_name", user_dict.get("name", None))
-        
+
         if user_id is None and user_name is None:
             return None
 
@@ -87,7 +83,7 @@
         else:
             user = self.obj_buffer.get(User, screen_name=user_name)
 
-        #to do update user id needed            
+        #to do update user id needed
         if user is not None:
             user_created_at = None
             if user.args is not None:
@@ -99,31 +95,26 @@
                     user.args.update(user_dict)
             return user
 
-        #todo : add methpds to objectbuffer to get buffer user
+        #todo : add methods to objectbuffer to get buffer user
         user_obj = None
         if user_id:
             user_obj = self.session.query(User).filter(User.id == user_id).first()
         else:
             user_obj = self.session.query(User).filter(User.screen_name.ilike(user_name)).first()
-    
+
         #todo update user if needed
-        if user_obj is not None:            
+        if user_obj is not None:
             if user_obj.created_at is not None or user_dict.get('created_at', None) is None or not do_merge :
                 user = ObjectBufferProxy(User, None, None, False, user_obj)
             else:
                 user = self.obj_buffer.add_object(User, None, user_dict, True, user_obj)
             return user
-    
+
         user_created_at = user_dict.get("created_at", None)
-        
+
         if user_created_at is None and self.user_query_twitter:
-            
-            if self.access_token is not None:
-                acess_token_key, access_token_secret = self.access_token
-            else:
-                acess_token_key, access_token_secret = get_oauth_token(consumer_key=self.consumer_key, consumer_secret=self.consumer_secret, token_file_path=self.token_filename)
-            #TODO pass it as argument    
-            t = twitter.Twitter(auth=twitter.OAuth(acess_token_key, access_token_secret, self.consumer_key, self.consumer_secret))
+
+            t = twitter.Twitter(auth=self.twitter_auth)
             try:
                 if user_id:
                     user_dict = t.users.show(user_id=user_id)
@@ -133,20 +124,20 @@
                 self.logger.info("get_user : TWITTER ERROR : " + repr(e)) #@UndefinedVariable
                 self.logger.info("get_user : TWITTER ERROR : " + str(e)) #@UndefinedVariable
                 return None
-            
-        if "id" not in user_dict:
+
+        if "id" not in user_dict or not user_dict['id']:
             return None
-        
+
         #TODO filter get, wrap in proxy
         user_obj = self.session.query(User).filter(User.id == user_dict["id"]).first()
-        
+
         if user_obj is not None and not do_merge:
             return ObjectBufferProxy(User, None, None, False, user_obj)
-        else:        
-            return self.obj_buffer.add_object(User, None, user_dict, True)        
+        else:
+            return self.obj_buffer.add_object(User, None, user_dict, True)
 
     def __get_or_create_object(self, klass, filter_by_kwargs, filter_arg, creation_kwargs, must_flush, do_merge):
-        
+
         obj_proxy = self.obj_buffer.get(klass, **filter_by_kwargs)
         if obj_proxy is None:
             query = self.session.query(klass)
@@ -167,11 +158,11 @@
 
     def __process_entity(self, ind, ind_type):
         self.logger.debug("Process_entity : " + repr(ind) + " : " + repr(ind_type)) #@UndefinedVariable
-        
+
         ind = clean_keys(ind)
-        
+
         entity_type = self.__get_or_create_object(EntityType, {'label':ind_type}, None, {'label':ind_type}, True, False)
-        
+
         entity_dict = {
            "indice_start"   : ind["indices"][0],
            "indice_end"     : ind["indices"][1],
@@ -181,19 +172,19 @@
         }
 
         def process_medias():
-            
+
             media_id = ind.get('id', None)
             if media_id is None:
                 return None, None
-            
+
             type_str = ind.get("type", "photo")
             media_type = self.__get_or_create_object(MediaType, {'label': type_str}, None, {'label':type_str}, True, False)
             media_ind = adapt_fields(ind, fields_adapter["entities"]["medias"])
             if "type" in media_ind:
                 del(media_ind["type"])
-            media_ind['type_id'] = media_type.id            
+            media_ind['type_id'] = media_type.id
             media = self.__get_or_create_object(Media, {'id':media_id}, None, media_ind, True, False)
-            
+
             entity_dict['media_id'] = media.id
             return EntityMedia, entity_dict
 
@@ -204,8 +195,8 @@
             ind['text'] = text
             hashtag = self.__get_or_create_object(Hashtag, {'text':text}, Hashtag.text.ilike(text), ind, True, False)
             entity_dict['hashtag_id'] = hashtag.id
-            return EntityHashtag, entity_dict             
-        
+            return EntityHashtag, entity_dict
+
         def process_user_mentions():
             user_mention = self.__get_user(ind, False)
             if user_mention is None:
@@ -213,71 +204,79 @@
             else:
                 entity_dict['user_id'] = user_mention.id
             return EntityUser, entity_dict
-        
+
         def process_urls():
             url = self.__get_or_create_object(Url, {'url':ind["url"]}, None, ind, True, False)
             entity_dict['url_id'] = url.id
             return EntityUrl, entity_dict
-                
+
         #{'': lambda }
-        entity_klass, entity_dict =  { 
+        entity_klass, entity_dict =  {
             'hashtags': process_hashtags,
             'user_mentions' : process_user_mentions,
             'urls' : process_urls,
             'media': process_medias,
             }.get(ind_type, lambda: (Entity, entity_dict))()
-            
+
         self.logger.debug("Process_entity entity_dict: " + repr(entity_dict)) #@UndefinedVariable
         if entity_klass:
             self.obj_buffer.add_object(entity_klass, None, entity_dict, False)
 
 
     def __process_twitter(self):
-        
+
         tweet_nb = self.session.query(Tweet).filter(Tweet.id == self.json_dict["id"]).count()
         if tweet_nb > 0:
             return
-        
-        ts_copy = adapt_fields(self.json_dict, fields_adapter["stream"]["tweet"])
-        
+
+        ts_copy = None
+        # Takes into account the extended tweets
+        if self.json_dict.get('truncated') and self.json_dict.get('extended_tweet'):
+            ts_copy = { **self.json_dict, **self.json_dict['extended_tweet'], "text": self.json_dict['extended_tweet']['full_text'] }
+        else:
+            ts_copy = { **self.json_dict }
+
+
+        ts_copy = adapt_fields(ts_copy, fields_adapter["stream"]["tweet"])
+
         # get or create user
-        user = self.__get_user(self.json_dict["user"], True)
+        user = self.__get_user(ts_copy["user"], True)
         if user is None:
-            self.logger.warning("USER not found " + repr(self.json_dict["user"])) #@UndefinedVariable
+            self.logger.warning("USER not found " + repr(ts_copy["user"])) #@UndefinedVariable
             ts_copy["user_id"] = None
         else:
             ts_copy["user_id"] = user.id
-            
+
         del(ts_copy['user'])
         ts_copy["tweet_source_id"] = self.source_id
-        
+
         self.tweet = self.obj_buffer.add_object(Tweet, None, ts_copy, True)
-            
-        self.__process_entities()
+
+        self.__process_entities(ts_copy)
 
 
-    def __process_entities(self):
-        if "entities" in self.json_dict:
-            for ind_type, entity_list in self.json_dict["entities"].items():
+    def __process_entities(self, json_dict):
+        if "entities" in json_dict:
+            for ind_type, entity_list in json_dict["entities"].items():
                 for ind in entity_list:
                     self.__process_entity(ind, ind_type)
         else:
-            
+
             text = self.tweet.text
             extractor = twitter_text.Extractor(text)
             for ind in extractor.extract_hashtags_with_indices():
                 self.__process_entity(ind, "hashtags")
-            
+
             for ind in extractor.extract_urls_with_indices():
                 self.__process_entity(ind, "urls")
-            
+
             for ind in extractor.extract_mentioned_screen_names_with_indices():
                 self.__process_entity(ind, "user_mentions")
 
 
 
     def process_source(self):
-                
+
         status_id = self.json_dict["id"]
         log = self.session.query(TweetLog).filter(TweetLog.status_id==status_id).first()
         if(log):
@@ -307,24 +306,22 @@
       }
     }
     """
-    def __init__(self, json_dict, json_txt, source_id, session, consumer_token, access_token=None, token_filename=None, user_query_twitter=False, logger=None):
-        TwitterProcessor.__init__(self, json_dict, json_txt, source_id, session, consumer_token, access_token=access_token, token_filename=token_filename, user_query_twitter=user_query_twitter, logger=logger)
 
     def process(self):
-                   
+
         #find tweet
         tweet_id = self.json_dict.get('delete',{}).get('status',{}).get('id',None)
         if tweet_id:
             t = self.session.query(Tweet).options(joinedload(Tweet.tweet_source)).filter(Tweet.id == tweet_id).first()
             if t:
-                tsource = t.tweet_source                
+                tsource = t.tweet_source
                 self.session.delete(t)
                 self.session.query(TweetLog).filter(TweetLog.tweet_source_id == tsource.id).delete()
                 self.session.delete(tsource)
                 self.obj_buffer.add_object(TweetLog, None, {'tweet_source_id':self.source_id, 'status':TweetLog.TWEET_STATUS['DELETE']}, True)
             else:
                 self.obj_buffer.add_object(TweetLog, None, {'tweet_source_id':self.source_id, 'status_id': tweet_id,'status':TweetLog.TWEET_STATUS['DELETE_PENDING']}, True)
-                
+
     def log_info(self):
         status_del = self.json_dict.get('delete', {}).get("status",{})
         return u"Process delete for %s : %s" % (status_del.get('user_id_str',u""), status_del.get('id_str',u""))
@@ -341,10 +338,7 @@
     }
     """
 
-    def __init__(self, json_dict, json_txt, source_id, session, consumer_token, access_token=None, token_filename=None, user_query_twitter=False, logger=None):
-        TwitterProcessor.__init__(self, json_dict, json_txt, source_id, session, consumer_token, access_token=access_token, token_filename=token_filename, user_query_twitter=user_query_twitter, logger=logger)
-    
-    def process_source(self):        
+    def process_source(self):
         up_to_status_id = self.json_dict.get("scrub_geo", {}).get("up_to_status_id", None)
         if not up_to_status_id:
             return
@@ -357,7 +351,7 @@
                 tsource_dict["geo"] = None
                 self.obj_buffer.add_object(TweetSource, tsource, {'original_json': json.dumps(tsource_dict)}, False)
         self.obj_buffer.add_object(TweetLog, None, {'tweet_source_id':self.source_id, 'status':TweetLog.TWEET_STATUS['SCRUB_GEO']}, True)
-    
+
     def log_info(self):
         return u"Process scrub geo for %s : %s" % (self.json_dict["scrub_geo"].get('user_id_str',u""), self.json_dict["scrub_geo"].get('id_str',u""))
 
@@ -369,19 +363,17 @@
         "track":1234
       }
     }
-    """    
-    def __init__(self, json_dict, json_txt, source_id, session, consumer_token, access_token=None, token_filename=None, user_query_twitter=False, logger=None):
-        TwitterProcessor.__init__(self, json_dict, json_txt, source_id, session, consumer_token, access_token=access_token, token_filename=token_filename, user_query_twitter=user_query_twitter, logger=logger)
+    """
 
     def process_source(self):
         """
         do nothing, just log the information
-        """    
+        """
         self.obj_buffer.add_object(TweetLog, None, {'tweet_source_id':self.source_id, 'status':TweetLog.TWEET_STATUS['LIMIT'], 'error':self.json_txt}, True)
-        
+
     def log_info(self):
         return u"Process limit %d " % self.json_dict.get("limit", {}).get('track', 0)
-        
+
 class TwitterProcessorStatusWithheld(TwitterProcessor):
     """
     {
@@ -392,31 +384,27 @@
       }
     }
     """
-    def __init__(self, json_dict, json_txt, source_id, session, consumer_token, access_token=None, token_filename=None, user_query_twitter=False, logger=None):
-        TwitterProcessor.__init__(self, json_dict, json_txt, source_id, session, consumer_token, access_token=access_token, token_filename=token_filename, user_query_twitter=user_query_twitter, logger=logger)
-    
+
     def process_source(self):
         """
         do nothing, just log the information
         """
         self.obj_buffer.add_object(TweetLog, None, {'tweet_source_id':self.source_id, 'status':TweetLog.TWEET_STATUS['STATUS_WITHHELD'], 'error':self.json_txt}, True)
-        
+
     def log_info(self):
         status_withheld = self.json_dict.get("status_withheld",{})
         return u"Process status withheld status id %d from user %d in countries %s" %(status_withheld.get("id",0), status_withheld.get("user_id",0), u",".join(status_withheld.get("withheld_in_countries",[])))
 
 class TwitterProcessorUserWithheld(TwitterProcessor):
     """
-    {  
+    {
       "user_withheld":{
         "id":123456,
         "withheld_in_countries":["DE","AR"]
       }
     }
     """
-    def __init__(self, json_dict, json_txt, source_id, session, consumer_token, access_token=None, token_filename=None, user_query_twitter=False, logger=None):
-        TwitterProcessor.__init__(self, json_dict, json_txt, source_id, session, consumer_token, access_token=access_token, token_filename=token_filename, user_query_twitter=user_query_twitter, logger=logger)
-    
+
     def process_source(self):
         """
         do nothing, just log the information
@@ -438,9 +426,7 @@
       }
     }
     """
-    def __init__(self, json_dict, json_txt, source_id, session, consumer_token, access_token=None, token_filename=None, user_query_twitter=False, logger=None):
-        TwitterProcessor.__init__(self, json_dict, json_txt, source_id, session, consumer_token, access_token=access_token, token_filename=token_filename, user_query_twitter=user_query_twitter, logger=logger)
-    
+
     def process_source(self):
         """
         do nothing, just log the information
@@ -461,8 +447,6 @@
       }
     }
     """
-    def __init__(self, json_dict, json_txt, source_id, session, consumer_token, access_token=None, token_filename=None, user_query_twitter=False, logger=None):
-        TwitterProcessor.__init__(self, json_dict, json_txt, source_id, session, consumer_token, access_token=access_token, token_filename=token_filename, user_query_twitter=user_query_twitter, logger=logger)
 
     def process_source(self):
         """
@@ -482,7 +466,7 @@
     'status_withheld': TwitterProcessorStatusWithheld,
     'user_withheld': TwitterProcessorUserWithheld,
     'disconnect': TwitterProcessorDisconnect,
-    'warning': TwitterProcessorStallWarning 
+    'warning': TwitterProcessorStallWarning
 }
 
 def get_processor(tweet_dict):
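
The new extended-tweet handling in __process_twitter leans on the Python 3.5+ dict-unpacking merge, which is part of why this file needed the port. A self-contained illustration on a made-up payload:

    # Hypothetical truncated stream payload
    tweet = {
        "id": 1,
        "truncated": True,
        "text": "short preview...",
        "extended_tweet": {
            "full_text": "the complete, untruncated text",
            "entities": {"hashtags": [{"text": "pt", "indices": [0, 3]}]},
        },
        "user": {"id": 42},
    }

    if tweet.get("truncated") and tweet.get("extended_tweet"):
        # keys from extended_tweet (entities, full_text, ...) win over the truncated ones
        ts_copy = {**tweet, **tweet["extended_tweet"], "text": tweet["extended_tweet"]["full_text"]}
    else:
        ts_copy = {**tweet}

    assert ts_copy["text"] == "the complete, untruncated text"
    assert ts_copy["entities"]["hashtags"][0]["text"] == "pt"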
--- a/script/lib/iri_tweet/iri_tweet/stream.py	Wed Jan 02 17:49:19 2019 +0100
+++ b/script/lib/iri_tweet/iri_tweet/stream.py	Thu Jan 10 18:36:36 2019 +0100
@@ -7,62 +7,15 @@
 Module directly inspired by tweetstream
 
 '''
+import json
+import select
 import time
+
 import requests
 from requests.utils import stream_decode_response_unicode
-import anyjson
-import select
-
-from . import  USER_AGENT, ConnectionError, AuthenticationError
-
 
-def iter_content_non_blocking(req, max_chunk_size=4096, decode_unicode=False, timeout=-1):
-    
-    if req._content_consumed:
-        raise RuntimeError(
-            'The content for this response was already consumed'
-        )
-    
-    req.raw._fp.fp._sock.setblocking(False)
-    
-    def generate():
-        chunk_size = 1        
-        while True:
-            if timeout < 0:
-                rlist,_,_ = select.select([req.raw._fp.fp._sock], [], [])
-            else:
-                rlist,_,_ = select.select([req.raw._fp.fp._sock], [], [], timeout)
-                
-            if not rlist:                 
-                continue
-            
-            try:
-                chunk = req.raw.read(chunk_size, decode_content=True)            
-                if not chunk:
-                    break
-                if len(chunk) >= chunk_size and chunk_size < max_chunk_size:
-                    chunk_size = min(chunk_size*2, max_chunk_size)
-                elif len(chunk) < chunk_size/2 and chunk_size < max_chunk_size:
-                    chunk_size = max(chunk_size/2,1)
-                yield chunk
-            except requests.exceptions.SSLError as e:
-                if e.errno == 2:
-                    # Apparently this means there was nothing in the socket buf
-                    pass
-                else:
-                    raise                
-            
-        req._content_consumed = True
+from . import USER_AGENT, AuthenticationError, ConnectionError
 
-    gen = generate()
-
-    if decode_unicode:
-        gen = stream_decode_response_unicode(gen, req)
-
-    return gen
-
-    
-    
 
 class BaseStream(object):
 
@@ -121,7 +74,7 @@
     """
 
     def __init__(self, auth,
-                  raw=False, timeout=-1, url=None, compressed=False, chunk_size=4096, logger=None):
+                  raw=False, timeout=-1, url=None, compressed=False, logger=None):
         self._conn = None
         self._rate_ts = None
         self._rate_cnt = 0
@@ -136,14 +89,13 @@
         self.count = 0
         self.rate = 0
         self.user_agent = USER_AGENT
-        self.chunk_size = chunk_size
         if url: self.url = url
-        
+
         self.muststop = False
         self._logger = logger
-        
+
         self._iter = self.__iter__()
-         
+
 
     def __enter__(self):
         return self
@@ -154,24 +106,24 @@
 
     def _init_conn(self):
         """Open the connection to the twitter server"""
-        
+
         if self._logger : self._logger.debug("BaseStream Open the connection to the twitter server")
-        
+
         headers = {'User-Agent': self.user_agent}
-        
+
         if self._compressed:
             headers['Accept-Encoding'] = "deflate, gzip"
 
         postdata = self._get_post_data() or {}
         postdata['stall_warnings'] = 'true'
-        
+
         if self._logger : self._logger.debug("BaseStream init connection url " + repr(self.url))
         if self._logger : self._logger.debug("BaseStream init connection headers " + repr(headers))
         if self._logger : self._logger.debug("BaseStream init connection data " + repr(postdata))
-        
+
         self._resp = requests.post(self.url, auth=self._auth, headers=headers, data=postdata, stream=True)
         if self._logger : self._logger.debug("BaseStream init connection " + repr(self._resp))
-        
+
         self._resp.raise_for_status()
         self.connected = True
 
@@ -189,110 +141,59 @@
 
     def testmuststop(self):
         if callable(self.muststop):
-            return self.muststop()
+            return self.muststop() # pylint: disable=not-callable
         else:
             return self.muststop
-    
+
     def _update_rate(self):
         rate_time = time.time() - self._rate_ts
         if not self._rate_ts or rate_time > self.rate_period:
             self.rate = self._rate_cnt / rate_time
             self._rate_cnt = 0
             self._rate_ts = time.time()
-            
-    def _iter_object(self):
-   
-#        for line in self._resp.iter_lines():
-#            yield line     
-#         pending = None
-# 
-#         for chunk in self._resp.iter_content(chunk_size=self.chunk_size, decode_unicode=None):
-# 
-#             if pending is not None:
-#                 chunk = pending + chunk 
-#             lines = chunk.splitlines()
-# 
-#             if lines and lines[-1] and chunk and lines[-1][-1] == chunk[-1]:
-#                 pending = lines.pop()
-#             else:
-#                 pending = None
-# 
-#             for line in lines:
-#                 yield line
-# 
-#         if pending is not None:
-#             yield pending
-            
-        pending = None
-        has_stopped = False
-         
-        if self._logger : self._logger.debug("BaseStream _iter_object")
- 
-        for chunk in self._resp.iter_content(
-            chunk_size=self.chunk_size,
-            decode_unicode=True):
- 
-            if self._logger : self._logger.debug("BaseStream _iter_object loop")
-            if self.testmuststop():
-                has_stopped = True
-                break
- 
-            if pending is not None:
-                chunk = pending + chunk
-            lines = chunk.split('\r')
- 
-            if lines and lines[-1] and chunk and lines[-1][-1] == chunk[-1]:
-                pending = lines.pop()
-            else:
-                pending = None
- 
-            for line in lines:
-                yield line.strip('\n')
- 
-            if self.testmuststop():
-                has_stopped = True
-                break
- 
-        if pending is not None:
-            yield pending
-        if has_stopped:
-            raise StopIteration()
-            
+
     def __iter__(self):
-        
+
         if self._logger : self._logger.debug("BaseStream __iter__")
         if not self.connected:
             if self._logger : self._logger.debug("BaseStream __iter__ not connected, connecting")
             self._init_conn()
 
         if self._logger : self._logger.debug("BaseStream __iter__ connected")
-        
-        for line in self._iter_object():
+        has_stopped = False
 
-            if self._logger : self._logger.debug("BaseStream __iter__ line %s " % repr(line))
-            
+        for line in self._resp.iter_lines():
+
             if not line:
                 continue
 
+            if self.testmuststop():
+                has_stopped = True
+                break
+
+
+            if self._logger : self._logger.debug("BaseStream __iter__ line %s " % repr(line))
+
+            text_in_tweet = False
             if (self.raw_mode):
                 tweet = line
+                text_in_tweet = b'text' in tweet
             else:
-                line = line.decode("utf8")
                 try:
-                    tweet = anyjson.deserialize(line)
+                    tweet = json.loads(line)
                 except ValueError:
                     self.close()
                     raise ConnectionError("Got invalid data from twitter", details=line)
-            if 'text' in tweet:
+                text_in_tweet = 'text' in tweet
+            if text_in_tweet:
                 self.count += 1
                 self._rate_cnt += 1
             self._update_rate()
             yield tweet
 
-
-    def next(self):
-        """Return the next available tweet. This call is blocking!"""
-        return self._iter.next()
+        if has_stopped:
+            return  # PEP 479: raising StopIteration inside a generator is an error in Python 3.7+
 
 
     def close(self):
@@ -306,12 +207,12 @@
     url = "https://stream.twitter.com/1.1/statuses/filter.json"
 
     def __init__(self, auth, follow=None, locations=None,
-                 track=None, url=None, raw=False, timeout=None, compressed=False, chunk_size=requests.models.ITER_CHUNK_SIZE, logger=None):
+                 track=None, url=None, raw=False, timeout=None, compressed=False, logger=None):
         self._follow = follow
         self._locations = locations
         self._track = track
         # remove follow, locations, track
-        BaseStream.__init__(self, auth, url=url, raw=raw, timeout=timeout, compressed=compressed, chunk_size=chunk_size, logger=logger)
+        BaseStream.__init__(self, auth, url=url, raw=raw, timeout=timeout, compressed=compressed, logger=logger)
 
     def _get_post_data(self):
         postdata = {}
@@ -322,7 +223,7 @@
 
 
 class SafeStreamWrapper(object):
-    
+
     def __init__(self, base_stream, logger=None, error_cb=None, max_reconnects=-1, initial_tcp_wait=250, initial_http_wait=5000, max_wait=240000):
         self._stream = base_stream
         self._logger = logger
@@ -342,12 +243,12 @@
             self._error_cb(e)
         if self._logger: self._logger.info("stream sleeping for %d ms " % self._retry_wait)
         time.sleep(float(self._retry_wait)/1000.0)
-        
-        
+
+
     def __process_tcp_error(self,e):
         if self._logger: self._logger.debug("connection error type :" + repr(type(e)))
         if self._logger: self._logger.debug("connection error :" + repr(e))
-        
+
         self._reconnects += 1
         if self._max_reconnects >= 0 and self._reconnects > self._max_reconnects:
             raise ConnectionError("Too many retries")
@@ -355,10 +256,10 @@
             self._retry_wait += self._initial_tcp_wait
             if self._retry_wait > self._max_wait:
                 self._retry_wait = self._max_wait
-        
+
         self.__post_process_error(e)
 
-        
+
     def __process_http_error(self,e):
         if self._logger: self._logger.debug("http error type %s" % (repr(type(e))))
         if self._logger: self._logger.debug("http error on %s : %s" % (e.response.url,e.message))
@@ -367,9 +268,9 @@
             self._retry_wait = 2*self._retry_wait if self._retry_wait > 0 else self._initial_http_wait
             if self._retry_wait > self._max_wait:
                 self._retry_wait = self._max_wait
-        
+
         self.__post_process_error(e)
-        
+
     def __iter__(self):
         while not self._stream.testmuststop():
             self._retry_nb += 1
@@ -385,15 +286,11 @@
                     yield tweet
             except requests.exceptions.HTTPError as e:
                 if e.response.status_code == 401:
-                    if self._logger: self._logger.debug("SafeStreamWrapper Connection Error http error on %s : %s" % (e.response.url,e.message))
-                    raise AuthenticationError("Error connecting to %s : %s : %s - %s" % (e.response.url,e.message, repr(e.response.headers),repr(e.response.text)))
+                    if self._logger: self._logger.debug("SafeStreamWrapper Connection Error http error on %s : %s" % (e.response.url,e.strerror))
+                    raise AuthenticationError("Error connecting to %s : %s : %s - %s" % (e.response.url,e.strerror, repr(e.response.headers),repr(e.response.text)))
                 if e.response.status_code > 200:
                     self.__process_http_error(e)
                 else:
                     self.__process_tcp_error(e)
             except (ConnectionError, requests.exceptions.ConnectionError, requests.exceptions.Timeout, requests.exceptions.RequestException) as e:
                 self.__process_tcp_error(e)
-
-        
-    
-    
\ No newline at end of file
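
With the hand-rolled non-blocking reader and _iter_object gone, line splitting is delegated to requests itself. A minimal sketch of the same consumption pattern, against a hypothetical line-delimited JSON endpoint:

    import json
    import requests

    # Hypothetical endpoint emitting one JSON object per line,
    # as the Twitter streaming API did.
    resp = requests.get("https://example.com/stream", stream=True)
    resp.raise_for_status()
    for line in resp.iter_lines():  # yields bytes; keep-alive newlines arrive empty
        if not line:
            continue
        obj = json.loads(line)      # json.loads accepts bytes on Python 3.6+
        print(obj.get("text", ""))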
--- a/script/lib/iri_tweet/iri_tweet/utils.py	Wed Jan 02 17:49:19 2019 +0100
+++ b/script/lib/iri_tweet/iri_tweet/utils.py	Thu Jan 10 18:36:36 2019 +0100
@@ -57,6 +57,46 @@
     get_logger().debug("get_oauth_token : done got %s" % repr(res))
     return res
 
+
+def get_oauth2_token(consumer_key, consumer_secret, token_file_path=None, check_access_token=True, application_name=APPLICATION_NAME):
+
+    global CACHE_ACCESS_TOKEN
+
+    if 'ACCESS_TOKEN_KEY' in globals() and 'ACCESS_TOKEN_SECRET' in globals() and ACCESS_TOKEN_KEY and ACCESS_TOKEN_SECRET:
+        return ACCESS_TOKEN_KEY, ACCESS_TOKEN_SECRET
+
+    res = CACHE_ACCESS_TOKEN.get(application_name, None)
+
+    if res is None and token_file_path and os.path.exists(token_file_path):
+        get_logger().debug("get_oauth2_token : reading token from file %s" % token_file_path) #@UndefinedVariable
+        res = twitter.oauth2.read_bearer_token_file(token_file_path)
+
+    if res is not None and check_access_token:
+        get_logger().debug("get_oauth2_token : Check oauth tokens") #@UndefinedVariable
+        t = twitter.Twitter(auth=twitter.OAuth2(consumer_key, consumer_secret, res))
+        status = None
+        try:
+            status = t.application.rate_limit_status()
+        except Exception as e:
+            get_logger().debug("get_oauth2_token : error getting rate limit status %s " % repr(e))
+            get_logger().debug("get_oauth2_token : error getting rate limit status %s " % str(e))
+            status = None
+        get_logger().debug("get_oauth2_token : Check oauth tokens : status %s" % repr(status)) #@UndefinedVariable
+        if status is None or status.get("resources",{}).get("account",{}).get('/account/verify_credentials',{}).get('remaining',0) == 0:
+            get_logger().debug("get_oauth2_token : Problem with status %s" % repr(status))
+            res = None
+
+    if res is None:
+        get_logger().debug("get_oauth2_token : doing the oauth2 dance")
+        res = twitter.oauth2_dance(consumer_key, consumer_secret, token_file_path)
+
+    CACHE_ACCESS_TOKEN[application_name] = res
+
+    get_logger().debug("get_oauth2_token : done got %s" % repr(res))
+    return res
+
+
 def parse_date(date_str):
     ts = email.utils.parsedate_tz(date_str) #@UndefinedVariable
     return datetime.datetime(*ts[0:7])
@@ -116,12 +156,10 @@
         
     def persists(self, session):
         new_args = [arg() if callable(arg) else arg for arg in self.args] if self.args is not None else []
-        new_kwargs = dict([(k,v()) if callable(v) else (k,v) for k,v in self.kwargs.iteritems()]) if self.kwargs is not None else {}
+        new_kwargs = dict([(k,v()) if callable(v) else (k,v) for k,v in self.kwargs.items()]) if self.kwargs is not None else {}
         
-        if self.instance is None:
-            self.instance = self.klass(*new_args, **new_kwargs)
-        else:
-            self.instance = self.klass(*new_args, **new_kwargs)
+        self.instance = self.klass(*new_args, **new_kwargs)
+        if self.instance is not None:
             self.instance = session.merge(self.instance)
 
         session.add(self.instance)
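
The new get_oauth2_token helper is meant to be paired with twitter.OAuth2 for app-only (bearer token) access. A sketch of the intended call pattern, with placeholder credentials:

    import twitter
    from iri_tweet.utils import get_oauth2_token

    # Placeholder credentials; the bearer token is cached in .oauth2_token between runs.
    bearer_token = get_oauth2_token("my-consumer-key", "my-consumer-secret",
                                    token_file_path=".oauth2_token")
    t = twitter.Twitter(auth=twitter.OAuth2("my-consumer-key", "my-consumer-secret", bearer_token))
    print(t.application.rate_limit_status()["resources"])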
--- a/script/stream/recorder_stream.py	Wed Jan 02 17:49:19 2019 +0100
+++ b/script/stream/recorder_stream.py	Thu Jan 10 18:36:36 2019 +0100
@@ -1,33 +1,35 @@
-from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
-from iri_tweet import models, utils
-from iri_tweet.models import TweetSource, TweetLog, ProcessEvent
-from iri_tweet.processor import get_processor
-from multiprocessing import Queue as mQueue, Process, Event
-from sqlalchemy.exc import OperationalError
-from sqlalchemy.orm import scoped_session
-import Queue
-import StringIO
-import anyjson
 import argparse
 import datetime
 import inspect
-import iri_tweet.stream
+import json
 import logging
 import os
+import queue
 import re
-import requests_oauthlib
 import shutil
 import signal
 import socket
-import sqlalchemy.schema
 import sys
-import thread
 import threading
 import time
 import traceback
-import urllib2
-socket._fileobject.default_bufsize = 0
+import urllib.request
+from http.server import BaseHTTPRequestHandler, HTTPServer
+from io import StringIO
+from multiprocessing import Event, Process
+from multiprocessing import Queue as mQueue
 
+import requests_oauthlib
+import sqlalchemy.schema
+import twitter
+from sqlalchemy.exc import OperationalError
+from sqlalchemy.orm import scoped_session
+
+import _thread
+import iri_tweet.stream
+from iri_tweet import models, utils
+from iri_tweet.models import ProcessEvent, TweetLog, TweetSource
+from iri_tweet.processor import get_processor
 
 
 # columns_tweet = [u'favorited', u'truncated', u'text', u'created_at', u'source', u'in_reply_to_status_id', u'in_reply_to_screen_name', u'in_reply_to_user_id', u'geo', u'id', u'user']
@@ -40,20 +42,17 @@
 
 class Requesthandler(BaseHTTPRequestHandler):
 
-    def __init__(self, request, client_address, server):
-        BaseHTTPRequestHandler.__init__(self, request, client_address, server)
-        
     def do_GET(self):
         self.send_response(200)
         self.end_headers()
-    
+
     def log_message(self, format, *args):        # @ReservedAssignment
         pass
 
 
 def set_logging(options):
     loggers = []
-    
+
     loggers.append(utils.set_logging(options, logging.getLogger('iri.tweet')))
     loggers.append(utils.set_logging(options, logging.getLogger('multiprocessing')))
     if options.debug >= 2:
@@ -68,17 +67,14 @@
     qlogger.propagate = 0
     return qlogger
 
-def get_auth(options, access_token):
-    consumer_key = options.consumer_key
-    consumer_secret = options.consumer_secret
-    auth = requests_oauthlib.OAuth1(client_key=consumer_key, client_secret=consumer_secret, resource_owner_key=access_token[0], resource_owner_secret=access_token[1], signature_type='auth_header')
-    return auth
+def get_auth(consumer_key, consumer_secret, token_key, token_secret):
+    return requests_oauthlib.OAuth1(client_key=consumer_key, client_secret=consumer_secret, resource_owner_key=token_key, resource_owner_secret=token_secret, signature_type='auth_header')
 
 
 def add_process_event(event_type, args, session_maker):
     session = session_maker()
     try:
-        evt = ProcessEvent(args=None if args is None else anyjson.serialize(args), type=event_type)
+        evt = ProcessEvent(args=None if args is None else json.dumps(args), type=event_type)
         session.add(evt)
         session.commit()
     finally:
@@ -87,15 +83,14 @@
 
 class BaseProcess(Process):
 
-    def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
+    def __init__(self, session_maker, queue, options, twitter_auth, stop_event, logger_queue, parent_pid):
         self.parent_pid = parent_pid
         self.session_maker = session_maker
         self.queue = queue
         self.options = options
         self.logger_queue = logger_queue
         self.stop_event = stop_event
-        self.consumer_token = (options.consumer_key, options.consumer_secret)
-        self.access_token = access_token
+        self.twitter_auth = twitter_auth
 
         super(BaseProcess, self).__init__()
 
@@ -112,10 +107,10 @@
         else:
             # *ring* Hi mom!
             return True
-    
+
 
     def __get_process_event_args(self):
-        return {'name':self.name, 'pid':self.pid, 'parent_pid':self.parent_pid, 'options':self.options.__dict__, 'access_token':self.access_token}
+        return {'name':self.name, 'pid':self.pid, 'parent_pid':self.parent_pid, 'options':self.options.__dict__}
 
     def run(self):
         try:
@@ -123,47 +118,45 @@
             self.do_run()
         finally:
             add_process_event("stop_worker", self.__get_process_event_args(), self.session_maker)
-        
+
     def do_run(self):
         raise NotImplementedError()
 
 
 
 class SourceProcess(BaseProcess):
-    
-    def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
+
+    def __init__(self, session_maker, queue, options, twitter_auth, stop_event, logger_queue, parent_pid):
         self.track = options.track
-        self.token_filename = options.token_filename
         self.timeout = options.timeout
         self.stream = None
-        super(SourceProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid)
-                    
+        super(SourceProcess, self).__init__(session_maker, queue, options, twitter_auth, stop_event, logger_queue, parent_pid)
+
     def __source_stream_iter(self):
-                
+
         self.logger.debug("SourceProcess : run ")
-        
-        self.logger.debug("SourceProcess : get_auth auth with option %s and token %s " %(self.options, self.access_token))
-        self.auth = get_auth(self.options, self.access_token) 
+
+        self.logger.debug("SourceProcess : get_auth auth with option %s and token %s " %(self.options, self.twitter_auth))
+        self.auth = get_auth(self.twitter_auth.consumer_key, self.twitter_auth.consumer_secret, self.twitter_auth.token, self.twitter_auth.token_secret)
         self.logger.debug("SourceProcess : auth set ")
-        
         track_list = self.track  # or raw_input('Keywords to track (comma seperated): ').strip()
         self.logger.debug("SourceProcess : track list " + track_list)
-        
+
         track_list = [k.strip() for k in track_list.split(',')]
 
-        self.logger.debug("SourceProcess : before connecting to stream %s, url : %s, auth : %s" % (repr(track_list), self.options.url, repr(self.auth)))                        
-        self.stream = iri_tweet.stream.FilterStream(self.auth, track=track_list, raw=True, url=self.options.url, timeout=self.timeout, chunk_size=512, logger=self.logger)
+        self.logger.debug("SourceProcess : before connecting to stream %s, url : %s, auth : %s" % (repr(track_list), self.options.url, repr(self.auth)))
+        self.stream = iri_tweet.stream.FilterStream(self.auth, track=track_list, raw=True, url=self.options.url, timeout=self.timeout, logger=self.logger)
         self.logger.debug("SourceProcess : after connecting to stream")
-        self.stream.muststop = lambda: self.stop_event.is_set()        
-        
+        self.stream.muststop = lambda: self.stop_event.is_set()
+
         stream_wrapper = iri_tweet.stream.SafeStreamWrapper(self.stream, logger=self.logger)
-        
+
         session = self.session_maker()
-        
+
         #import pydevd
         #pydevd.settrace(suspend=False)
 
-        
+
         try:
             for tweet in stream_wrapper:
                 if not self.parent_is_alive():
@@ -184,7 +177,7 @@
                         self.logger.debug("SourceProcess : Operational Error %s nb %d" % (repr(e), add_retries))
                         if add_retries == 10:
                             raise
-                     
+
                 source_id = source.id
                 self.logger.debug("SourceProcess : before queue + source id " + repr(source_id))
                 self.logger.info("SourceProcess : Tweet count: %d - current rate : %.2f - running : %s" % (self.stream.count, self.stream.rate, int(time.time() - self.stream.starttime)))
@@ -204,13 +197,13 @@
 
 
     def do_run(self):
-        
-        self.logger = set_logging_process(self.options, self.logger_queue)                
-        
+
+        self.logger = set_logging_process(self.options, self.logger_queue)
+
         source_stream_iter_thread = threading.Thread(target=self.__source_stream_iter , name="SourceStreamIterThread")
-        
+
         source_stream_iter_thread.start()
-        
+
         try:
             while not self.stop_event.is_set():
                 self.logger.debug("SourceProcess : In while after start")
@@ -230,11 +223,11 @@
         source_stream_iter_thread.join(30)
 
 
-def process_tweet(tweet, source_id, session, consumer_token, access_token, twitter_query_user, token_filename, logger):
+def process_tweet(tweet, source_id, session, twitter_auth, twitter_query_user, logger):
     try:
         if not tweet.strip():
             return
-        tweet_obj = anyjson.deserialize(tweet)
+        tweet_obj = json.loads(tweet)
         processor_klass = get_processor(tweet_obj)
         if not processor_klass:
             tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET'])
@@ -244,18 +237,16 @@
                                     json_txt=tweet,
                                     source_id=source_id,
                                     session=session,
-                                    consumer_token=consumer_token,
-                                    access_token=access_token,
-                                    token_filename=token_filename,
+                                    twitter_auth=twitter_auth,
                                     user_query_twitter=twitter_query_user,
                                     logger=logger)
-        logger.info(processor.log_info())                        
-        logger.debug(u"Process_tweet :" + repr(tweet))                
+        logger.info(processor.log_info())
+        logger.debug(u"Process_tweet :" + repr(tweet))
         processor.process()
-        
+
     except ValueError as e:
         message = u"Value Error %s processing tweet %s" % (repr(e), tweet)
-        output = StringIO.StringIO()
+        output = StringIO()
         try:
             traceback.print_exc(file=output)
             error_stack = output.getvalue()
@@ -263,11 +254,11 @@
             output.close()
         tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET'], error=message, error_stack=error_stack)
         session.add(tweet_log)
-        session.commit()        
+        session.commit()
     except Exception as e:
         message = u"Error %s processing tweet %s" % (repr(e), tweet)
         logger.exception(message)
-        output = StringIO.StringIO()
+        output = StringIO()
         try:
             traceback.print_exc(file=output)
             error_stack = output.getvalue()
@@ -278,17 +269,17 @@
         session.add(tweet_log)
         session.commit()
 
-    
-        
+
+
 class TweetProcess(BaseProcess):
-    
-    def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
-        super(TweetProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid)
+
+    def __init__(self, session_maker, queue, options, twitter_auth, stop_event, logger_queue, parent_pid):
+        super(TweetProcess, self).__init__(session_maker, queue, options, twitter_auth, stop_event, logger_queue, parent_pid)
         self.twitter_query_user = options.twitter_query_user
 
 
     def do_run(self):
-        
+
         self.logger = set_logging_process(self.options, self.logger_queue)
         session = self.session_maker()
         try:
@@ -299,7 +290,7 @@
                 except Exception as e:
                     self.logger.debug('Process tweet exception in loop : ' + repr(e))
                     continue
-                process_tweet(tweet_txt, source_id, session, self.consumer_token, self.access_token, self.twitter_query_user, self.options.token_filename, self.logger)
+                process_tweet(tweet_txt, source_id, session, self.twitter_auth, self.twitter_query_user, self.logger)
                 session.commit()
         except KeyboardInterrupt:
             self.stop_event.set()
@@ -313,36 +304,36 @@
     Session = scoped_session(Session)
     return Session, engine, metadata
 
-            
-def process_leftovers(session, consumer_token, access_token, twitter_query_user, token_filename, ask_process_leftovers, logger):
-    
+
+def process_leftovers(session, twitter_auth, twitter_query_user, ask_process_leftovers, logger):
+
     sources = session.query(TweetSource).outerjoin(TweetLog).filter(TweetLog.id == None)
     sources_count = sources.count()
-    
+
     if sources_count > 10 and ask_process_leftovers:
-        resp = raw_input("Do you want to process leftovers (Y/n) ? (%d tweet to process)" % sources_count)
+        resp = input("Do you want to process leftovers (Y/n) ? (%d tweets to process)" % sources_count)
         if resp and resp.strip().lower() == "n":
             return
     logger.info("Process leftovers, %d tweets to process" % (sources_count))
     for src in sources:
         tweet_txt = src.original_json
-        process_tweet(tweet_txt, src.id, session, consumer_token, access_token, twitter_query_user, token_filename, logger)
+        process_tweet(tweet_txt, src.id, session, twitter_auth, twitter_query_user, logger)
         session.commit()
-        
-    
+
+
 def process_log(logger_queues, stop_event):
     while not stop_event.is_set():
         for lqueue in logger_queues:
             try:
                 record = lqueue.get_nowait()
                 logging.getLogger(record.name).handle(record)
-            except Queue.Empty:
+            except queue.Empty:
                 continue
             except IOError:
                 continue
         time.sleep(0.1)
 
-        
+
 def get_options():
 
     usage = "usage: %(prog)s [options]"
@@ -385,59 +376,59 @@
 def do_run(options, session_maker):
 
     stop_args = {}
-    
-    consumer_token = (options.consumer_key, options.consumer_secret)
-    access_token = utils.get_oauth_token(consumer_key=consumer_token[0], consumer_secret=consumer_token[1], token_file_path=options.token_filename)
-    
-    
+
+
+    access_token_key, access_token_secret = utils.get_oauth_token(consumer_key=options.consumer_key, consumer_secret=options.consumer_secret, token_file_path=options.token_filename)
+    twitter_auth = twitter.OAuth(access_token_key, access_token_secret, options.consumer_key, options.consumer_secret)
+
     session = session_maker()
     try:
-        process_leftovers(session, consumer_token, access_token, options.twitter_query_user, options.token_filename, options.ask_process_leftovers, utils.get_logger())
+        process_leftovers(session, twitter_auth, options.twitter_query_user, options.ask_process_leftovers, utils.get_logger())
         session.commit()
     finally:
         session.rollback()
         session.close()
-    
+
     if options.process_nb <= 0:
         utils.get_logger().debug("Leftovers processed. Exiting.")
         return None
 
     queue = mQueue()
     stop_event = Event()
-    
+
     # workaround for bug on using urllib2 and multiprocessing
     httpd = HTTPServer(('127.0.0.1',0), Requesthandler)
-    thread.start_new_thread(httpd.handle_request, ())
-    
-    req = urllib2.Request('http://localhost:%d' % httpd.server_port)
+    _thread.start_new_thread(httpd.handle_request, ())
+
+    req = urllib.request.Request('http://localhost:%d' % httpd.server_port)
     conn = None
     try:
-        conn = urllib2.urlopen(req)
+        conn = urllib.request.urlopen(req)
     except:
         utils.get_logger().debug("could not open localhost")
         # donothing
     finally:
         if conn is not None:
             conn.close()
-    
+
     process_engines = []
     logger_queues = []
-    
+
     SessionProcess, engine_process, _ = get_sessionmaker(conn_str)
     process_engines.append(engine_process)
     lqueue = mQueue(50)
     logger_queues.append(lqueue)
     pid = os.getpid()
-    sprocess = SourceProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)    
-    
+    sprocess = SourceProcess(SessionProcess, queue, options, twitter_auth, stop_event, lqueue, pid)
+
     tweet_processes = []
-    
+
     for i in range(options.process_nb - 1):
         SessionProcess, engine_process, _ = get_sessionmaker(conn_str)
         process_engines.append(engine_process)
         lqueue = mQueue(50)
         logger_queues.append(lqueue)
-        cprocess = TweetProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)
+        cprocess = TweetProcess(SessionProcess, queue, options, twitter_auth, stop_event, lqueue, pid)
         tweet_processes.append(cprocess)
 
     log_thread = threading.Thread(target=process_log, name="loggingThread", args=(logger_queues, stop_event,))
@@ -452,18 +443,18 @@
     add_process_event("pid", {'main':os.getpid(), 'source':(sprocess.name, sprocess.pid), 'consumers':dict([(p.name, p.pid) for p in tweet_processes])}, session_maker)
 
     if options.duration >= 0:
-        end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration)    
+        end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration)
 
     def interupt_handler(signum, frame):
         utils.get_logger().debug("shutdown asked " + repr(signum) + "  " + repr(inspect.getframeinfo(frame, 9)))
         stop_args.update({'message': 'interupt', 'signum':signum, 'frameinfo':inspect.getframeinfo(frame, 9)})
         stop_event.set()
-        
+
     signal.signal(signal.SIGINT , interupt_handler)
     signal.signal(signal.SIGHUP , interupt_handler)
     signal.signal(signal.SIGALRM, interupt_handler)
     signal.signal(signal.SIGTERM, interupt_handler)
-    
+
 
     while not stop_event.is_set():
         if options.duration >= 0 and  datetime.datetime.utcnow() >= end_ts:
@@ -484,7 +475,7 @@
         utils.get_logger().debug("Pb joining Source Process - terminating")
     finally:
         sprocess.terminate()
-        
+
     for i, cprocess in enumerate(tweet_processes):
         utils.get_logger().debug("Joining consumer process Nb %d" % (i + 1))
         try:
@@ -493,7 +484,7 @@
             utils.get_logger().debug("Pb joining consumer process Nb %d - terminating" % (i + 1))
             cprocess.terminate()
 
-    
+
     utils.get_logger().debug("Close queues")
     try:
         queue.close()
@@ -502,13 +493,13 @@
     except Exception as e:
         utils.get_logger().error("error when closing queues %s", repr(e))
         # do nothing
-        
-    
+
+
     if options.process_nb > 1:
         utils.get_logger().debug("Processing leftovers")
         session = session_maker()
         try:
-            process_leftovers(session, consumer_token, access_token, options.twitter_query_user, options.token_filename, options.ask_process_leftovers, utils.get_logger())
+            process_leftovers(session, twitter_auth, options.twitter_query_user, options.ask_process_leftovers, utils.get_logger())
             session.commit()
         finally:
             session.rollback()
@@ -516,18 +507,18 @@
 
     for pengine in process_engines:
         pengine.dispose()
-    
+
     return stop_args
 
 
 def main(options):
-    
+
     global conn_str
-    
+
     conn_str = options.conn_str.strip()
-    if not re.match("^\w+://.+", conn_str):
+    if not re.match(r"^\w+://.+", conn_str):
         conn_str = 'sqlite:///' + options.conn_str
-        
+
     if conn_str.startswith("sqlite") and options.new:
         filepath = conn_str[conn_str.find(":///") + 4:]
         if os.path.exists(filepath):
@@ -543,7 +534,7 @@
                 shutil.move(filepath, new_path)
 
     Session, engine, metadata = get_sessionmaker(conn_str)
-    
+
     if options.new:
         check_metadata = sqlalchemy.schema.MetaData(bind=engine)
         check_metadata.reflect()
@@ -551,28 +542,28 @@
             message = "Database %s not empty exiting" % conn_str
             utils.get_logger().error(message)
             sys.exit(message)
-    
+
     metadata.create_all(engine)
     session = Session()
     try:
         models.add_model_version(session)
     finally:
         session.close()
-    
+
     stop_args = {}
     try:
         add_process_event(event_type="start", args={'options':options.__dict__, 'args': [], 'command_line': sys.argv}, session_maker=Session)
         stop_args = do_run(options, Session)
     except Exception as e:
-        utils.get_logger().exception("Error in main thread")        
-        outfile = StringIO.StringIO()
+        utils.get_logger().exception("Error in main thread")
+        outfile = StringIO()
         try:
             traceback.print_exc(file=outfile)
             stop_args = {'error': repr(e), 'message': repr(e), 'stacktrace':outfile.getvalue()}
         finally:
             outfile.close()
         raise
-    finally:    
+    finally:
         add_process_event(event_type="shutdown", args=stop_args, session_maker=Session)
 
     utils.get_logger().debug("Done. Exiting. " + repr(stop_args))
@@ -582,22 +573,21 @@
 if __name__ == '__main__':
 
     options = get_options()
-    
+
     loggers = set_logging(options)
-    
+
     utils.get_logger().debug("OPTIONS : " + repr(options))
-    
+
     if options.daemon:
         options.ask_process_leftovers = False
         import daemon
-        
+
         hdlr_preserve = []
         for logger in loggers:
             hdlr_preserve.extend([h.stream for h in logger.handlers])
-            
-        context = daemon.DaemonContext(working_directory=os.getcwd(), files_preserve=hdlr_preserve) 
+
+        context = daemon.DaemonContext(working_directory=os.getcwd(), files_preserve=hdlr_preserve)
         with context:
             main(options)
     else:
         main(options)
-    
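
The import churn at the top of recorder_stream.py is the standard Python 2 to 3 stdlib renaming; for reference, the mapping used here as a compact sketch:

    import queue                   # Python 2: Queue
    import _thread                 # Python 2: thread
    import urllib.request          # Python 2: urllib2
    from http.server import BaseHTTPRequestHandler, HTTPServer  # Python 2: BaseHTTPServer
    from io import StringIO        # Python 2: StringIO.StringIO

    q = queue.Queue()
    buf = StringIO()
    req = urllib.request.Request("http://localhost:8000")  # placeholder URL, not opened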
--- a/script/utils/merge_tweets.py	Wed Jan 02 17:49:19 2019 +0100
+++ b/script/utils/merge_tweets.py	Thu Jan 10 18:36:36 2019 +0100
@@ -5,6 +5,7 @@
 import logging
 import re
 import sys
+import twitter
 
 from iri_tweet.models import Tweet, TweetLog, TweetSource, setup_database
 from iri_tweet.processor import TwitterProcessorStatus
@@ -13,7 +14,7 @@
 logger = logging.getLogger(__name__)
 
 def get_option():
-    
+
     parser = argparse.ArgumentParser(description='Merge tweets databases')
 
     parser.add_argument("-l", "--log", dest="logfile",
@@ -31,23 +32,24 @@
     parser.add_argument("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token",
                       help="Token file name")
 
-    
+
     parser.add_argument("source", action="store", nargs=1, type=str, metavar="SOURCE")
     parser.add_argument("target", action="store", nargs=1, type=str, metavar="TARGET")
-    
+
 
     return parser.parse_args()
 
 if __name__ == "__main__":
-    
+
     #sys.stdout = codecs.getwriter(sys.stdout.encoding)(sys.stdout)
     writer = None
     options = get_option()
-    
-    access_token = None
+
+    twitter_auth = None
     if options.query_user:
-        access_token = get_oauth_token(options.consumer_key, options.consumer_secret, options.token_filename)
-    
+        access_token_key, access_token_secret = get_oauth_token(consumer_key=options.consumer_key, consumer_secret=options.consumer_secret, token_file_path=options.token_filename)
+        twitter_auth = twitter.OAuth(access_token_key, access_token_secret, options.consumer_key, options.consumer_secret)
+
     #open source
     src_conn_str = options.source[0].strip()
     if not re.match(r"^\w+://.+", src_conn_str):
@@ -58,51 +60,51 @@
 
 
     engine_src, metadata_src, Session_src = setup_database(src_conn_str, echo=((options.verbose-options.quiet)>0), create_all = False)
-    engine_tgt, metadata_tgt, Session_tgt = setup_database(tgt_conn_str, echo=((options.verbose-options.quiet)>0), create_all = False)        
+    engine_tgt, metadata_tgt, Session_tgt = setup_database(tgt_conn_str, echo=((options.verbose-options.quiet)>0), create_all = False)
 
     conn_src = conn_tgt = session_src = session_tgt = None
-    
+
     try:
         #conn_src = engine_src.connect()
         #conn_tgt = engine_tgt.connect()
         session_src = Session_src()
         session_tgt = Session_tgt()
-                
+
         count_tw = session_src.query(Tweet).count()
-        
+
         if count_tw == 0:
             print("No tweet to process : exit")
             sys.exit()
-            
+
         query_src = session_src.query(Tweet).join(TweetSource).yield_per(100)
         added = 0
-        
+
         for i,tweet in enumerate(query_src):
-            
+
             tweet_count = session_tgt.query(Tweet).filter(Tweet.id == tweet.id).count()
-            
+
             progress_text = u"Process: "
             if tweet_count == 0:
                 added += 1
                 progress_text = u"Adding : "
                 tweet_source = tweet.tweet_source.original_json
-                                
+
                 tweet_obj = json.loads(tweet_source)
                 if 'text' not in tweet_obj:
                     tweet_log = TweetLog(tweet_source_id=tweet.tweet_source.id, status=TweetLog.TWEET_STATUS['NOT_TWEET'])
                     session_tgt.add(tweet_log)
-                else:                
-                    tp = TwitterProcessorStatus(None, tweet_source, None, session_tgt, access_token, options.token_filename, user_query_twitter=options.query_user, logger=logger)
+                else:
+                    tp = TwitterProcessorStatus(None, tweet_source, None, session_tgt, twitter_auth=twitter_auth, user_query_twitter=options.query_user, logger=logger)
                     tp.process()
-                
+
                 session_tgt.flush()
-                
+
             ptext = progress_text + tweet.text
             writer = show_progress(i+1, count_tw, ptext.replace("\n",""), 70, writer)
-                            
+
         session_tgt.commit()
         print(u"%d new tweet added" % (added,))
-        
+
     finally:
         if session_tgt is not None:
             session_tgt.close()
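For reference, the authentication pattern merge_tweets.py now shares with the other scripts: fetch the user token pair once, wrap it in a single twitter.OAuth object, and pass that object everywhere a client or processor needs credentials. A minimal sketch with placeholder values (the real ones come from the -k/-s/-t options):

    import twitter
    from iri_tweet.utils import get_oauth_token

    # Placeholder credentials, for illustration only.
    access_token_key, access_token_secret = get_oauth_token(
        consumer_key="CONSUMER_KEY", consumer_secret="CONSUMER_SECRET",
        token_file_path=".oauth_token")
    twitter_auth = twitter.OAuth(access_token_key, access_token_secret,
                                 "CONSUMER_KEY", "CONSUMER_SECRET")
    # The same object can back an API client or a TwitterProcessorStatus.
    t = twitter.Twitter(domain="api.twitter.com", auth=twitter_auth, secure=True)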
--- a/script/utils/search_topsy.py	Wed Jan 02 17:49:19 2019 +0100
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,181 +0,0 @@
-import argparse
-import logging
-import math
-import re
-import time
-
-from blessings import Terminal
-import requests
-import twitter
-
-from iri_tweet import models, utils
-from iri_tweet.processor import TwitterProcessorStatus
-
-
-logger = logging.getLogger(__name__)
-
-APPLICATION_NAME = "Tweet recorder user"
-
-
-class TopsyResource(object):
-    
-    def __init__(self, query, **kwargs):
-
-        self.options = kwargs
-        self.options['q'] = query
-        self.url = kwargs.get("url", "http://otter.topsy.com/search.json")
-        self.page = 0
-        self.req = None
-        self.res = {}
-        
-    def __initialize(self):
-        
-        params = {}
-        params.update(self.options)
-        self.req = requests.get(self.url, params=params)
-        self.res = self.req.json()
-        
-    def __next_page(self):
-        page = self.res.get("response").get("page") + 1
-        params = {}
-        params.update(self.options)
-        params['page'] = page
-        self.req = requests.get(self.url, params=params)
-        self.res = self.req.json()
-
-    def __iter__(self):        
-        if not self.req:
-            self.__initialize()
-        while "response" in self.res and "list" in self.res.get("response") and self.res.get("response").get("list"):
-            for item in  self.res.get("response").get("list"):
-                yield item
-            self.__next_page()
-            
-    def total(self):
-        if not self.res:
-            return 0
-        else:
-            return self.res.get("response",{}).get("total",0)
-            
-
-
-def get_options():
-    
-    usage = "usage: %(prog)s [options] <connection_str_or_filepath>"
-    
-    parser = argparse.ArgumentParser(usage=usage)
-
-    parser.add_argument(dest="conn_str",
-                        help="write tweet to DATABASE. This is a connection string", metavar="CONNECTION_STR")
-    parser.add_argument("-Q", dest="query",
-                      help="query", metavar="QUERY")
-    parser.add_argument("-k", "--key", dest="consumer_key",
-                        help="Twitter consumer key", metavar="CONSUMER_KEY")
-    parser.add_argument("-s", "--secret", dest="consumer_secret",
-                        help="Twitter consumer secret", metavar="CONSUMER_SECRET")
-    parser.add_argument("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token",
-                      help="Token file name")
-    parser.add_argument("-T", dest="topsy_apikey", metavar="TOPSY_APIKEY", default=None,
-                      help="Topsy apikey")
-
-    utils.set_logging_options(parser)
-
-    return parser.parse_args()
-
-
-
-if __name__ == "__main__":
-
-    options = get_options()
-    
-    utils.set_logging(options);
-
-
-    acess_token_key, access_token_secret = utils.get_oauth_token(consumer_key=options.consumer_key, consumer_secret=options.consumer_secret, token_file_path=options.token_filename, application_name=APPLICATION_NAME)
-
-    t = twitter.Twitter(domain="api.twitter.com", auth=twitter.OAuth(acess_token_key, access_token_secret, options.consumer_key, options.consumer_secret), secure=True)
-    t.secure = True
-    
-    conn_str = options.conn_str.strip()
-    if not re.match("^\w+://.+", conn_str):
-        conn_str = 'sqlite:///' + conn_str
-    
-    engine, metadata, Session = models.setup_database(conn_str, echo=((options.verbose-options.quiet)>0), create_all=True)
-    session = None
-    
-    
-    topsy_parameters = {
-        'apikey': options.topsy_apikey,
-        'perpage': 100,
-        'window': 'a',
-        'type': 'tweet',
-        'hidden': True,
-    }
-    
-    term = Terminal()
-    
-    try:
-        session = Session()
-        
-        results = None        
-        page = 1
-        print options.query
-
-        tr = TopsyResource(options.query, **topsy_parameters)
-        
-        move_up = 0
-        
-        for i,item in enumerate(tr):
-            # get id
-            url = item.get("url")
-            tweet_id = url.split("/")[-1]
-            
-            if move_up > 0:
-                print((move_up+1)*term.move_up())
-                move_up = 0
-            
-            print ("%d/%d:%03d%% - %s - %r" % (i+1, tr.total(), int(float(i+1)/float(tr.total())*100.0), tweet_id, item.get("content") ) + term.clear_eol())            
-            move_up += 1
-            
-            count_tweet = session.query(models.Tweet).filter_by(id_str=tweet_id).count()
-            
-            if count_tweet:
-                continue
-            try:                                    
-                tweet = t.statuses.show(id=tweet_id, include_entities=True)
-            except twitter.api.TwitterHTTPError as e:
-                if e.e.code == 404 or e.e.code == 403:
-                    continue
-                else:
-                    raise
-            
-            processor = TwitterProcessorStatus(tweet, None, None, session, None, options.token_filename, logger)
-            processor.process()
-            session.flush()
-            session.commit()
-
-            print("rate limit remaining %s of %s" % (str(tweet.rate_limit_remaining), str(tweet.headers.getheader('X-Rate-Limit-Limit'))) + term.clear_eol())
-            move_up += 1
-            rate_limit_limit = int(tweet.headers.getheader('X-Rate-Limit-Limit'))
-            rate_limit_remaining = int(tweet.rate_limit_remaining)
-
-            if rate_limit_remaining < rate_limit_limit:
-                time_to_sleep = 0
-            else:
-                time_to_sleep = int(math.ceil((tweet.rate_limit_reset - time.mktime(time.gmtime())) / tweet.rate_limit_remaining)) 
-
-            for i in xrange(time_to_sleep):
-                if i:
-                    print(2*term.move_up())
-                else:
-                    move_up += 1
-                print(("Sleeping for %d seconds, %d remaining" % (time_to_sleep, time_to_sleep-i)) + term.clear_eol())
-                time.sleep(1)
-                
-    except twitter.api.TwitterHTTPError as e:
-        fmt = ("." + e.format) if e.format else ""
-        print "Twitter sent status %s for URL: %s%s using parameters: (%s)\ndetails: %s" % (repr(e.e.code), repr(e.uri), repr(fmt), repr(e.uriparts), repr(e.response_data))
-        
-    finally:
-        if session:
-            session.close()
--- a/script/utils/search_topsy_scrap.py	Wed Jan 02 17:49:19 2019 +0100
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,211 +0,0 @@
-import argparse
-import logging
-import math
-import re
-import time
-import urllib
-
-from blessings import Terminal
-import requests
-import twitter
-
-from iri_tweet import models, utils
-from iri_tweet.processor import TwitterProcessorStatus
-
-from selenium import webdriver
-from selenium.webdriver.common.desired_capabilities import DesiredCapabilities
-from selenium.webdriver.common.by import By
-from selenium.webdriver.support.ui import WebDriverWait
-from selenium.webdriver.support import expected_conditions as EC
-
-from lxml import html
-import json
-
-logger = logging.getLogger(__name__)
-
-APPLICATION_NAME = "Tweet recorder user"
-
-dcap = dict(DesiredCapabilities.PHANTOMJS)
-dcap["phantomjs.page.settings.userAgent"] = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.103 Safari/537.36"
-
-class TopsyResource(object):
-
-    def __init__(self, query, **kwargs):
-
-        self.options = {}
-        self.options['q'] = query
-        self.options.update(kwargs)
-        self.base_url = "http://topsy.com/s"
-        self.driver = webdriver.PhantomJS(desired_capabilities=dcap)
-        self.driver.set_window_size(1024, 768)
-        self.page = -1
-        self.tree = None
-
-
-    def __do_request(self, params):
-      url = "%s?%s" % (self.base_url, urllib.urlencode(params).replace('+','%20')) #calculate url with urllib
-      print('Requesting %s' % url)
-      self.driver.get(url)
-      try:
-          element = WebDriverWait(self.driver, 60).until(
-              EC.presence_of_element_located((By.CLASS_NAME, "result-tweet"))
-          )
-      except Exception as e:
-        print('Exception requesting %s : %s' % (url, e))
-        self.tree = None
-      else:
-        self.tree = html.fromstring(self.driver.page_source)
-
-    def __check_last(self):
-      if self.page < 0:
-          return False
-      if self.tree is None or len(self.tree.xpath("//*[@id=\"module-pager\"]/div/ul/li[@data-page=\"next\"and @class=\"disabled\"]")):
-          return True
-      else:
-          return False
-
-
-    def __next_page(self):
-        if self.__check_last():
-          return False
-        self.page += 1
-        params = {}
-        params.update(self.options)
-        if self.page:
-          params['offset'] = self.page*self.options.get('perpage',10)
-        self.__do_request(params)
-        return self.tree is not None
-
-    def __iter__(self):
-        result_xpath = "//*[@id=\"results\"]/div"
-        while self.__next_page():
-            for res_node in self.tree.xpath(result_xpath):
-                res_obj = {
-                  'user': "".join(res_node.xpath("./div/div/h5/a/text()")),
-                  'content': "".join(res_node.xpath("./div/div/div/text()")),
-                  'url': "".join(res_node.xpath("./div/div/ul/li[1]/small/a/@href"))
-                }
-                if res_obj['url']:
-                  yield res_obj
-
-
-def get_options():
-
-    usage = "usage: %(prog)s [options] <connection_str_or_filepath>"
-
-    parser = argparse.ArgumentParser(usage=usage)
-
-    parser.add_argument(dest="conn_str",
-                        help="write tweet to DATABASE. This is a connection string", metavar="CONNECTION_STR")
-    parser.add_argument("-Q", dest="query",
-                      help="query", metavar="QUERY")
-    parser.add_argument("-k", "--key", dest="consumer_key",
-                        help="Twitter consumer key", metavar="CONSUMER_KEY")
-    parser.add_argument("-s", "--secret", dest="consumer_secret",
-                        help="Twitter consumer secret", metavar="CONSUMER_SECRET")
-    parser.add_argument("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token",
-                      help="Token file name")
-    parser.add_argument("-T", dest="topsy_apikey", metavar="TOPSY_APIKEY", default=None,
-                      help="Topsy apikey")
-
-    utils.set_logging_options(parser)
-
-    return parser.parse_args()
-
-
-
-if __name__ == "__main__":
-
-    options = get_options()
-
-    utils.set_logging(options);
-
-
-    acess_token_key, access_token_secret = utils.get_oauth_token(consumer_key=options.consumer_key, consumer_secret=options.consumer_secret, token_file_path=options.token_filename, application_name=APPLICATION_NAME)
-
-    t = twitter.Twitter(domain="api.twitter.com", auth=twitter.OAuth(acess_token_key, access_token_secret, options.consumer_key, options.consumer_secret), secure=True)
-    t.secure = True
-
-    conn_str = options.conn_str.strip()
-    if not re.match("^\w+://.+", conn_str):
-        conn_str = 'sqlite:///' + conn_str
-
-    engine, metadata, Session = models.setup_database(conn_str, echo=((options.verbose-options.quiet)>0), create_all=True)
-    session = None
-
-
-    topsy_parameters = {
-        'perpage': 10,
-        'window': 'a',
-        'type': 'tweet',
-        'hidden': 1,
-        'sort': 'date'
-    }
-
-    term = Terminal()
-
-    try:
-        session = Session()
-
-        results = None
-        page = 1
-        print options.query
-
-        tr = TopsyResource(options.query, **topsy_parameters)
-
-        move_up = 0
-
-        for i,item in enumerate(tr):
-            # get id
-            url = item.get("url")
-            tweet_id = url.split("/")[-1]
-
-            if move_up > 0:
-                print((move_up+1)*term.move_up())
-                move_up = 0
-
-            print ("%d: %s - %r" % (i+1, tweet_id, item.get("content") ) + term.clear_eol())
-            move_up += 1
-
-            count_tweet = session.query(models.Tweet).filter_by(id_str=tweet_id).count()
-
-            if count_tweet:
-                continue
-            try:
-                tweet = t.statuses.show(id=tweet_id, include_entities=True)
-            except twitter.api.TwitterHTTPError as e:
-                if e.e.code == 404 or e.e.code == 403:
-                    continue
-                else:
-                    raise
-
-            processor = TwitterProcessorStatus(tweet, None, None, session, None, options.token_filename, logger)
-            processor.process()
-            session.flush()
-            session.commit()
-
-            print("rate limit remaining %s of %s" % (str(tweet.rate_limit_remaining), str(tweet.headers.getheader('X-Rate-Limit-Limit'))) + term.clear_eol())
-            move_up += 1
-            rate_limit_limit = int(tweet.headers.getheader('X-Rate-Limit-Limit'))
-            rate_limit_remaining = int(tweet.rate_limit_remaining)
-
-            if rate_limit_remaining < rate_limit_limit:
-                time_to_sleep = 0
-            else:
-                time_to_sleep = int(math.ceil((tweet.rate_limit_reset - time.mktime(time.gmtime())) / tweet.rate_limit_remaining))
-
-            for i in xrange(time_to_sleep):
-                if i:
-                    print(2*term.move_up())
-                else:
-                    move_up += 1
-                print(("Sleeping for %d seconds, %d remaining" % (time_to_sleep, time_to_sleep-i)) + term.clear_eol())
-                time.sleep(1)
-
-    except twitter.api.TwitterHTTPError as e:
-        fmt = ("." + e.format) if e.format else ""
-        print "Twitter sent status %s for URL: %s%s using parameters: (%s)\ndetails: %s" % (repr(e.e.code), repr(e.uri), repr(fmt), repr(e.uriparts), repr(e.response_data))
-
-    finally:
-        if session:
-            session.close()
--- a/script/utils/search_twitter_api.py	Wed Jan 02 17:49:19 2019 +0100
+++ b/script/utils/search_twitter_api.py	Thu Jan 10 18:36:36 2019 +0100
@@ -1,47 +1,91 @@
 import argparse
+import datetime
+import functools
+import json
 import logging
 import math
 import re
 import time
-import datetime
 import urllib
+from enum import Enum
 
-from blessings import Terminal
 import requests
 import twitter
+from blessings import Terminal
 
 from iri_tweet import models, utils
 from iri_tweet.processor import TwitterProcessorStatus
 
-import json
-
 logger = logging.getLogger(__name__)
 
 APPLICATION_NAME = "Tweet seach json"
 
 
+class SearchType(Enum):
+    standard = 'standard'
+    _30day = '30day'
+    full = 'full'
+
+    def __str__(self):
+        return self.value
+
+def pass_kwargs_as_json(f):
+    def kwargs_json_wrapper(*args, **kwargs):
+        normal_kwargs = { k:v for k,v in kwargs.items() if k[0] != "_" }
+        special_kwargs = { k:v for k,v in kwargs.items() if k[0] == "_" }
+        new_kwargs = { **special_kwargs, '_json': normal_kwargs }
+        return f(*args, **new_kwargs)
+    return kwargs_json_wrapper
+
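The decorator above folds ordinary keyword arguments into a single _json payload and lets underscore-prefixed options (such as _method) pass through untouched, which is evidently how the twitter library is handed a JSON POST body here. A quick illustration using the decorator as defined above:

    @pass_kwargs_as_json
    def fake_endpoint(**kwargs):
        # Stand-in for a twitter endpoint; just echoes what it receives.
        return kwargs

    print(fake_endpoint(query="#pt", maxResults=100, _method="POST"))
    # -> {'_method': 'POST', '_json': {'query': '#pt', 'maxResults': 100}}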
 # TODO: implement some more parameters
 # script to "scrap twitter results"
 # Shamelessly taken from https://github.com/Jefferson-Henrique/GetOldTweets-python
 # pyquery cssselect
 class TweetManager:
 
-    def __init__(self, query, twitter_con):
+    def __init__(self, twitter_con, query, search_type, api_env):
         self.query = query
-        self.max_id = 0
+        self.search_type = search_type
+        self.next = ""
         self.t = twitter_con
-        pass
+        self.api_env = api_env
+        self.twitter_api = self.get_twitter_api()
+        self.rate_limit_remaining = 0
+        self.rate_limit_limit = 0
+        self.rate_limit_reset = 0
+        self.i = 0
+
+    def get_twitter_api(self):
+        return {
+            SearchType.standard: lambda t: t.search.tweets,
+            SearchType._30day:   lambda t: pass_kwargs_as_json(functools.partial(getattr(getattr(t.tweets.search, '30day'), self.api_env), _method="POST")),
+            SearchType.full:     lambda t: pass_kwargs_as_json(functools.partial(getattr(t.tweets.search.fullarchive, self.api_env), _method="POST")),
+        }[self.search_type](self.t)
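In Python Twitter Tools, attribute access builds the endpoint path, so the three entries presumably resolve to GET search/tweets.json, POST tweets/search/30day/<env>.json and POST tweets/search/fullarchive/<env>.json; the nested getattr is needed because '30day' is not a legal Python attribute name. Hand-written equivalents, assuming the client t constructed in __main__ below and a hypothetical 'dev' environment:

    t.search.tweets(q="#pt", include_entities=True)        # SearchType.standard
    getattr(t.tweets.search, '30day').dev(                 # SearchType._30day
        _json={"query": "#pt", "maxResults": 100}, _method="POST")
    t.tweets.search.fullarchive.dev(                       # SearchType.full
        _json={"query": "#pt", "maxResults": 100}, _method="POST")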
 
     def __iter__(self):
         while True:
-            if self.max_id < 0:
+            if self.next is None:
                 break
-            json = self.get_json_response()
+            self.i += 1
+
+            # with open("json_dump_%s.json" % self.i, 'r') as fp:
+            #     jsondata = json.load(fp)
+            jsondata = self.get_json_response()
 
-            next_results = json['search_metadata'].get('next_results', "?")[1:]
-            self.max_id = int(urllib.parse.parse_qs(next_results).get('max_id', [-1])[0])
+            self.rate_limit_remaining = jsondata.rate_limit_remaining
+            self.rate_limit_limit = jsondata.rate_limit_limit
+            self.rate_limit_reset = jsondata.rate_limit_reset
+
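+            # Keep a raw dump of each page; the commented block above can replay it.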
+            with open("json_dump_%s.json" % self.i, 'w') as fp:
+                json.dump(jsondata, fp)
 
-            tweet_list = json['statuses']
+            if self.search_type == SearchType.standard:
+                next_results = jsondata['search_metadata'].get('next_results', "?")[1:]
+                self.next = urllib.parse.parse_qs(next_results).get('max_id', [None])[0]
+                tweet_list = jsondata['statuses']
+            else:
+                self.next = jsondata.get('next')
+                tweet_list = jsondata['results']
 
             if len(tweet_list) == 0:
                 break
@@ -50,8 +94,13 @@
                 yield tweet
 
     def get_json_response(self):
-        return self.t.search.tweets(q=self.query, include_entities=True, max_id=self.max_id)
-
+        if self.search_type == SearchType.standard:
+            return self.twitter_api(q=self.query, include_entities=True, max_id=int(self.next) if self.next else 0)
+        else:
+            kwargs = { "query": self.query, "maxResults": 100 }
+            if self.next:
+                kwargs["next"] = self.next
+            return self.twitter_api(**kwargs)
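Note the two pagination schemes: standard search encodes the next page as a max_id buried in search_metadata.next_results, while the premium endpoints return an opaque next cursor that is posted back verbatim. A sketch of both extractions on made-up response fragments:

    import urllib.parse

    # Standard search: strip the leading '?' and parse max_id from the query string.
    standard_meta = {"next_results": "?max_id=1082000000000000000&q=%23pt"}
    next_results = standard_meta.get("next_results", "?")[1:]
    max_id = urllib.parse.parse_qs(next_results).get("max_id", [None])[0]

    # Premium search: the cursor is a single opaque field.
    premium_page = {"results": [], "next": "eyJtYXhJZCI6MTA4Mn0="}
    cursor = premium_page.get("next")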
 
 def get_options():
 
@@ -62,31 +111,37 @@
     parser.add_argument(dest="conn_str",
                         help="write tweet to DATABASE. This is a connection string", metavar="CONNECTION_STR")
     parser.add_argument("-Q", dest="query",
-                      help="query", metavar="QUERY")
+                        help="query", metavar="QUERY")
     parser.add_argument("-k", "--key", dest="consumer_key",
                         help="Twitter consumer key", metavar="CONSUMER_KEY")
     parser.add_argument("-s", "--secret", dest="consumer_secret",
                         help="Twitter consumer secret", metavar="CONSUMER_SECRET")
     parser.add_argument("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token",
-                      help="Token file name")
+                        help="Token file name")
+    parser.add_argument("-a", dest="search_type", metavar="SEARCH_TYPE", default=SearchType.standard, choices=list(SearchType), type=SearchType,
+                        help="Twitter search type ('standard', '30days', 'full')")
+    parser.add_argument("-e", dest="api_env", metavar="API_ENV", default="dev",
+                        help="Twitter api dev environment")
+
 
     utils.set_logging_options(parser)
 
     return parser.parse_args()
 
 
-
 if __name__ == "__main__":
 
     options = get_options()
 
+    print("the search type is : %s" % options.search_type)
+
     utils.set_logging(options)
 
-
-    acess_token_key, access_token_secret = utils.get_oauth_token(consumer_key=options.consumer_key, consumer_secret=options.consumer_secret, token_file_path=options.token_filename, application_name=APPLICATION_NAME)
+    bearer_token = utils.get_oauth2_token(consumer_key=options.consumer_key, consumer_secret=options.consumer_secret, token_file_path=options.token_filename, application_name=APPLICATION_NAME)
+    twitter_auth = twitter.OAuth2(options.consumer_key, options.consumer_secret, bearer_token)
 
-    t = twitter.Twitter(domain="api.twitter.com", auth=twitter.OAuth(acess_token_key, access_token_secret, options.consumer_key, options.consumer_secret), secure=True)
-    t.secure = True    
+    t = twitter.Twitter(domain="api.twitter.com", auth=twitter_auth, secure=True)
+    t.secure = True
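get_oauth2_token comes from iri_tweet.utils; the application-only flow it presumably wraps is Twitter's standard client-credentials exchange, sketched here with requests:

    import requests

    def fetch_bearer_token(consumer_key, consumer_secret):
        # Exchange consumer credentials for an app-only bearer token.
        resp = requests.post("https://api.twitter.com/oauth2/token",
                             auth=(consumer_key, consumer_secret),
                             data={"grant_type": "client_credentials"})
        resp.raise_for_status()
        return resp.json()["access_token"]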
 
     conn_str = options.conn_str.strip()
     if not re.match(r"^\w+://.+", conn_str):
@@ -104,7 +159,7 @@
         results = None
         print(options.query)
 
-        tm = TweetManager(options.query, t)
+        tm = TweetManager(t, options.query, options.search_type, options.api_env)
 
         move_up = 0
 
@@ -127,7 +182,7 @@
             if count_tweet:
                 continue
 
-            processor = TwitterProcessorStatus(tweet, None, None, session, None, options.token_filename, logger)
+            processor = TwitterProcessorStatus(tweet, None, None, session, twitter_auth=twitter_auth, logger=logger)
             processor.process()
             session.flush()
             session.commit()
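Putting the new options together, a full-archive premium search against a 'dev' environment could be launched as follows (placeholder credentials and database path):

    python search_twitter_api.py -k CONSUMER_KEY -s CONSUMER_SECRET \
        -a full -e dev -Q "#polemictweet" tweets.db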
--- a/script/utils/search_twitter_json.py	Wed Jan 02 17:49:19 2019 +0100
+++ b/script/utils/search_twitter_json.py	Thu Jan 10 18:36:36 2019 +0100
@@ -130,8 +130,9 @@
 
 
     acess_token_key, access_token_secret = utils.get_oauth_token(consumer_key=options.consumer_key, consumer_secret=options.consumer_secret, token_file_path=options.token_filename, application_name=APPLICATION_NAME)
+    twitter_auth = twitter.OAuth(acess_token_key, access_token_secret, options.consumer_key, options.consumer_secret)
 
-    t = twitter.Twitter(domain="api.twitter.com", auth=twitter.OAuth(acess_token_key, access_token_secret, options.consumer_key, options.consumer_secret), secure=True)
+    t = twitter.Twitter(domain="api.twitter.com", auth=twitter_auth, secure=True)
     t.secure = True
 
     conn_str = options.conn_str.strip()
@@ -180,7 +181,7 @@
                 else:
                     raise
 
-            processor = TwitterProcessorStatus(tweet, None, None, session, None, options.token_filename, logger)
+            processor = TwitterProcessorStatus(tweet, None, None, session, twitter_auth=twitter_auth, logger=logger)
             processor.process()
             session.flush()
             session.commit()
--- a/script/virtualenv/script/res/requirement.txt	Wed Jan 02 17:49:19 2019 +0100
+++ b/script/virtualenv/script/res/requirement.txt	Thu Jan 10 18:36:36 2019 +0100
@@ -1,23 +1,26 @@
-anyjson==0.3.3
-blessings==1.6
-cssselect==0.9.1
-docutils==0.12
-httplib2==0.9.2
+astroid==2.1.0
+blessings==1.7
+certifi==2018.11.29
+chardet==3.0.4
+cssselect==1.0.3
+docutils==0.14
+idna==2.8
 iri-tweet===0.82.0final0
+isort==4.3.4
+lazy-object-proxy==1.3.1
 lockfile==0.12.2
-lxml==3.5.0
-oauth2==1.9.0.post1
-oauthlib==1.0.3
-psycopg2==2.6.1
-pyquery==1.2.11
-python-daemon==2.1.1
-python-dateutil==2.5.0
-pytz==2016.1
-requests==2.9.1
-requests-oauthlib==0.6.1
-selenium==2.53.1
-simplejson==3.8.2
-six==1.10.0
-SQLAlchemy==1.0.12
-twitter==1.17.1
-twitter-text-py==2.0.2
+lxml==4.2.5
+mccabe==0.6.1
+oauthlib==2.1.0
+pylint==2.2.2
+pyquery==1.4.0
+python-daemon==2.2.0
+python-dateutil==2.7.5
+requests==2.21.0
+requests-oauthlib==1.1.0
+six==1.12.0
+SQLAlchemy==1.2.15
+twitter==1.18.0
+twitter-text==3.0
+urllib3==1.24.1
+wrapt==1.10.11
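To rebuild the virtualenv against the refreshed pins, presumably:

    pip install -r script/virtualenv/script/res/requirement.txt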
Binary file web/images/polemictweet_square.png has changed