--- a/.hgignore Mon Jul 01 14:35:52 2019 +0200
+++ b/.hgignore Tue Jul 02 17:41:28 2019 +0200
@@ -37,3 +37,4 @@
^sbin/sync/sync_live
^web/vendor
^web/devroot$
+^script/utils/.*\.json$
--- a/script/.envrc Mon Jul 01 14:35:52 2019 +0200
+++ b/script/.envrc Tue Jul 02 17:41:28 2019 +0200
@@ -1,1 +1,2 @@
-use pythonvenv 3.7.1+brew
+use pythonvenv 3.7.3+brew
+export PYTHONPATH="/Users/ymh/dev/projects/tweet_live/script/lib/iri_tweet"
--- a/script/.vscode/settings.json Mon Jul 01 14:35:52 2019 +0200
+++ b/script/.vscode/settings.json Tue Jul 02 17:41:28 2019 +0200
@@ -1,4 +1,4 @@
{
- "python.pythonPath": "/Users/ymh/dev/projects/tweet_live/script/.direnv/python-3.7.1/bin/python",
+ "python.pythonPath": ".direnv/python-3.7.3/bin/python",
"python.analysis.diagnosticPublishDelay": 996
}
\ No newline at end of file
--- a/script/lib/iri_tweet/iri_tweet/__init__.py Mon Jul 01 14:35:52 2019 +0200
+++ b/script/lib/iri_tweet/iri_tweet/__init__.py Tue Jul 02 17:41:28 2019 +0200
@@ -2,7 +2,7 @@
VERSION = (0, 82, 0, "final", 0)
-VERSION_STR = unicode(".".join(map(lambda i:"%02d" % (i,), VERSION[:2])))
+VERSION_STR = ".".join(map(lambda i:"%02d" % (i,), VERSION[:2]))
def get_version():
--- a/script/lib/iri_tweet/iri_tweet/models.py Mon Jul 01 14:35:52 2019 +0200
+++ b/script/lib/iri_tweet/iri_tweet/models.py Tue Jul 02 17:41:28 2019 +0200
@@ -12,7 +12,7 @@
Base = declarative_base()
-APPLICATION_NAME = "IRI_TWITTER"
+APPLICATION_NAME = "IRI_TWITTER"
ACCESS_TOKEN_KEY = None
ACCESS_TOKEN_SECRET = None
@@ -27,7 +27,7 @@
return json.dumps(obj)
class TweetMeta(type(Base)):
-
+
def __init__(cls, name, bases, ns): #@NoSelf
def init(self, **kwargs):
for key, value in kwargs.items():
@@ -36,7 +36,7 @@
super(cls, self).__init__()
setattr(cls, '__init__', init)
super(TweetMeta, cls).__init__(name, bases, ns)
-
+
class ProcessEvent(Base):
__metaclass__ = TweetMeta
@@ -45,7 +45,7 @@
ts = Column(DateTime, default=datetime.datetime.utcnow, index=True)
type = Column(Enum("start","pid","shutdown","error", "start_worker", "stop_worker", "model_version", "application_name", "application_version", name="process_event_type_enum"), nullable=False)
args = Column(String)
-
+
class EntityType(Base):
__metaclass__ = TweetMeta
__tablename__ = "tweet_entity_type"
@@ -75,7 +75,7 @@
class TweetLog(Base):
-
+
TWEET_STATUS = {
'OK' : 1,
'ERROR' : 2,
@@ -90,7 +90,7 @@
'DELETE_PENDING': 4
}
__metaclass__ = TweetMeta
-
+
__tablename__ = 'tweet_tweet_log'
id = Column(Integer, primary_key=True, autoincrement=True)
ts = Column(DateTime, default=datetime.datetime.utcnow, index=True)
@@ -100,8 +100,8 @@
status = Column(Integer)
error = Column(String)
error_stack = Column(String)
-
-
+
+
class Tweet(Base):
__metaclass__ = TweetMeta
__tablename__ = 'tweet_tweet'
@@ -109,12 +109,12 @@
id = Column(BigInteger, primary_key=True, autoincrement=False)
id_str = Column(String)
contributors = Column(String)
- coordinates = Column(String)
+ coordinates = Column(String)
created_at = Column(DateTime, index=True)
favorited = Column(Boolean)
geo = Column(String)
in_reply_to_screen_name = Column(String)
- in_reply_to_status_id = Column(BigInteger)
+ in_reply_to_status_id = Column(BigInteger)
in_reply_to_status_id_str = Column(String)
in_reply_to_user_id = Column(BigInteger)
in_reply_to_user_id_str = Column(String)
@@ -130,7 +130,7 @@
tweet_source = relationship("TweetSource", backref="tweet")
entity_list = relationship(Entity, backref='tweet', cascade="all, delete-orphan")
received_at = Column(DateTime, default=datetime.datetime.utcnow, index=True)
-
+
class UserMessage(Base):
__metaclass__ = TweetMeta
@@ -145,54 +145,41 @@
class Message(Base):
__metaclass__ = TweetMeta
__tablename__ = "tweet_message"
-
+
id = Column(Integer, primary_key=True)
created_at = Column(DateTime, default=datetime.datetime.utcnow)
text = Column(String)
users = relationship(UserMessage, backref='message')
-
+
class User(Base):
__metaclass__ = TweetMeta
__tablename__ = "tweet_user"
-
+
id = Column(BigInteger, primary_key=True, autoincrement=False)
id_str = Column(String)
- contributors_enabled = Column(Boolean)
created_at = Column(DateTime, index=True)
description = Column(String)
favourites_count = Column(Integer)
- follow_request_sent = Column(Boolean)
followers_count = Column(Integer)
- following = Column(String)
friends_count = Column(Integer)
- geo_enabled = Column(Boolean)
- is_translator = Column(Boolean)
- lang = Column(String)
listed_count = Column(Integer)
location = Column(String)
name = Column(String)
- notifications = Column(String)
- profile_background_color = Column(String)
- profile_background_image_url = Column(String)
- profile_background_tile = Column(Boolean)
- profile_image_url = Column(String)
- profile_image_url_https = Column(String)
- profile_link_color = Column(String)
- profile_sidebar_border_color = Column(String)
- profile_sidebar_fill_color = Column(String)
- profile_text_color = Column(String)
default_profile_image = Column(String)
- profile_use_background_image = Column(Boolean)
+ default_profile = Column(Boolean)
protected = Column(Boolean)
screen_name = Column(String, index=True)
- show_all_inline_media = Column(Boolean)
statuses_count = Column(Integer)
- time_zone = Column(String)
url = Column(String)
- utc_offset = Column(Integer)
verified = Column(Boolean)
-
+ derived = Column(String) #JSON
+ profile_banner_url = Column(String)
+ profile_image_url_https = Column(String)
+ withheld_in_countries = Column(String) # ARRAY
+ withheld_scope = Column(String)
+
+
class Hashtag(Base):
__metaclass__ = TweetMeta
@@ -215,7 +202,7 @@
id = Column(Integer, primary_key=True, autoincrement=True)
label = Column(String, unique=True, index=True)
-
+
class Media(Base):
__metaclass__ = TweetMeta
@@ -231,7 +218,7 @@
type_id = Column(Integer, ForeignKey("tweet_media_type.id"))
type = relationship(MediaType, primaryjoin=type_id == MediaType.id)
-
+
class EntityHashtag(Entity):
__tablename__ = "tweet_entity_hashtag"
@@ -240,7 +227,7 @@
hashtag_id = Column(Integer, ForeignKey("tweet_hashtag.id"))
hashtag = relationship(Hashtag, primaryjoin=hashtag_id == Hashtag.id)
-
+
class EntityUrl(Entity):
__tablename__ = "tweet_entity_url"
__mapper_args__ = {'polymorphic_identity': 'entity_url'}
@@ -255,7 +242,7 @@
user_id = Column(BigInteger, ForeignKey('tweet_user.id'))
user = relationship(User, primaryjoin=(user_id == User.id))
-
+
class EntityMedia(Entity):
__tablename__ = "tweet_entity_media"
__mapper_args__ = {'polymorphic_identity': 'entity_media'}
@@ -268,31 +255,31 @@
session.add(pe)
if must_commit:
session.commit()
-
+
def setup_database(*args, **kwargs):
-
+
session_argname = [ 'autoflush','binds', "class_", "_enable_transaction_accounting","expire_on_commit", "extension", "query_cls", "twophase", "weak_identity_map", "autocommit"]
-
+
kwargs_ce = dict((k, v) for k,v in kwargs.items() if (k not in session_argname and k != "create_all"))
engine = create_engine(*args, **kwargs_ce)
-
+
if engine.name == "sqlite":
@event.listens_for(Engine, "connect")
def set_sqlite_pragma(dbapi_connection, connection_record): #pylint: W0612
cursor = dbapi_connection.cursor()
cursor.execute("PRAGMA foreign_keys=ON")
cursor.close()
-
- metadata = Base.metadata
-
+
+ metadata = Base.metadata
+
kwargs_sm = {'bind': engine}
-
+
kwargs_sm.update([(argname, kwargs[argname]) for argname in session_argname if argname in kwargs])
Session = sessionmaker(**kwargs_sm)
#set model version
-
+
if kwargs.get('create_all', True):
metadata.create_all(engine)
session = Session()
--- a/script/lib/iri_tweet/iri_tweet/utils.py Mon Jul 01 14:35:52 2019 +0200
+++ b/script/lib/iri_tweet/iri_tweet/utils.py Tue Jul 02 17:41:28 2019 +0200
@@ -5,13 +5,14 @@
import logging
import math
import os.path
-import Queue
+import queue
import socket
import sys
import twitter.oauth
import twitter.oauth_dance
from sqlalchemy.sql import or_, select
+from sqlalchemy.orm import class_mapper
from .models import (ACCESS_TOKEN_KEY, ACCESS_TOKEN_SECRET, APPLICATION_NAME,
EntityHashtag, Hashtag, Tweet, User, adapt_date,
@@ -20,18 +21,18 @@
CACHE_ACCESS_TOKEN = {}
def get_oauth_token(consumer_key, consumer_secret, token_file_path=None, check_access_token=True, application_name=APPLICATION_NAME):
-
+
global CACHE_ACCESS_TOKEN
if 'ACCESS_TOKEN_KEY' in globals() and 'ACCESS_TOKEN_SECRET' in globals() and ACCESS_TOKEN_KEY and ACCESS_TOKEN_SECRET:
return ACCESS_TOKEN_KEY,ACCESS_TOKEN_SECRET
-
+
res = CACHE_ACCESS_TOKEN.get(application_name, None)
-
+
if res is None and token_file_path and os.path.exists(token_file_path):
get_logger().debug("get_oauth_token : reading token from file %s" % token_file_path) #@UndefinedVariable
res = twitter.oauth.read_token_file(token_file_path)
-
+
if res is not None and check_access_token:
get_logger().debug("get_oauth_token : Check oauth tokens") #@UndefinedVariable
t = twitter.Twitter(auth=twitter.OAuth(res[0], res[1], consumer_key, consumer_secret))
@@ -39,7 +40,7 @@
try:
status = t.application.rate_limit_status(resources="account")
except Exception as e:
- get_logger().debug("get_oauth_token : error getting rate limit status %s " % repr(e))
+ get_logger().debug("get_oauth_token : error getting rate limit status %s " % repr(e))
get_logger().debug("get_oauth_token : error getting rate limit status %s " % str(e))
status = None
get_logger().debug("get_oauth_token : Check oauth tokens : status %s" % repr(status)) #@UndefinedVariable
@@ -50,27 +51,27 @@
if res is None:
get_logger().debug("get_oauth_token : doing the oauth dance")
res = twitter.oauth_dance(application_name, consumer_key, consumer_secret, token_file_path)
-
-
+
+
CACHE_ACCESS_TOKEN[application_name] = res
-
+
get_logger().debug("get_oauth_token : done got %s" % repr(res))
return res
def get_oauth2_token(consumer_key, consumer_secret, token_file_path=None, check_access_token=True, application_name=APPLICATION_NAME):
-
+
global CACHE_ACCESS_TOKEN
if 'ACCESS_TOKEN_KEY' in globals() and 'ACCESS_TOKEN_SECRET' in globals() and ACCESS_TOKEN_KEY and ACCESS_TOKEN_SECRET:
return ACCESS_TOKEN_KEY,ACCESS_TOKEN_SECRET
-
+
res = CACHE_ACCESS_TOKEN.get(application_name, None)
-
+
if res is None and token_file_path and os.path.exists(token_file_path):
get_logger().debug("get_oauth2_token : reading token from file %s" % token_file_path) #@UndefinedVariable
res = twitter.oauth2.read_bearer_token_file(token_file_path)
-
+
if res is not None and check_access_token:
get_logger().debug("get_oauth2_token : Check oauth tokens") #@UndefinedVariable
t = twitter.Twitter(auth=twitter.OAuth2(consumer_key, consumer_secret, res))
@@ -78,7 +79,7 @@
try:
status = t.application.rate_limit_status()
except Exception as e:
- get_logger().debug("get_oauth2_token : error getting rate limit status %s " % repr(e))
+ get_logger().debug("get_oauth2_token : error getting rate limit status %s " % repr(e))
get_logger().debug("get_oauth2_token : error getting rate limit status %s " % str(e))
status = None
get_logger().debug("get_oauth2_token : Check oauth tokens : status %s" % repr(status)) #@UndefinedVariable
@@ -89,10 +90,10 @@
if res is None:
get_logger().debug("get_oauth2_token : doing the oauth dance")
res = twitter.oauth2_dance(consumer_key, consumer_secret, token_file_path)
-
-
+
+
CACHE_ACCESS_TOKEN[application_name] = res
-
+
get_logger().debug("get_oauth_token : done got %s" % repr(res))
return res
@@ -102,7 +103,7 @@
return datetime.datetime(*ts[0:7])
def clean_keys(dict_val):
- return dict([(str(key),value) for key,value in dict_val.iteritems()])
+ return dict([(str(key),value) for key,value in dict_val.items()])
fields_adapter = {
'stream': {
@@ -115,22 +116,29 @@
},
"user": {
"created_at" : adapt_date,
+ "derived" : adapt_json,
+ "withheld_in_countries" : adapt_json
},
},
-
+
'entities' : {
"medias": {
"sizes" : adapt_json,
- },
+ },
},
'rest': {
+ "user": {
+ "created_at" : adapt_date,
+ "derived" : adapt_json,
+ "withheld_in_countries" : adapt_json
+ },
"tweet" : {
"place" : adapt_json,
"geo" : adapt_json,
"created_at" : adapt_date,
# "original_json" : adapt_json,
- },
+ },
},
}
@@ -143,41 +151,44 @@
return adapter_mapping[field](value)
else:
return value
- return dict([(str(k),adapt_one_field(k,v)) for k,v in fields_dict.iteritems()])
+ return dict([(str(k),adapt_one_field(k,v)) for k,v in fields_dict.items()])
class ObjectBufferProxy(object):
def __init__(self, klass, args, kwargs, must_flush, instance=None):
self.klass= klass
+ self.mapper = class_mapper(klass)
self.args = args
self.kwargs = kwargs
self.must_flush = must_flush
self.instance = instance
-
+
def persists(self, session):
new_args = [arg() if callable(arg) else arg for arg in self.args] if self.args is not None else []
new_kwargs = dict([(k,v()) if callable(v) else (k,v) for k,v in self.kwargs.items()]) if self.kwargs is not None else {}
-
- self.instance = self.klass(*new_args, **new_kwargs)
+
+ self.instance = self.klass(*new_args, **{
+ k: v for k, v in new_kwargs.items() if k in self.mapper.attrs.keys()
+ })
if self.instance is not None:
self.instance = session.merge(self.instance)
session.add(self.instance)
if self.must_flush:
session.flush()
-
+
def __getattr__(self, name):
return lambda : getattr(self.instance, name) if self.instance else None
-
-
-
+
+
+
class ObjectsBuffer(object):
def __init__(self):
self.__bufferlist = []
self.__bufferdict = {}
-
+
def __add_proxy_object(self, proxy):
proxy_list = self.__bufferdict.get(proxy.klass, None)
if proxy_list is None:
@@ -185,16 +196,16 @@
self.__bufferdict[proxy.klass] = proxy_list
proxy_list.append(proxy)
self.__bufferlist.append(proxy)
-
+
def persists(self, session):
for object_proxy in self.__bufferlist:
object_proxy.persists(session)
-
+
def add_object(self, klass, args, kwargs, must_flush, instance=None):
new_proxy = ObjectBufferProxy(klass, args, kwargs, must_flush, instance)
self.__add_proxy_object(new_proxy)
- return new_proxy
-
+ return new_proxy
+
def get(self, klass, **kwargs):
if klass in self.__bufferdict:
for proxy in self.__bufferdict[klass]:
@@ -208,27 +219,27 @@
if found:
return proxy
return None
-
+
def set_logging(options, plogger=None, queue=None):
-
+
logging_config = {
"format" : '%(asctime)s %(levelname)s:%(name)s:%(message)s',
"level" : max(logging.NOTSET, min(logging.CRITICAL, logging.WARNING - 10 * options.verbose + 10 * options.quiet)), #@UndefinedVariable
}
-
+
if options.logfile == "stdout":
logging_config["stream"] = sys.stdout
elif options.logfile == "stderr":
logging_config["stream"] = sys.stderr
else:
logging_config["filename"] = options.logfile
-
+
logger = plogger
if logger is None:
logger = get_logger() #@UndefinedVariable
-
+
if len(logger.handlers) == 0:
filename = logging_config.get("filename")
if queue is not None:
@@ -239,7 +250,7 @@
else:
stream = logging_config.get("stream")
hdlr = logging.StreamHandler(stream) #@UndefinedVariable
-
+
fs = logging_config.get("format", logging.BASIC_FORMAT) #@UndefinedVariable
dfs = logging_config.get("datefmt", None)
fmt = logging.Formatter(fs, dfs) #@UndefinedVariable
@@ -248,7 +259,7 @@
level = logging_config.get("level")
if level is not None:
logger.setLevel(level)
-
+
options.debug = (options.verbose-options.quiet > 0)
return logger
@@ -261,12 +272,12 @@
help="quiet", default=0)
def get_base_query(session, query, start_date, end_date, hashtags, tweet_exclude_table, user_whitelist):
-
+
query = query.join(EntityHashtag).join(Hashtag)
-
+
if tweet_exclude_table is not None:
query = query.filter(~Tweet.id.in_(select([tweet_exclude_table.c.id]))) #@UndefinedVariable
-
+
if start_date:
query = query.filter(Tweet.created_at >= start_date)
if end_date:
@@ -275,32 +286,32 @@
if user_whitelist:
query = query.join(User).filter(User.screen_name.in_(user_whitelist))
-
+
if hashtags :
def merge_hash(l,h):
l.extend(h.split(","))
return l
htags = functools.reduce(merge_hash, hashtags, [])
-
+
query = query.filter(or_(*map(lambda h: Hashtag.text.contains(h), htags))) #@UndefinedVariable
-
+
return query
-
-
+
+
def get_filter_query(session, start_date, end_date, hashtags, tweet_exclude_table, user_whitelist):
-
+
query = session.query(Tweet)
- query = get_base_query(session, query, start_date, end_date, hashtags, tweet_exclude_table, user_whitelist)
+ query = get_base_query(session, query, start_date, end_date, hashtags, tweet_exclude_table, user_whitelist)
return query.order_by(Tweet.created_at)
-
+
def get_user_query(session, start_date, end_date, hashtags, tweet_exclude_table):
-
+
query = session.query(User).join(Tweet)
-
- query = get_base_query(session, query, start_date, end_date, hashtags, tweet_exclude_table, None)
-
+
+ query = get_base_query(session, query, start_date, end_date, hashtags, tweet_exclude_table, None)
+
return query.distinct()
logger_name = "iri.tweet"
@@ -312,7 +323,7 @@
class QueueHandler(logging.Handler):
"""
- This is a logging handler which sends events to a multiprocessing queue.
+ This is a logging handler which sends events to a multiprocessing queue.
"""
def __init__(self, queue, ignore_full):
@@ -322,7 +333,7 @@
logging.Handler.__init__(self) #@UndefinedVariable
self.queue = queue
self.ignore_full = True
-
+
def emit(self, record):
"""
Emit a record.
@@ -338,7 +349,7 @@
self.queue.put_nowait(record)
except AssertionError:
pass
- except Queue.Full:
+ except queue.Full:
if self.ignore_full:
pass
else:
@@ -366,7 +377,7 @@
if percent >= 100:
writer.write("\n")
writer.flush()
-
+
return writer
def get_unused_port():
--- a/script/virtualenv/script/res/requirement.txt Mon Jul 01 14:35:52 2019 +0200
+++ b/script/virtualenv/script/res/requirement.txt Tue Jul 02 17:41:28 2019 +0200
@@ -1,25 +1,26 @@
-astroid==2.1.0
+astroid==2.2.5
blessings==1.7
-certifi==2018.11.29
+certifi==2019.6.16
chardet==3.0.4
cssselect==1.0.3
docutils==0.14
idna==2.8
-isort==4.3.4
-lazy-object-proxy==1.3.1
+isort==4.3.21
+lazy-object-proxy==1.4.1
lockfile==0.12.2
-lxml==4.2.5
+lxml==4.3.4
mccabe==0.6.1
-oauthlib==2.1.0
-pylint==2.2.2
+oauthlib==3.0.1
+pylint==2.3.1
pyquery==1.4.0
-python-daemon==2.2.0
-python-dateutil==2.7.5
-requests==2.21.0
-requests-oauthlib==1.1.0
+python-daemon==2.2.3
+python-dateutil==2.8.0
+requests==2.22.0
+requests-oauthlib==1.2.0
six==1.12.0
-SQLAlchemy==1.2.15
+SQLAlchemy==1.3.5
twitter==1.18.0
twitter-text==3.0
-urllib3==1.24.1
-wrapt==1.10.11
+typed-ast==1.4.0
+urllib3==1.25.3
+wrapt==1.11.2