adapt polemictweet to python 3.7, new twitter model and update requirement
authorymh <ymh.work@gmail.com>
Tue, 02 Jul 2019 17:41:28 +0200
changeset 1507 1e7aa7dc444b
parent 1506 2b69678563e8
child 1508 92acbeb07057
adapt polemictweet to python 3.7, new twitter model and update requirement
.hgignore
script/.envrc
script/.vscode/settings.json
script/lib/iri_tweet/iri_tweet/__init__.py
script/lib/iri_tweet/iri_tweet/models.py
script/lib/iri_tweet/iri_tweet/utils.py
script/virtualenv/script/res/requirement.txt
--- a/.hgignore	Mon Jul 01 14:35:52 2019 +0200
+++ b/.hgignore	Tue Jul 02 17:41:28 2019 +0200
@@ -37,3 +37,4 @@
 ^sbin/sync/sync_live
 ^web/vendor
 ^web/devroot$
+^script/utils/.*\.json$
--- a/script/.envrc	Mon Jul 01 14:35:52 2019 +0200
+++ b/script/.envrc	Tue Jul 02 17:41:28 2019 +0200
@@ -1,1 +1,2 @@
-use pythonvenv 3.7.1+brew
+use pythonvenv 3.7.3+brew
+export PYTHONPATH="/Users/ymh/dev/projects/tweet_live/script/lib/iri_tweet"
--- a/script/.vscode/settings.json	Mon Jul 01 14:35:52 2019 +0200
+++ b/script/.vscode/settings.json	Tue Jul 02 17:41:28 2019 +0200
@@ -1,4 +1,4 @@
 {
-    "python.pythonPath": "/Users/ymh/dev/projects/tweet_live/script/.direnv/python-3.7.1/bin/python",
+    "python.pythonPath": ".direnv/python-3.7.3/bin/python",
     "python.analysis.diagnosticPublishDelay": 996
 }
\ No newline at end of file
--- a/script/lib/iri_tweet/iri_tweet/__init__.py	Mon Jul 01 14:35:52 2019 +0200
+++ b/script/lib/iri_tweet/iri_tweet/__init__.py	Tue Jul 02 17:41:28 2019 +0200
@@ -2,7 +2,7 @@
 
 VERSION = (0, 82, 0, "final", 0)
 
-VERSION_STR = unicode(".".join(map(lambda i:"%02d" % (i,), VERSION[:2])))
+VERSION_STR = ".".join(map(lambda i:"%02d" % (i,), VERSION[:2]))
 
 
 def get_version():
--- a/script/lib/iri_tweet/iri_tweet/models.py	Mon Jul 01 14:35:52 2019 +0200
+++ b/script/lib/iri_tweet/iri_tweet/models.py	Tue Jul 02 17:41:28 2019 +0200
@@ -12,7 +12,7 @@
 
 Base = declarative_base()
 
-APPLICATION_NAME = "IRI_TWITTER" 
+APPLICATION_NAME = "IRI_TWITTER"
 ACCESS_TOKEN_KEY = None
 ACCESS_TOKEN_SECRET = None
 
@@ -27,7 +27,7 @@
         return json.dumps(obj)
 
 class TweetMeta(type(Base)):
-            
+
     def __init__(cls, name, bases, ns): #@NoSelf
         def init(self, **kwargs):
             for key, value in kwargs.items():
@@ -36,7 +36,7 @@
             super(cls, self).__init__()
         setattr(cls, '__init__', init)
         super(TweetMeta, cls).__init__(name, bases, ns)
-    
+
 
 class ProcessEvent(Base):
     __metaclass__ = TweetMeta
@@ -45,7 +45,7 @@
     ts = Column(DateTime, default=datetime.datetime.utcnow, index=True)
     type = Column(Enum("start","pid","shutdown","error", "start_worker", "stop_worker", "model_version", "application_name", "application_version", name="process_event_type_enum"), nullable=False)
     args = Column(String)
-    
+
 class EntityType(Base):
     __metaclass__ = TweetMeta
     __tablename__ = "tweet_entity_type"
@@ -75,7 +75,7 @@
 
 
 class TweetLog(Base):
-        
+
     TWEET_STATUS = {
         'OK' : 1,
         'ERROR' : 2,
@@ -90,7 +90,7 @@
         'DELETE_PENDING': 4
     }
     __metaclass__ = TweetMeta
-    
+
     __tablename__ = 'tweet_tweet_log'
     id = Column(Integer, primary_key=True, autoincrement=True)
     ts = Column(DateTime, default=datetime.datetime.utcnow, index=True)
@@ -100,8 +100,8 @@
     status = Column(Integer)
     error = Column(String)
     error_stack = Column(String)
- 
-    
+
+
 class Tweet(Base):
     __metaclass__ = TweetMeta
     __tablename__ = 'tweet_tweet'
@@ -109,12 +109,12 @@
     id = Column(BigInteger, primary_key=True, autoincrement=False)
     id_str = Column(String)
     contributors = Column(String)
-    coordinates = Column(String) 
+    coordinates = Column(String)
     created_at = Column(DateTime, index=True)
     favorited = Column(Boolean)
     geo = Column(String)
     in_reply_to_screen_name = Column(String)
-    in_reply_to_status_id = Column(BigInteger) 
+    in_reply_to_status_id = Column(BigInteger)
     in_reply_to_status_id_str = Column(String)
     in_reply_to_user_id = Column(BigInteger)
     in_reply_to_user_id_str = Column(String)
@@ -130,7 +130,7 @@
     tweet_source = relationship("TweetSource", backref="tweet")
     entity_list = relationship(Entity, backref='tweet', cascade="all, delete-orphan")
     received_at = Column(DateTime, default=datetime.datetime.utcnow, index=True)
-        
+
 
 class UserMessage(Base):
     __metaclass__ = TweetMeta
@@ -145,54 +145,41 @@
 class Message(Base):
     __metaclass__ = TweetMeta
     __tablename__ = "tweet_message"
-    
+
     id = Column(Integer, primary_key=True)
     created_at = Column(DateTime, default=datetime.datetime.utcnow)
     text = Column(String)
     users = relationship(UserMessage, backref='message')
-        
+
 
 class User(Base):
     __metaclass__ = TweetMeta
     __tablename__ = "tweet_user"
-    
+
     id = Column(BigInteger, primary_key=True, autoincrement=False)
     id_str = Column(String)
-    contributors_enabled = Column(Boolean)
     created_at = Column(DateTime, index=True)
     description = Column(String)
     favourites_count = Column(Integer)
-    follow_request_sent = Column(Boolean)
     followers_count = Column(Integer)
-    following = Column(String)
     friends_count = Column(Integer)
-    geo_enabled = Column(Boolean)
-    is_translator = Column(Boolean)
-    lang = Column(String)
     listed_count = Column(Integer)
     location = Column(String)
     name = Column(String)
-    notifications = Column(String)
-    profile_background_color = Column(String)
-    profile_background_image_url = Column(String)
-    profile_background_tile = Column(Boolean)
-    profile_image_url = Column(String)
-    profile_image_url_https = Column(String)
-    profile_link_color = Column(String)
-    profile_sidebar_border_color = Column(String)
-    profile_sidebar_fill_color = Column(String)
-    profile_text_color = Column(String)
     default_profile_image = Column(String)
-    profile_use_background_image = Column(Boolean)
+    default_profile = Column(Boolean)
     protected = Column(Boolean)
     screen_name = Column(String, index=True)
-    show_all_inline_media = Column(Boolean)
     statuses_count = Column(Integer)
-    time_zone = Column(String)
     url = Column(String)
-    utc_offset = Column(Integer)
     verified = Column(Boolean)
-    
+    derived = Column(String) #JSON
+    profile_banner_url = Column(String)
+    profile_image_url_https = Column(String)
+    withheld_in_countries = Column(String) # ARRAY
+    withheld_scope = Column(String)
+
+
 
 class Hashtag(Base):
     __metaclass__ = TweetMeta
@@ -215,7 +202,7 @@
     id = Column(Integer, primary_key=True, autoincrement=True)
     label = Column(String, unique=True, index=True)
 
-    
+
 
 class Media(Base):
     __metaclass__ = TweetMeta
@@ -231,7 +218,7 @@
     type_id = Column(Integer, ForeignKey("tweet_media_type.id"))
     type = relationship(MediaType, primaryjoin=type_id == MediaType.id)
 
-    
+
 
 class EntityHashtag(Entity):
     __tablename__ = "tweet_entity_hashtag"
@@ -240,7 +227,7 @@
     hashtag_id = Column(Integer, ForeignKey("tweet_hashtag.id"))
     hashtag = relationship(Hashtag, primaryjoin=hashtag_id == Hashtag.id)
 
-    
+
 class EntityUrl(Entity):
     __tablename__ = "tweet_entity_url"
     __mapper_args__ = {'polymorphic_identity': 'entity_url'}
@@ -255,7 +242,7 @@
     user_id = Column(BigInteger, ForeignKey('tweet_user.id'))
     user = relationship(User, primaryjoin=(user_id == User.id))
 
-                
+
 class EntityMedia(Entity):
     __tablename__ = "tweet_entity_media"
     __mapper_args__ = {'polymorphic_identity': 'entity_media'}
@@ -268,31 +255,31 @@
     session.add(pe)
     if must_commit:
         session.commit()
-                
+
 def setup_database(*args, **kwargs):
-    
+
     session_argname = [ 'autoflush','binds', "class_", "_enable_transaction_accounting","expire_on_commit", "extension", "query_cls", "twophase", "weak_identity_map", "autocommit"]
-    
+
     kwargs_ce = dict((k, v) for k,v in kwargs.items() if (k not in session_argname and k != "create_all"))
 
     engine = create_engine(*args, **kwargs_ce)
-    
+
     if engine.name == "sqlite":
         @event.listens_for(Engine, "connect")
         def set_sqlite_pragma(dbapi_connection, connection_record): #pylint: W0612
             cursor = dbapi_connection.cursor()
             cursor.execute("PRAGMA foreign_keys=ON")
             cursor.close()
-    
-    metadata = Base.metadata        
-                
+
+    metadata = Base.metadata
+
     kwargs_sm = {'bind': engine}
-    
+
     kwargs_sm.update([(argname, kwargs[argname]) for argname in session_argname if argname in kwargs])
 
     Session = sessionmaker(**kwargs_sm)
     #set model version
-    
+
     if kwargs.get('create_all', True):
         metadata.create_all(engine)
         session = Session()
--- a/script/lib/iri_tweet/iri_tweet/utils.py	Mon Jul 01 14:35:52 2019 +0200
+++ b/script/lib/iri_tweet/iri_tweet/utils.py	Tue Jul 02 17:41:28 2019 +0200
@@ -5,13 +5,14 @@
 import logging
 import math
 import os.path
-import Queue
+import queue
 import socket
 import sys
 
 import twitter.oauth
 import twitter.oauth_dance
 from sqlalchemy.sql import or_, select
+from sqlalchemy.orm import class_mapper
 
 from .models import (ACCESS_TOKEN_KEY, ACCESS_TOKEN_SECRET, APPLICATION_NAME,
                      EntityHashtag, Hashtag, Tweet, User, adapt_date,
@@ -20,18 +21,18 @@
 CACHE_ACCESS_TOKEN = {}
 
 def get_oauth_token(consumer_key, consumer_secret, token_file_path=None, check_access_token=True, application_name=APPLICATION_NAME):
-    
+
     global CACHE_ACCESS_TOKEN
 
     if 'ACCESS_TOKEN_KEY' in globals() and 'ACCESS_TOKEN_SECRET' in globals() and ACCESS_TOKEN_KEY and ACCESS_TOKEN_SECRET:
         return ACCESS_TOKEN_KEY,ACCESS_TOKEN_SECRET
-    
+
     res = CACHE_ACCESS_TOKEN.get(application_name, None)
-    
+
     if res is None and token_file_path and os.path.exists(token_file_path):
         get_logger().debug("get_oauth_token : reading token from file %s" % token_file_path) #@UndefinedVariable
         res = twitter.oauth.read_token_file(token_file_path)
-    
+
     if res is not None and check_access_token:
         get_logger().debug("get_oauth_token : Check oauth tokens") #@UndefinedVariable
         t = twitter.Twitter(auth=twitter.OAuth(res[0], res[1], consumer_key, consumer_secret))
@@ -39,7 +40,7 @@
         try:
             status = t.application.rate_limit_status(resources="account")
         except Exception as e:
-            get_logger().debug("get_oauth_token : error getting rate limit status %s " % repr(e))            
+            get_logger().debug("get_oauth_token : error getting rate limit status %s " % repr(e))
             get_logger().debug("get_oauth_token : error getting rate limit status %s " % str(e))
             status = None
         get_logger().debug("get_oauth_token : Check oauth tokens : status %s" % repr(status)) #@UndefinedVariable
@@ -50,27 +51,27 @@
     if res is None:
         get_logger().debug("get_oauth_token : doing the oauth dance")
         res = twitter.oauth_dance(application_name, consumer_key, consumer_secret, token_file_path)
-        
-    
+
+
     CACHE_ACCESS_TOKEN[application_name] = res
-    
+
     get_logger().debug("get_oauth_token : done got %s" % repr(res))
     return res
 
 
 def get_oauth2_token(consumer_key, consumer_secret, token_file_path=None, check_access_token=True, application_name=APPLICATION_NAME):
-    
+
     global CACHE_ACCESS_TOKEN
 
     if 'ACCESS_TOKEN_KEY' in globals() and 'ACCESS_TOKEN_SECRET' in globals() and ACCESS_TOKEN_KEY and ACCESS_TOKEN_SECRET:
         return ACCESS_TOKEN_KEY,ACCESS_TOKEN_SECRET
-    
+
     res = CACHE_ACCESS_TOKEN.get(application_name, None)
-    
+
     if res is None and token_file_path and os.path.exists(token_file_path):
         get_logger().debug("get_oauth2_token : reading token from file %s" % token_file_path) #@UndefinedVariable
         res = twitter.oauth2.read_bearer_token_file(token_file_path)
-    
+
     if res is not None and check_access_token:
         get_logger().debug("get_oauth2_token : Check oauth tokens") #@UndefinedVariable
         t = twitter.Twitter(auth=twitter.OAuth2(consumer_key, consumer_secret, res))
@@ -78,7 +79,7 @@
         try:
             status = t.application.rate_limit_status()
         except Exception as e:
-            get_logger().debug("get_oauth2_token : error getting rate limit status %s " % repr(e))            
+            get_logger().debug("get_oauth2_token : error getting rate limit status %s " % repr(e))
             get_logger().debug("get_oauth2_token : error getting rate limit status %s " % str(e))
             status = None
         get_logger().debug("get_oauth2_token : Check oauth tokens : status %s" % repr(status)) #@UndefinedVariable
@@ -89,10 +90,10 @@
     if res is None:
         get_logger().debug("get_oauth2_token : doing the oauth dance")
         res = twitter.oauth2_dance(consumer_key, consumer_secret, token_file_path)
-        
-    
+
+
     CACHE_ACCESS_TOKEN[application_name] = res
-    
+
     get_logger().debug("get_oauth_token : done got %s" % repr(res))
     return res
 
@@ -102,7 +103,7 @@
     return datetime.datetime(*ts[0:7])
 
 def clean_keys(dict_val):
-    return dict([(str(key),value) for key,value in dict_val.iteritems()])
+    return dict([(str(key),value) for key,value in dict_val.items()])
 
 fields_adapter = {
     'stream': {
@@ -115,22 +116,29 @@
         },
         "user": {
             "created_at"  : adapt_date,
+            "derived" : adapt_json,
+            "withheld_in_countries" : adapt_json
         },
 
     },
-                  
+
     'entities' : {
         "medias": {
             "sizes"  : adapt_json,
-        },                  
+        },
     },
     'rest': {
+        "user": {
+            "created_at"  : adapt_date,
+            "derived" : adapt_json,
+            "withheld_in_countries" : adapt_json
+        },
         "tweet" : {
             "place"         : adapt_json,
             "geo"           : adapt_json,
             "created_at"    : adapt_date,
 #            "original_json" : adapt_json,
-        }, 
+        },
     },
 }
 
@@ -143,41 +151,44 @@
             return adapter_mapping[field](value)
         else:
             return value
-    return dict([(str(k),adapt_one_field(k,v)) for k,v in fields_dict.iteritems()])    
+    return dict([(str(k),adapt_one_field(k,v)) for k,v in fields_dict.items()])
 
 
 class ObjectBufferProxy(object):
     def __init__(self, klass, args, kwargs, must_flush, instance=None):
         self.klass= klass
+        self.mapper = class_mapper(klass)
         self.args = args
         self.kwargs = kwargs
         self.must_flush = must_flush
         self.instance = instance
-        
+
     def persists(self, session):
         new_args = [arg() if callable(arg) else arg for arg in self.args] if self.args is not None else []
         new_kwargs = dict([(k,v()) if callable(v) else (k,v) for k,v in self.kwargs.items()]) if self.kwargs is not None else {}
-        
-        self.instance = self.klass(*new_args, **new_kwargs)
+
+        self.instance = self.klass(*new_args, **{
+            k: v for k, v in new_kwargs.items() if k in self.mapper.attrs.keys()
+        })
         if self.instance is not None:
             self.instance = session.merge(self.instance)
 
         session.add(self.instance)
         if self.must_flush:
             session.flush()
-            
+
     def __getattr__(self, name):
         return lambda : getattr(self.instance, name) if self.instance else None
-        
-        
-    
+
+
+
 
 class ObjectsBuffer(object):
 
     def __init__(self):
         self.__bufferlist = []
         self.__bufferdict = {}
-    
+
     def __add_proxy_object(self, proxy):
         proxy_list =  self.__bufferdict.get(proxy.klass, None)
         if proxy_list is None:
@@ -185,16 +196,16 @@
             self.__bufferdict[proxy.klass] = proxy_list
         proxy_list.append(proxy)
         self.__bufferlist.append(proxy)
-        
+
     def persists(self, session):
         for object_proxy in self.__bufferlist:
             object_proxy.persists(session)
-                
+
     def add_object(self, klass, args, kwargs, must_flush, instance=None):
         new_proxy = ObjectBufferProxy(klass, args, kwargs, must_flush, instance)
         self.__add_proxy_object(new_proxy)
-        return new_proxy 
-    
+        return new_proxy
+
     def get(self, klass, **kwargs):
         if klass in self.__bufferdict:
             for proxy in self.__bufferdict[klass]:
@@ -208,27 +219,27 @@
                 if found:
                     return proxy
         return None
-                
+
 
 
 def set_logging(options, plogger=None, queue=None):
-    
+
     logging_config = {
         "format" : '%(asctime)s %(levelname)s:%(name)s:%(message)s',
         "level" : max(logging.NOTSET, min(logging.CRITICAL, logging.WARNING - 10 * options.verbose + 10 * options.quiet)), #@UndefinedVariable
     }
-    
+
     if options.logfile == "stdout":
         logging_config["stream"] = sys.stdout
     elif options.logfile == "stderr":
         logging_config["stream"] = sys.stderr
     else:
         logging_config["filename"] = options.logfile
-            
+
     logger = plogger
     if logger is None:
         logger = get_logger() #@UndefinedVariable
-    
+
     if len(logger.handlers) == 0:
         filename = logging_config.get("filename")
         if queue is not None:
@@ -239,7 +250,7 @@
         else:
             stream = logging_config.get("stream")
             hdlr = logging.StreamHandler(stream) #@UndefinedVariable
-            
+
         fs = logging_config.get("format", logging.BASIC_FORMAT) #@UndefinedVariable
         dfs = logging_config.get("datefmt", None)
         fmt = logging.Formatter(fs, dfs) #@UndefinedVariable
@@ -248,7 +259,7 @@
         level = logging_config.get("level")
         if level is not None:
             logger.setLevel(level)
-    
+
     options.debug = (options.verbose-options.quiet > 0)
     return logger
 
@@ -261,12 +272,12 @@
                       help="quiet", default=0)
 
 def get_base_query(session, query, start_date, end_date, hashtags, tweet_exclude_table, user_whitelist):
-    
+
     query = query.join(EntityHashtag).join(Hashtag)
-    
+
     if tweet_exclude_table is not None:
         query = query.filter(~Tweet.id.in_(select([tweet_exclude_table.c.id]))) #@UndefinedVariable
-    
+
     if start_date:
         query = query.filter(Tweet.created_at >=  start_date)
     if end_date:
@@ -275,32 +286,32 @@
     if user_whitelist:
         query = query.join(User).filter(User.screen_name.in_(user_whitelist))
 
-    
+
     if hashtags :
         def merge_hash(l,h):
             l.extend(h.split(","))
             return l
         htags = functools.reduce(merge_hash, hashtags, [])
-        
+
         query = query.filter(or_(*map(lambda h: Hashtag.text.contains(h), htags))) #@UndefinedVariable
-    
+
     return query
 
-    
-    
+
+
 def get_filter_query(session, start_date, end_date, hashtags, tweet_exclude_table, user_whitelist):
-    
+
     query = session.query(Tweet)
-    query = get_base_query(session, query, start_date, end_date, hashtags, tweet_exclude_table, user_whitelist) 
+    query = get_base_query(session, query, start_date, end_date, hashtags, tweet_exclude_table, user_whitelist)
     return query.order_by(Tweet.created_at)
-    
+
 
 def get_user_query(session, start_date, end_date, hashtags, tweet_exclude_table):
-    
+
     query = session.query(User).join(Tweet)
-    
-    query = get_base_query(session, query, start_date, end_date, hashtags, tweet_exclude_table, None)    
-    
+
+    query = get_base_query(session, query, start_date, end_date, hashtags, tweet_exclude_table, None)
+
     return query.distinct()
 
 logger_name = "iri.tweet"
@@ -312,7 +323,7 @@
 
 class QueueHandler(logging.Handler):
     """
-    This is a logging handler which sends events to a multiprocessing queue.    
+    This is a logging handler which sends events to a multiprocessing queue.
     """
 
     def __init__(self, queue, ignore_full):
@@ -322,7 +333,7 @@
         logging.Handler.__init__(self) #@UndefinedVariable
         self.queue = queue
         self.ignore_full = True
-        
+
     def emit(self, record):
         """
         Emit a record.
@@ -338,7 +349,7 @@
                 self.queue.put_nowait(record)
         except AssertionError:
             pass
-        except Queue.Full:
+        except queue.Full:
             if self.ignore_full:
                 pass
             else:
@@ -366,7 +377,7 @@
     if percent >= 100:
         writer.write("\n")
     writer.flush()
-    
+
     return writer
 
 def get_unused_port():
--- a/script/virtualenv/script/res/requirement.txt	Mon Jul 01 14:35:52 2019 +0200
+++ b/script/virtualenv/script/res/requirement.txt	Tue Jul 02 17:41:28 2019 +0200
@@ -1,25 +1,26 @@
-astroid==2.1.0
+astroid==2.2.5
 blessings==1.7
-certifi==2018.11.29
+certifi==2019.6.16
 chardet==3.0.4
 cssselect==1.0.3
 docutils==0.14
 idna==2.8
-isort==4.3.4
-lazy-object-proxy==1.3.1
+isort==4.3.21
+lazy-object-proxy==1.4.1
 lockfile==0.12.2
-lxml==4.2.5
+lxml==4.3.4
 mccabe==0.6.1
-oauthlib==2.1.0
-pylint==2.2.2
+oauthlib==3.0.1
+pylint==2.3.1
 pyquery==1.4.0
-python-daemon==2.2.0
-python-dateutil==2.7.5
-requests==2.21.0
-requests-oauthlib==1.1.0
+python-daemon==2.2.3
+python-dateutil==2.8.0
+requests==2.22.0
+requests-oauthlib==1.2.0
 six==1.12.0
-SQLAlchemy==1.2.15
+SQLAlchemy==1.3.5
 twitter==1.18.0
 twitter-text==3.0
-urllib3==1.24.1
-wrapt==1.10.11
+typed-ast==1.4.0
+urllib3==1.25.3
+wrapt==1.11.2