# HG changeset patch
# User ymh
# Date 1562082088 -7200
# Node ID 1e7aa7dc444b993e0e5db64baef2fe771f071737
# Parent 2b69678563e8700be69cbcae609c644e4cd7c653
adapt polemictweet to python 3.7, new twitter model and update requirement

diff -r 2b69678563e8 -r 1e7aa7dc444b .hgignore
--- a/.hgignore	Mon Jul 01 14:35:52 2019 +0200
+++ b/.hgignore	Tue Jul 02 17:41:28 2019 +0200
@@ -37,3 +37,4 @@
 ^sbin/sync/sync_live
 ^web/vendor
 ^web/devroot$
+^script/utils/.*\.json$
diff -r 2b69678563e8 -r 1e7aa7dc444b script/.envrc
--- a/script/.envrc	Mon Jul 01 14:35:52 2019 +0200
+++ b/script/.envrc	Tue Jul 02 17:41:28 2019 +0200
@@ -1,1 +1,2 @@
-use pythonvenv 3.7.1+brew
+use pythonvenv 3.7.3+brew
+export PYTHONPATH="/Users/ymh/dev/projects/tweet_live/script/lib/iri_tweet"
diff -r 2b69678563e8 -r 1e7aa7dc444b script/.vscode/settings.json
--- a/script/.vscode/settings.json	Mon Jul 01 14:35:52 2019 +0200
+++ b/script/.vscode/settings.json	Tue Jul 02 17:41:28 2019 +0200
@@ -1,4 +1,4 @@
 {
-    "python.pythonPath": "/Users/ymh/dev/projects/tweet_live/script/.direnv/python-3.7.1/bin/python",
+    "python.pythonPath": ".direnv/python-3.7.3/bin/python",
     "python.analysis.diagnosticPublishDelay": 996
 }
\ No newline at end of file
diff -r 2b69678563e8 -r 1e7aa7dc444b script/lib/iri_tweet/iri_tweet/__init__.py
--- a/script/lib/iri_tweet/iri_tweet/__init__.py	Mon Jul 01 14:35:52 2019 +0200
+++ b/script/lib/iri_tweet/iri_tweet/__init__.py	Tue Jul 02 17:41:28 2019 +0200
@@ -2,7 +2,7 @@
 
 VERSION = (0, 82, 0, "final", 0)
 
-VERSION_STR = unicode(".".join(map(lambda i:"%02d" % (i,), VERSION[:2])))
+VERSION_STR = ".".join(map(lambda i:"%02d" % (i,), VERSION[:2]))
 
 
 def get_version():
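Note on the __init__.py hunk above: Python 3 has no unicode builtin, and str.join() already returns text, so the wrapper is simply dropped. A minimal sketch of what VERSION_STR evaluates to after the change (illustrative only, not code from the repository):

    VERSION = (0, 82, 0, "final", 0)
    VERSION_STR = ".".join("%02d" % (i,) for i in VERSION[:2])
    assert VERSION_STR == "00.82"   # zero-padded "major.minor"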
diff -r 2b69678563e8 -r 1e7aa7dc444b script/lib/iri_tweet/iri_tweet/models.py
--- a/script/lib/iri_tweet/iri_tweet/models.py	Mon Jul 01 14:35:52 2019 +0200
+++ b/script/lib/iri_tweet/iri_tweet/models.py	Tue Jul 02 17:41:28 2019 +0200
@@ -12,7 +12,7 @@
 
 Base = declarative_base()
 
-APPLICATION_NAME = "IRI_TWITTER" 
+APPLICATION_NAME = "IRI_TWITTER"
 ACCESS_TOKEN_KEY = None
 ACCESS_TOKEN_SECRET = None
 
@@ -27,7 +27,7 @@
     return json.dumps(obj)
 
 class TweetMeta(type(Base)):
-    
+
     def __init__(cls, name, bases, ns): #@NoSelf
         def init(self, **kwargs):
             for key, value in kwargs.items():
@@ -36,7 +36,7 @@
             super(cls, self).__init__()
         setattr(cls, '__init__', init)
         super(TweetMeta, cls).__init__(name, bases, ns)
-    
+
 
 class ProcessEvent(Base):
     __metaclass__ = TweetMeta
@@ -45,7 +45,7 @@
     ts = Column(DateTime, default=datetime.datetime.utcnow, index=True)
     type = Column(Enum("start","pid","shutdown","error", "start_worker", "stop_worker", "model_version", "application_name", "application_version", name="process_event_type_enum"), nullable=False)
     args = Column(String)
-    
+
 class EntityType(Base):
     __metaclass__ = TweetMeta
     __tablename__ = "tweet_entity_type"
@@ -75,7 +75,7 @@
 
 
 class TweetLog(Base):
-    
+
     TWEET_STATUS = {
         'OK' : 1,
         'ERROR' : 2,
@@ -90,7 +90,7 @@
         'DELETE_PENDING': 4
     }
     __metaclass__ = TweetMeta
-    
+
     __tablename__ = 'tweet_tweet_log'
     id = Column(Integer, primary_key=True, autoincrement=True)
     ts = Column(DateTime, default=datetime.datetime.utcnow, index=True)
@@ -100,8 +100,8 @@
     status = Column(Integer)
     error = Column(String)
     error_stack = Column(String)
-    
-    
+
+
 class Tweet(Base):
     __metaclass__ = TweetMeta
     __tablename__ = 'tweet_tweet'
@@ -109,12 +109,12 @@
     id = Column(BigInteger, primary_key=True, autoincrement=False)
     id_str = Column(String)
     contributors = Column(String)
-    coordinates = Column(String) 
+    coordinates = Column(String)
     created_at = Column(DateTime, index=True)
     favorited = Column(Boolean)
     geo = Column(String)
     in_reply_to_screen_name = Column(String)
-    in_reply_to_status_id = Column(BigInteger) 
+    in_reply_to_status_id = Column(BigInteger)
     in_reply_to_status_id_str = Column(String)
     in_reply_to_user_id = Column(BigInteger)
     in_reply_to_user_id_str = Column(String)
@@ -130,7 +130,7 @@
     tweet_source = relationship("TweetSource", backref="tweet")
     entity_list = relationship(Entity, backref='tweet', cascade="all, delete-orphan")
     received_at = Column(DateTime, default=datetime.datetime.utcnow, index=True)
-    
+
 
 class UserMessage(Base):
     __metaclass__ = TweetMeta
@@ -145,54 +145,41 @@
 class Message(Base):
     __metaclass__ = TweetMeta
     __tablename__ = "tweet_message"
-    
+
     id = Column(Integer, primary_key=True)
     created_at = Column(DateTime, default=datetime.datetime.utcnow)
     text = Column(String)
     users = relationship(UserMessage, backref='message')
-    
+
 
 class User(Base):
     __metaclass__ = TweetMeta
     __tablename__ = "tweet_user"
-    
+
     id = Column(BigInteger, primary_key=True, autoincrement=False)
     id_str = Column(String)
-    contributors_enabled = Column(Boolean)
     created_at = Column(DateTime, index=True)
     description = Column(String)
     favourites_count = Column(Integer)
-    follow_request_sent = Column(Boolean)
     followers_count = Column(Integer)
-    following = Column(String)
     friends_count = Column(Integer)
-    geo_enabled = Column(Boolean)
-    is_translator = Column(Boolean)
-    lang = Column(String)
     listed_count = Column(Integer)
     location = Column(String)
     name = Column(String)
-    notifications = Column(String)
-    profile_background_color = Column(String)
-    profile_background_image_url = Column(String)
-    profile_background_tile = Column(Boolean)
-    profile_image_url = Column(String)
-    profile_image_url_https = Column(String)
-    profile_link_color = Column(String)
-    profile_sidebar_border_color = Column(String)
-    profile_sidebar_fill_color = Column(String)
-    profile_text_color = Column(String)
     default_profile_image = Column(String)
-    profile_use_background_image = Column(Boolean)
+    default_profile = Column(Boolean)
     protected = Column(Boolean)
     screen_name = Column(String, index=True)
-    show_all_inline_media = Column(Boolean)
     statuses_count = Column(Integer)
-    time_zone = Column(String)
     url = Column(String)
-    utc_offset = Column(Integer)
     verified = Column(Boolean)
-    
+    derived = Column(String) #JSON
+    profile_banner_url = Column(String)
+    profile_image_url_https = Column(String)
+    withheld_in_countries = Column(String) # ARRAY
+    withheld_scope = Column(String)
+
+
 
 class Hashtag(Base):
     __metaclass__ = TweetMeta
@@ -215,7 +202,7 @@
 
     id = Column(Integer, primary_key=True, autoincrement=True)
     label = Column(String, unique=True, index=True)
-    
+
 
 class Media(Base):
     __metaclass__ = TweetMeta
@@ -231,7 +218,7 @@
 
     type_id = Column(Integer, ForeignKey("tweet_media_type.id"))
     type = relationship(MediaType, primaryjoin=type_id == MediaType.id)
-    
+
 
 class EntityHashtag(Entity):
     __tablename__ = "tweet_entity_hashtag"
@@ -240,7 +227,7 @@
 
     hashtag_id = Column(Integer, ForeignKey("tweet_hashtag.id"))
     hashtag = relationship(Hashtag, primaryjoin=hashtag_id == Hashtag.id)
-    
+
 class EntityUrl(Entity):
     __tablename__ = "tweet_entity_url"
     __mapper_args__ = {'polymorphic_identity': 'entity_url'}
@@ -255,7 +242,7 @@
 
     user_id = Column(BigInteger, ForeignKey('tweet_user.id'))
     user = relationship(User, primaryjoin=(user_id == User.id))
-    
+
 class EntityMedia(Entity):
     __tablename__ = "tweet_entity_media"
     __mapper_args__ = {'polymorphic_identity': 'entity_media'}
@@ -268,31 +255,31 @@
     session.add(pe)
     if must_commit:
         session.commit()
-    
+
 def setup_database(*args, **kwargs):
-    
+
     session_argname = [
         'autoflush','binds', "class_", "_enable_transaction_accounting","expire_on_commit", "extension",
         "query_cls", "twophase", "weak_identity_map", "autocommit"]
-    
+
     kwargs_ce = dict((k, v) for k,v in kwargs.items() if (k not in session_argname and k != "create_all"))
     engine = create_engine(*args, **kwargs_ce)
-    
+
     if engine.name == "sqlite":
         @event.listens_for(Engine, "connect")
         def set_sqlite_pragma(dbapi_connection, connection_record): #pylint: W0612
            cursor = dbapi_connection.cursor()
            cursor.execute("PRAGMA foreign_keys=ON")
            cursor.close()
-    
-    metadata = Base.metadata 
-    
+
+    metadata = Base.metadata
+
     kwargs_sm = {'bind': engine}
-    
+
     kwargs_sm.update([(argname, kwargs[argname]) for argname in session_argname if argname in kwargs])
     Session = sessionmaker(**kwargs_sm)
     #set model version
-    
+
     if kwargs.get('create_all', True):
         metadata.create_all(engine)
         session = Session()
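Note on the models.py hunks above: the tweet_user table drops the profile-customisation, timezone and follow-state columns that the current Twitter user object no longer carries, and gains default_profile, derived, profile_banner_url, profile_image_url_https, withheld_in_countries and withheld_scope. A hedged sketch of how a row could be built against the new columns, assuming the package is importable as iri_tweet (as the PYTHONPATH export in .envrc suggests); all field values are invented:

    from iri_tweet.models import User

    user = User(
        id=12,
        id_str="12",
        screen_name="example",
        default_profile=True,                        # new Boolean column
        derived='{"locations": []}',                 # enrichment data kept as a JSON string
        withheld_in_countries='["XX"]',              # array serialised to JSON text
        profile_banner_url="https://example.org/banner.png",
    )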
diff -r 2b69678563e8 -r 1e7aa7dc444b script/lib/iri_tweet/iri_tweet/utils.py
--- a/script/lib/iri_tweet/iri_tweet/utils.py	Mon Jul 01 14:35:52 2019 +0200
+++ b/script/lib/iri_tweet/iri_tweet/utils.py	Tue Jul 02 17:41:28 2019 +0200
@@ -5,13 +5,14 @@
 import logging
 import math
 import os.path
-import Queue
+import queue
 import socket
 import sys
 
 import twitter.oauth
 import twitter.oauth_dance
 from sqlalchemy.sql import or_, select
+from sqlalchemy.orm import class_mapper
 
 from .models import (ACCESS_TOKEN_KEY, ACCESS_TOKEN_SECRET, APPLICATION_NAME,
                      EntityHashtag, Hashtag, Tweet, User, adapt_date,
@@ -20,18 +21,18 @@
 CACHE_ACCESS_TOKEN = {}
 
 def get_oauth_token(consumer_key, consumer_secret, token_file_path=None, check_access_token=True, application_name=APPLICATION_NAME):
-    
+
     global CACHE_ACCESS_TOKEN
 
     if 'ACCESS_TOKEN_KEY' in globals() and 'ACCESS_TOKEN_SECRET' in globals() and ACCESS_TOKEN_KEY and ACCESS_TOKEN_SECRET:
         return ACCESS_TOKEN_KEY,ACCESS_TOKEN_SECRET
-    
+
     res = CACHE_ACCESS_TOKEN.get(application_name, None)
-    
+
     if res is None and token_file_path and os.path.exists(token_file_path):
         get_logger().debug("get_oauth_token : reading token from file %s" % token_file_path) #@UndefinedVariable
         res = twitter.oauth.read_token_file(token_file_path)
-    
+
     if res is not None and check_access_token:
         get_logger().debug("get_oauth_token : Check oauth tokens") #@UndefinedVariable
         t = twitter.Twitter(auth=twitter.OAuth(res[0], res[1], consumer_key, consumer_secret))
@@ -39,7 +40,7 @@
         try:
             status = t.application.rate_limit_status(resources="account")
         except Exception as e:
-            get_logger().debug("get_oauth_token : error getting rate limit status %s " % repr(e)) 
+            get_logger().debug("get_oauth_token : error getting rate limit status %s " % repr(e))
             get_logger().debug("get_oauth_token : error getting rate limit status %s " % str(e))
             status = None
         get_logger().debug("get_oauth_token : Check oauth tokens : status %s" % repr(status)) #@UndefinedVariable
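The import hunk above tracks the Python 3 rename of the stdlib Queue module to queue; the matching exception is caught as queue.Full further down in QueueHandler.emit. A small illustrative sketch, not taken from the project:

    import queue

    q = queue.Queue(maxsize=1)
    q.put_nowait("first")
    try:
        q.put_nowait("second")      # the queue is already full
    except queue.Full:
        pass                        # mirrors the ignore_full branch in QueueHandler.emit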
@@ -50,27 +51,27 @@
     if res is None:
         get_logger().debug("get_oauth_token : doing the oauth dance")
         res = twitter.oauth_dance(application_name, consumer_key, consumer_secret, token_file_path)
-        
-        
+
+
     CACHE_ACCESS_TOKEN[application_name] = res
-    
+
     get_logger().debug("get_oauth_token : done got %s" % repr(res))
     return res
 
 
 def get_oauth2_token(consumer_key, consumer_secret, token_file_path=None, check_access_token=True, application_name=APPLICATION_NAME):
-    
+
     global CACHE_ACCESS_TOKEN
 
     if 'ACCESS_TOKEN_KEY' in globals() and 'ACCESS_TOKEN_SECRET' in globals() and ACCESS_TOKEN_KEY and ACCESS_TOKEN_SECRET:
         return ACCESS_TOKEN_KEY,ACCESS_TOKEN_SECRET
-    
+
     res = CACHE_ACCESS_TOKEN.get(application_name, None)
-    
+
     if res is None and token_file_path and os.path.exists(token_file_path):
         get_logger().debug("get_oauth2_token : reading token from file %s" % token_file_path) #@UndefinedVariable
         res = twitter.oauth2.read_bearer_token_file(token_file_path)
-    
+
     if res is not None and check_access_token:
         get_logger().debug("get_oauth2_token : Check oauth tokens") #@UndefinedVariable
         t = twitter.Twitter(auth=twitter.OAuth2(consumer_key, consumer_secret, res))
@@ -78,7 +79,7 @@
         try:
             status = t.application.rate_limit_status()
         except Exception as e:
-            get_logger().debug("get_oauth2_token : error getting rate limit status %s " % repr(e)) 
+            get_logger().debug("get_oauth2_token : error getting rate limit status %s " % repr(e))
             get_logger().debug("get_oauth2_token : error getting rate limit status %s " % str(e))
             status = None
         get_logger().debug("get_oauth2_token : Check oauth tokens : status %s" % repr(status)) #@UndefinedVariable
@@ -89,10 +90,10 @@
     if res is None:
         get_logger().debug("get_oauth2_token : doing the oauth dance")
         res = twitter.oauth2_dance(consumer_key, consumer_secret, token_file_path)
-        
-        
+
+
     CACHE_ACCESS_TOKEN[application_name] = res
-    
+
     get_logger().debug("get_oauth_token : done got %s" % repr(res))
     return res
 
@@ -102,7 +103,7 @@
     return datetime.datetime(*ts[0:7])
 
 def clean_keys(dict_val):
-    return dict([(str(key),value) for key,value in dict_val.iteritems()])
+    return dict([(str(key),value) for key,value in dict_val.items()])
 
 fields_adapter = {
     'stream': {
@@ -115,22 +116,29 @@
         },
         "user": {
             "created_at" : adapt_date,
+            "derived" : adapt_json,
+            "withheld_in_countries" : adapt_json
         },
     },
-    
+
 
     'entities' : {
         "medias": {
             "sizes" : adapt_json,
-            }, 
+            },
     },
     'rest': {
+        "user": {
+            "created_at" : adapt_date,
+            "derived" : adapt_json,
+            "withheld_in_countries" : adapt_json
+        },
         "tweet" : {
             "place" : adapt_json,
             "geo" : adapt_json,
             "created_at" : adapt_date,
#            "original_json" : adapt_json,
-            }, 
+            },
     },
 }
 
@@ -143,41 +151,44 @@
             return adapter_mapping[field](value)
         else:
             return value
-    return dict([(str(k),adapt_one_field(k,v)) for k,v in fields_dict.iteritems()])
+    return dict([(str(k),adapt_one_field(k,v)) for k,v in fields_dict.items()])
 
 
 class ObjectBufferProxy(object):
     def __init__(self, klass, args, kwargs, must_flush, instance=None):
         self.klass= klass
+        self.mapper = class_mapper(klass)
         self.args = args
         self.kwargs = kwargs
         self.must_flush = must_flush
         self.instance = instance
-        
+
 
     def persists(self, session):
         new_args = [arg() if callable(arg) else arg for arg in self.args] if self.args is not None else []
         new_kwargs = dict([(k,v()) if callable(v) else (k,v) for k,v in self.kwargs.items()]) if self.kwargs is not None else {}
-        
-        self.instance = self.klass(*new_args, **new_kwargs)
+
+        self.instance = self.klass(*new_args, **{
+            k: v for k, v in new_kwargs.items() if k in self.mapper.attrs.keys()
+        })
 
         if self.instance is not None:
             self.instance = session.merge(self.instance)
         session.add(self.instance)
         if self.must_flush:
             session.flush()
-        
+
 
     def __getattr__(self, name):
         return lambda : getattr(self.instance, name) if self.instance else None
-    
-    
-    
+
+
+
 class ObjectsBuffer(object):
     def __init__(self):
         self.__bufferlist = []
         self.__bufferdict = {}
-        
+
     def __add_proxy_object(self, proxy):
         proxy_list = self.__bufferdict.get(proxy.klass, None)
         if proxy_list is None:
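The ObjectBufferProxy hunk above keeps a class_mapper for the model class and builds the instance only from kwargs whose names are mapped attributes, so extra keys in newer Twitter payloads are dropped instead of raising TypeError in the constructor. A rough standalone sketch of the same idea (the helper name is invented):

    from sqlalchemy.orm import class_mapper

    def keep_mapped_kwargs(klass, raw):
        # keep only keys that correspond to mapped columns/relationships
        mapped = set(class_mapper(klass).attrs.keys())
        return {k: v for k, v in raw.items() if k in mapped}

    # e.g. keep_mapped_kwargs(User, payload) silently ignores unknown API fields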
@@ -185,16 +196,16 @@
             self.__bufferdict[proxy.klass] = proxy_list
         proxy_list.append(proxy)
         self.__bufferlist.append(proxy)
-        
+
     def persists(self, session):
         for object_proxy in self.__bufferlist:
             object_proxy.persists(session)
-        
+
     def add_object(self, klass, args, kwargs, must_flush, instance=None):
         new_proxy = ObjectBufferProxy(klass, args, kwargs, must_flush, instance)
         self.__add_proxy_object(new_proxy)
-        return new_proxy 
-        
+        return new_proxy
+
     def get(self, klass, **kwargs):
         if klass in self.__bufferdict:
             for proxy in self.__bufferdict[klass]:
@@ -208,27 +219,27 @@
                 if found:
                     return proxy
         return None
-    
+
 
 def set_logging(options, plogger=None, queue=None):
-    
+
     logging_config = {
         "format" : '%(asctime)s %(levelname)s:%(name)s:%(message)s',
         "level" : max(logging.NOTSET, min(logging.CRITICAL, logging.WARNING - 10 * options.verbose + 10 * options.quiet)), #@UndefinedVariable
     }
-    
+
     if options.logfile == "stdout":
         logging_config["stream"] = sys.stdout
     elif options.logfile == "stderr":
         logging_config["stream"] = sys.stderr
     else:
         logging_config["filename"] = options.logfile
-    
+
     logger = plogger
     if logger is None:
         logger = get_logger() #@UndefinedVariable
-    
+
 
     if len(logger.handlers) == 0:
         filename = logging_config.get("filename")
         if queue is not None:
@@ -239,7 +250,7 @@
         else:
             stream = logging_config.get("stream")
             hdlr = logging.StreamHandler(stream) #@UndefinedVariable
-        
+
         fs = logging_config.get("format", logging.BASIC_FORMAT) #@UndefinedVariable
         dfs = logging_config.get("datefmt", None)
         fmt = logging.Formatter(fs, dfs) #@UndefinedVariable
@@ -248,7 +259,7 @@
     level = logging_config.get("level")
     if level is not None:
         logger.setLevel(level)
-    
+
     options.debug = (options.verbose-options.quiet > 0)
     return logger
 
@@ -261,12 +272,12 @@
                       help="quiet", default=0)
 
 def get_base_query(session, query, start_date, end_date, hashtags, tweet_exclude_table, user_whitelist):
-    
+
     query = query.join(EntityHashtag).join(Hashtag)
-    
+
     if tweet_exclude_table is not None:
         query = query.filter(~Tweet.id.in_(select([tweet_exclude_table.c.id]))) #@UndefinedVariable
-    
+
     if start_date:
         query = query.filter(Tweet.created_at >= start_date)
     if end_date:
@@ -275,32 +286,32 @@
 
     if user_whitelist:
         query = query.join(User).filter(User.screen_name.in_(user_whitelist))
-        
+
     if hashtags :
         def merge_hash(l,h):
             l.extend(h.split(","))
             return l
         htags = functools.reduce(merge_hash, hashtags, [])
-        
+
         query = query.filter(or_(*map(lambda h: Hashtag.text.contains(h), htags))) #@UndefinedVariable
-    
+
     return query
-    
-    
+
+
 
 def get_filter_query(session, start_date, end_date, hashtags, tweet_exclude_table, user_whitelist):
-    
+
     query = session.query(Tweet)
-    query = get_base_query(session, query, start_date, end_date, hashtags, tweet_exclude_table, user_whitelist) 
+    query = get_base_query(session, query, start_date, end_date, hashtags, tweet_exclude_table, user_whitelist)
     return query.order_by(Tweet.created_at)
-    
+
 
 def get_user_query(session, start_date, end_date, hashtags, tweet_exclude_table):
-    
+
     query = session.query(User).join(Tweet)
-    
-    query = get_base_query(session, query, start_date, end_date, hashtags, tweet_exclude_table, None) 
-    
+
+    query = get_base_query(session, query, start_date, end_date, hashtags, tweet_exclude_table, None)
+
     return query.distinct()
 
 
@@ -312,7 +323,7 @@
 
 class QueueHandler(logging.Handler):
     """
-    This is a logging handler which sends events to a multiprocessing queue. 
+    This is a logging handler which sends events to a multiprocessing queue.
     """
 
     def __init__(self, queue, ignore_full):
@@ -322,7 +333,7 @@
         logging.Handler.__init__(self) #@UndefinedVariable
         self.queue = queue
         self.ignore_full = True
-        
+
     def emit(self, record):
         """
         Emit a record.
@@ -338,7 +349,7 @@
             self.queue.put_nowait(record)
         except AssertionError:
             pass
-        except Queue.Full:
+        except queue.Full:
             if self.ignore_full:
                 pass
             else:
@@ -366,7 +377,7 @@
         if percent >= 100:
             writer.write("\n")
             writer.flush()
-    
+
     return writer
 
 def get_unused_port():
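For reference, the level computed in set_logging above combines the -v/-q counters around logging.WARNING, clamped to the NOTSET..CRITICAL range. An illustrative check of the arithmetic, not part of the patch:

    import logging

    def effective_level(verbose, quiet):
        return max(logging.NOTSET,
                   min(logging.CRITICAL,
                       logging.WARNING - 10 * verbose + 10 * quiet))

    assert effective_level(0, 0) == logging.WARNING   # default
    assert effective_level(1, 0) == logging.INFO      # -v
    assert effective_level(2, 0) == logging.DEBUG     # -vv
    assert effective_level(0, 1) == logging.ERROR     # -q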
diff -r 2b69678563e8 -r 1e7aa7dc444b script/virtualenv/script/res/requirement.txt
--- a/script/virtualenv/script/res/requirement.txt	Mon Jul 01 14:35:52 2019 +0200
+++ b/script/virtualenv/script/res/requirement.txt	Tue Jul 02 17:41:28 2019 +0200
@@ -1,25 +1,26 @@
-astroid==2.1.0
+astroid==2.2.5
 blessings==1.7
-certifi==2018.11.29
+certifi==2019.6.16
 chardet==3.0.4
 cssselect==1.0.3
 docutils==0.14
 idna==2.8
-isort==4.3.4
-lazy-object-proxy==1.3.1
+isort==4.3.21
+lazy-object-proxy==1.4.1
 lockfile==0.12.2
-lxml==4.2.5
+lxml==4.3.4
 mccabe==0.6.1
-oauthlib==2.1.0
-pylint==2.2.2
+oauthlib==3.0.1
+pylint==2.3.1
 pyquery==1.4.0
-python-daemon==2.2.0
-python-dateutil==2.7.5
-requests==2.21.0
-requests-oauthlib==1.1.0
+python-daemon==2.2.3
+python-dateutil==2.8.0
+requests==2.22.0
+requests-oauthlib==1.2.0
 six==1.12.0
-SQLAlchemy==1.2.15
+SQLAlchemy==1.3.5
 twitter==1.18.0
 twitter-text==3.0
-urllib3==1.24.1
-wrapt==1.10.11
+typed-ast==1.4.0
+urllib3==1.25.3
+wrapt==1.11.2