Adapt recorder_stream to Python 3
Improve Twitter authentication management
Use OAuth2 where possible
Delete old Topsy search scripts
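
In brief, the auth rework replaces the (consumer_token, access_token, token_filename) triple that was threaded through every processor with a single twitter_auth object. Below is a minimal sketch of the new wiring; the placeholder credentials and the tweet_dict/source_id/session/logger variables are assumptions for illustration and not part of the patch, while get_oauth_token, the new utils.get_oauth2_token and the twitter_auth parameter are introduced or used by the changes that follow:

    import twitter
    from iri_tweet.processor import TwitterProcessorStatus
    from iri_tweet.utils import get_oauth_token, get_oauth2_token

    CONSUMER_KEY = "..."        # placeholder application credentials
    CONSUMER_SECRET = "..."
    TOKEN_FILE = ".oauth_token"

    # OAuth1 user-context auth, as built in recorder_stream.do_run()
    token_key, token_secret = get_oauth_token(
        consumer_key=CONSUMER_KEY, consumer_secret=CONSUMER_SECRET,
        token_file_path=TOKEN_FILE)
    oauth1 = twitter.OAuth(token_key, token_secret, CONSUMER_KEY, CONSUMER_SECRET)

    # OAuth2 app-only auth via the new utils.get_oauth2_token helper,
    # for endpoints that do not need a user context
    bearer = get_oauth2_token(CONSUMER_KEY, CONSUMER_SECRET,
                              token_file_path=TOKEN_FILE)
    oauth2 = twitter.OAuth2(CONSUMER_KEY, CONSUMER_SECRET, bearer)

    # Processors now take the auth object directly; session, source_id and
    # logger are assumed to come from the usual recorder setup.
    processor = TwitterProcessorStatus(json_dict=tweet_dict, json_txt=None,
                                       source_id=source_id, session=session,
                                       twitter_auth=oauth1,
                                       user_query_twitter=True, logger=logger)
    processor.process()
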
--- a/script/lib/iri_tweet/iri_tweet/processor.py Wed Jan 02 17:49:19 2019 +0100
+++ b/script/lib/iri_tweet/iri_tweet/processor.py Thu Jan 10 18:36:36 2019 +0100
@@ -4,10 +4,10 @@
@author: ymh
'''
-from iri_tweet.models import (User, EntityType, adapt_json, MediaType, Media,
- EntityMedia, Hashtag, EntityHashtag, EntityUser, EntityUrl, Url, Entity, Tweet,
+from iri_tweet.models import (User, EntityType, adapt_json, MediaType, Media,
+ EntityMedia, Hashtag, EntityHashtag, EntityUser, EntityUrl, Url, Entity, Tweet,
TweetSource, TweetLog)
-from iri_tweet.utils import (ObjectsBuffer, adapt_fields, fields_adapter,
+from iri_tweet.utils import (ObjectsBuffer, adapt_fields, fields_adapter,
ObjectBufferProxy, get_oauth_token, clean_keys)
from sqlalchemy.orm import joinedload
import logging
@@ -20,30 +20,27 @@
pass
class TwitterProcessor(object):
- def __init__(self, json_dict, json_txt, source_id, session, consumer_token, access_token=None, token_filename=None, user_query_twitter=False, logger=None):
+ def __init__(self, json_dict, json_txt, source_id, session, twitter_auth=None, user_query_twitter=False, logger=None):
if json_dict is None and json_txt is None:
raise TwitterProcessorException("No json")
-
+
if json_dict is None:
self.json_dict = json.loads(json_txt)
else:
self.json_dict = json_dict
-
+
if not json_txt:
self.json_txt = json.dumps(json_dict)
else:
self.json_txt = json_txt
-
+
if "id" not in self.json_dict:
raise TwitterProcessorException("No id in json")
-
+
self.source_id = source_id
self.session = session
- self.consumer_key = consumer_token[0] if consumer_token else None
- self.consumer_secret = consumer_token[1] if consumer_token else None
- self.token_filename = token_filename
- self.access_token = access_token
+ self.twitter_auth = twitter_auth
self.obj_buffer = ObjectsBuffer()
self.user_query_twitter = user_query_twitter
if not logger:
@@ -57,27 +54,26 @@
self.source_id = tweet_source.id
self.process_source()
self.obj_buffer.persists(self.session)
-
+
def process_source(self):
raise NotImplementedError()
-
+
def log_info(self):
- return "Process tweet %s" % repr(self.__class__)
+ return "Process tweet %s" % repr(self.__class__)
class TwitterProcessorStatus(TwitterProcessor):
-
- def __init__(self, json_dict, json_txt, source_id, session, consumer_token, access_token=None, token_filename=None, user_query_twitter=False, logger=None):
- TwitterProcessor.__init__(self, json_dict, json_txt, source_id, session, consumer_token, access_token=access_token, token_filename=token_filename, user_query_twitter=user_query_twitter, logger=logger)
-
+
def __get_user(self, user_dict, do_merge):
self.logger.debug("Get user : " + repr(user_dict)) #@UndefinedVariable
-
+
user_dict = adapt_fields(user_dict, fields_adapter["stream"]["user"])
-
- user_id = user_dict.get("id",None)
+
+ user_id = user_dict.get("id", user_dict.get("id_str",None))
+ if user_id is not None:
+ user_id = int(user_id)
user_name = user_dict.get("screen_name", user_dict.get("name", None))
-
+
if user_id is None and user_name is None:
return None
@@ -87,7 +83,7 @@
else:
user = self.obj_buffer.get(User, screen_name=user_name)
- #to do update user id needed
+            #todo : update user id if needed
if user is not None:
user_created_at = None
if user.args is not None:
@@ -99,31 +95,26 @@
user.args.update(user_dict)
return user
- #todo : add methpds to objectbuffer to get buffer user
+ #todo : add methods to objectbuffer to get buffer user
user_obj = None
if user_id:
user_obj = self.session.query(User).filter(User.id == user_id).first()
else:
user_obj = self.session.query(User).filter(User.screen_name.ilike(user_name)).first()
-
+
#todo update user if needed
- if user_obj is not None:
+ if user_obj is not None:
if user_obj.created_at is not None or user_dict.get('created_at', None) is None or not do_merge :
user = ObjectBufferProxy(User, None, None, False, user_obj)
else:
user = self.obj_buffer.add_object(User, None, user_dict, True, user_obj)
return user
-
+
user_created_at = user_dict.get("created_at", None)
-
+
if user_created_at is None and self.user_query_twitter:
-
- if self.access_token is not None:
- acess_token_key, access_token_secret = self.access_token
- else:
- acess_token_key, access_token_secret = get_oauth_token(consumer_key=self.consumer_key, consumer_secret=self.consumer_secret, token_file_path=self.token_filename)
- #TODO pass it as argument
- t = twitter.Twitter(auth=twitter.OAuth(acess_token_key, access_token_secret, self.consumer_key, self.consumer_secret))
+
+ t = twitter.Twitter(auth=self.twitter_auth)
try:
if user_id:
user_dict = t.users.show(user_id=user_id)
@@ -133,20 +124,20 @@
self.logger.info("get_user : TWITTER ERROR : " + repr(e)) #@UndefinedVariable
self.logger.info("get_user : TWITTER ERROR : " + str(e)) #@UndefinedVariable
return None
-
- if "id" not in user_dict:
+
+ if "id" not in user_dict or not user_dict['id']:
return None
-
+
#TODO filter get, wrap in proxy
user_obj = self.session.query(User).filter(User.id == user_dict["id"]).first()
-
+
if user_obj is not None and not do_merge:
return ObjectBufferProxy(User, None, None, False, user_obj)
- else:
- return self.obj_buffer.add_object(User, None, user_dict, True)
+ else:
+ return self.obj_buffer.add_object(User, None, user_dict, True)
def __get_or_create_object(self, klass, filter_by_kwargs, filter_arg, creation_kwargs, must_flush, do_merge):
-
+
obj_proxy = self.obj_buffer.get(klass, **filter_by_kwargs)
if obj_proxy is None:
query = self.session.query(klass)
@@ -167,11 +158,11 @@
def __process_entity(self, ind, ind_type):
self.logger.debug("Process_entity : " + repr(ind) + " : " + repr(ind_type)) #@UndefinedVariable
-
+
ind = clean_keys(ind)
-
+
entity_type = self.__get_or_create_object(EntityType, {'label':ind_type}, None, {'label':ind_type}, True, False)
-
+
entity_dict = {
"indice_start" : ind["indices"][0],
"indice_end" : ind["indices"][1],
@@ -181,19 +172,19 @@
}
def process_medias():
-
+
media_id = ind.get('id', None)
if media_id is None:
return None, None
-
+
type_str = ind.get("type", "photo")
media_type = self.__get_or_create_object(MediaType, {'label': type_str}, None, {'label':type_str}, True, False)
media_ind = adapt_fields(ind, fields_adapter["entities"]["medias"])
if "type" in media_ind:
del(media_ind["type"])
- media_ind['type_id'] = media_type.id
+ media_ind['type_id'] = media_type.id
media = self.__get_or_create_object(Media, {'id':media_id}, None, media_ind, True, False)
-
+
entity_dict['media_id'] = media.id
return EntityMedia, entity_dict
@@ -204,8 +195,8 @@
ind['text'] = text
hashtag = self.__get_or_create_object(Hashtag, {'text':text}, Hashtag.text.ilike(text), ind, True, False)
entity_dict['hashtag_id'] = hashtag.id
- return EntityHashtag, entity_dict
-
+ return EntityHashtag, entity_dict
+
def process_user_mentions():
user_mention = self.__get_user(ind, False)
if user_mention is None:
@@ -213,71 +204,79 @@
else:
entity_dict['user_id'] = user_mention.id
return EntityUser, entity_dict
-
+
def process_urls():
url = self.__get_or_create_object(Url, {'url':ind["url"]}, None, ind, True, False)
entity_dict['url_id'] = url.id
return EntityUrl, entity_dict
-
+
#{'': lambda }
- entity_klass, entity_dict = {
+ entity_klass, entity_dict = {
'hashtags': process_hashtags,
'user_mentions' : process_user_mentions,
'urls' : process_urls,
'media': process_medias,
}.get(ind_type, lambda: (Entity, entity_dict))()
-
+
self.logger.debug("Process_entity entity_dict: " + repr(entity_dict)) #@UndefinedVariable
if entity_klass:
self.obj_buffer.add_object(entity_klass, None, entity_dict, False)
def __process_twitter(self):
-
+
tweet_nb = self.session.query(Tweet).filter(Tweet.id == self.json_dict["id"]).count()
if tweet_nb > 0:
return
-
- ts_copy = adapt_fields(self.json_dict, fields_adapter["stream"]["tweet"])
-
+
+ ts_copy = None
+ # Takes into account the extended tweets
+ if self.json_dict.get('truncated') and self.json_dict.get('extended_tweet'):
+ ts_copy = { **self.json_dict, **self.json_dict['extended_tweet'], "text": self.json_dict['extended_tweet']['full_text'] }
+ else:
+ ts_copy = { **self.json_dict }
+
+
+ ts_copy = adapt_fields(ts_copy, fields_adapter["stream"]["tweet"])
+
# get or create user
- user = self.__get_user(self.json_dict["user"], True)
+ user = self.__get_user(ts_copy["user"], True)
if user is None:
- self.logger.warning("USER not found " + repr(self.json_dict["user"])) #@UndefinedVariable
+ self.logger.warning("USER not found " + repr(ts_copy["user"])) #@UndefinedVariable
ts_copy["user_id"] = None
else:
ts_copy["user_id"] = user.id
-
+
del(ts_copy['user'])
ts_copy["tweet_source_id"] = self.source_id
-
+
self.tweet = self.obj_buffer.add_object(Tweet, None, ts_copy, True)
-
- self.__process_entities()
+
+ self.__process_entities(ts_copy)
- def __process_entities(self):
- if "entities" in self.json_dict:
- for ind_type, entity_list in self.json_dict["entities"].items():
+ def __process_entities(self, json_dict):
+ if "entities" in json_dict:
+ for ind_type, entity_list in json_dict["entities"].items():
for ind in entity_list:
self.__process_entity(ind, ind_type)
else:
-
+
text = self.tweet.text
extractor = twitter_text.Extractor(text)
for ind in extractor.extract_hashtags_with_indices():
self.__process_entity(ind, "hashtags")
-
+
for ind in extractor.extract_urls_with_indices():
self.__process_entity(ind, "urls")
-
+
for ind in extractor.extract_mentioned_screen_names_with_indices():
self.__process_entity(ind, "user_mentions")
def process_source(self):
-
+
status_id = self.json_dict["id"]
log = self.session.query(TweetLog).filter(TweetLog.status_id==status_id).first()
if(log):
@@ -307,24 +306,22 @@
}
}
"""
- def __init__(self, json_dict, json_txt, source_id, session, consumer_token, access_token=None, token_filename=None, user_query_twitter=False, logger=None):
- TwitterProcessor.__init__(self, json_dict, json_txt, source_id, session, consumer_token, access_token=access_token, token_filename=token_filename, user_query_twitter=user_query_twitter, logger=logger)
def process(self):
-
+
#find tweet
tweet_id = self.json_dict.get('delete',{}).get('status',{}).get('id',None)
if tweet_id:
t = self.session.query(Tweet).options(joinedload(Tweet.tweet_source)).filter(Tweet.id == tweet_id).first()
if t:
- tsource = t.tweet_source
+ tsource = t.tweet_source
self.session.delete(t)
self.session.query(TweetLog).filter(TweetLog.tweet_source_id == tsource.id).delete()
self.session.delete(tsource)
self.obj_buffer.add_object(TweetLog, None, {'tweet_source_id':self.source_id, 'status':TweetLog.TWEET_STATUS['DELETE']}, True)
else:
self.obj_buffer.add_object(TweetLog, None, {'tweet_source_id':self.source_id, 'status_id': tweet_id,'status':TweetLog.TWEET_STATUS['DELETE_PENDING']}, True)
-
+
def log_info(self):
status_del = self.json_dict.get('delete', {}).get("status",{})
return u"Process delete for %s : %s" % (status_del.get('user_id_str',u""), status_del.get('id_str',u""))
@@ -341,10 +338,7 @@
}
"""
- def __init__(self, json_dict, json_txt, source_id, session, consumer_token, access_token=None, token_filename=None, user_query_twitter=False, logger=None):
- TwitterProcessor.__init__(self, json_dict, json_txt, source_id, session, consumer_token, access_token=access_token, token_filename=token_filename, user_query_twitter=user_query_twitter, logger=logger)
-
- def process_source(self):
+ def process_source(self):
up_to_status_id = self.json_dict.get("scrub_geo", {}).get("up_to_status_id", None)
if not up_to_status_id:
return
@@ -357,7 +351,7 @@
tsource_dict["geo"] = None
self.obj_buffer.add_object(TweetSource, tsource, {'original_json': json.dumps(tsource_dict)}, False)
self.obj_buffer.add_object(TweetLog, None, {'tweet_source_id':self.source_id, 'status':TweetLog.TWEET_STATUS['SCRUB_GEO']}, True)
-
+
def log_info(self):
return u"Process scrub geo for %s : %s" % (self.json_dict["scrub_geo"].get('user_id_str',u""), self.json_dict["scrub_geo"].get('id_str',u""))
@@ -369,19 +363,17 @@
"track":1234
}
}
- """
- def __init__(self, json_dict, json_txt, source_id, session, consumer_token, access_token=None, token_filename=None, user_query_twitter=False, logger=None):
- TwitterProcessor.__init__(self, json_dict, json_txt, source_id, session, consumer_token, access_token=access_token, token_filename=token_filename, user_query_twitter=user_query_twitter, logger=logger)
+ """
def process_source(self):
"""
do nothing, just log the information
- """
+ """
self.obj_buffer.add_object(TweetLog, None, {'tweet_source_id':self.source_id, 'status':TweetLog.TWEET_STATUS['LIMIT'], 'error':self.json_txt}, True)
-
+
def log_info(self):
return u"Process limit %d " % self.json_dict.get("limit", {}).get('track', 0)
-
+
class TwitterProcessorStatusWithheld(TwitterProcessor):
"""
{
@@ -392,31 +384,27 @@
}
}
"""
- def __init__(self, json_dict, json_txt, source_id, session, consumer_token, access_token=None, token_filename=None, user_query_twitter=False, logger=None):
- TwitterProcessor.__init__(self, json_dict, json_txt, source_id, session, consumer_token, access_token=access_token, token_filename=token_filename, user_query_twitter=user_query_twitter, logger=logger)
-
+
def process_source(self):
"""
do nothing, just log the information
"""
self.obj_buffer.add_object(TweetLog, None, {'tweet_source_id':self.source_id, 'status':TweetLog.TWEET_STATUS['STATUS_WITHHELD'], 'error':self.json_txt}, True)
-
+
def log_info(self):
status_withheld = self.json_dict.get("status_withheld",{})
return u"Process status withheld status id %d from user %d in countries %s" %(status_withheld.get("id",0), status_withheld.get("user_id",0), u",".join(status_withheld.get("withheld_in_countries",[])))
class TwitterProcessorUserWithheld(TwitterProcessor):
"""
- {
+ {
"user_withheld":{
"id":123456,
"withheld_in_countries":["DE","AR"]
}
}
"""
- def __init__(self, json_dict, json_txt, source_id, session, consumer_token, access_token=None, token_filename=None, user_query_twitter=False, logger=None):
- TwitterProcessor.__init__(self, json_dict, json_txt, source_id, session, consumer_token, access_token=access_token, token_filename=token_filename, user_query_twitter=user_query_twitter, logger=logger)
-
+
def process_source(self):
"""
do nothing, just log the information
@@ -438,9 +426,7 @@
}
}
"""
- def __init__(self, json_dict, json_txt, source_id, session, consumer_token, access_token=None, token_filename=None, user_query_twitter=False, logger=None):
- TwitterProcessor.__init__(self, json_dict, json_txt, source_id, session, consumer_token, access_token=access_token, token_filename=token_filename, user_query_twitter=user_query_twitter, logger=logger)
-
+
def process_source(self):
"""
do nothing, just log the information
@@ -461,8 +447,6 @@
}
}
"""
- def __init__(self, json_dict, json_txt, source_id, session, consumer_token, access_token=None, token_filename=None, user_query_twitter=False, logger=None):
- TwitterProcessor.__init__(self, json_dict, json_txt, source_id, session, consumer_token, access_token=access_token, token_filename=token_filename, user_query_twitter=user_query_twitter, logger=logger)
def process_source(self):
"""
@@ -482,7 +466,7 @@
'status_withheld': TwitterProcessorStatusWithheld,
'user_withheld': TwitterProcessorUserWithheld,
'disconnect': TwitterProcessorDisconnect,
- 'warning': TwitterProcessorStallWarning
+ 'warning': TwitterProcessorStallWarning
}
def get_processor(tweet_dict):
--- a/script/lib/iri_tweet/iri_tweet/stream.py Wed Jan 02 17:49:19 2019 +0100
+++ b/script/lib/iri_tweet/iri_tweet/stream.py Thu Jan 10 18:36:36 2019 +0100
@@ -7,62 +7,15 @@
Module directly inspired by tweetstream
'''
+import json
+import select
import time
+
import requests
from requests.utils import stream_decode_response_unicode
-import anyjson
-import select
-
-from . import USER_AGENT, ConnectionError, AuthenticationError
-
-def iter_content_non_blocking(req, max_chunk_size=4096, decode_unicode=False, timeout=-1):
-
- if req._content_consumed:
- raise RuntimeError(
- 'The content for this response was already consumed'
- )
-
- req.raw._fp.fp._sock.setblocking(False)
-
- def generate():
- chunk_size = 1
- while True:
- if timeout < 0:
- rlist,_,_ = select.select([req.raw._fp.fp._sock], [], [])
- else:
- rlist,_,_ = select.select([req.raw._fp.fp._sock], [], [], timeout)
-
- if not rlist:
- continue
-
- try:
- chunk = req.raw.read(chunk_size, decode_content=True)
- if not chunk:
- break
- if len(chunk) >= chunk_size and chunk_size < max_chunk_size:
- chunk_size = min(chunk_size*2, max_chunk_size)
- elif len(chunk) < chunk_size/2 and chunk_size < max_chunk_size:
- chunk_size = max(chunk_size/2,1)
- yield chunk
- except requests.exceptions.SSLError as e:
- if e.errno == 2:
- # Apparently this means there was nothing in the socket buf
- pass
- else:
- raise
-
- req._content_consumed = True
+from . import USER_AGENT, AuthenticationError, ConnectionError
- gen = generate()
-
- if decode_unicode:
- gen = stream_decode_response_unicode(gen, req)
-
- return gen
-
-
-
class BaseStream(object):
@@ -121,7 +74,7 @@
"""
def __init__(self, auth,
- raw=False, timeout=-1, url=None, compressed=False, chunk_size=4096, logger=None):
+ raw=False, timeout=-1, url=None, compressed=False, logger=None):
self._conn = None
self._rate_ts = None
self._rate_cnt = 0
@@ -136,14 +89,13 @@
self.count = 0
self.rate = 0
self.user_agent = USER_AGENT
- self.chunk_size = chunk_size
if url: self.url = url
-
+
self.muststop = False
self._logger = logger
-
+
self._iter = self.__iter__()
-
+
def __enter__(self):
return self
@@ -154,24 +106,24 @@
def _init_conn(self):
"""Open the connection to the twitter server"""
-
+
if self._logger : self._logger.debug("BaseStream Open the connection to the twitter server")
-
+
headers = {'User-Agent': self.user_agent}
-
+
if self._compressed:
headers['Accept-Encoding'] = "deflate, gzip"
postdata = self._get_post_data() or {}
postdata['stall_warnings'] = 'true'
-
+
if self._logger : self._logger.debug("BaseStream init connection url " + repr(self.url))
if self._logger : self._logger.debug("BaseStream init connection headers " + repr(headers))
if self._logger : self._logger.debug("BaseStream init connection data " + repr(postdata))
-
+
self._resp = requests.post(self.url, auth=self._auth, headers=headers, data=postdata, stream=True)
if self._logger : self._logger.debug("BaseStream init connection " + repr(self._resp))
-
+
self._resp.raise_for_status()
self.connected = True
@@ -189,110 +141,59 @@
def testmuststop(self):
if callable(self.muststop):
- return self.muststop()
+ return self.muststop() # pylint: disable=not-callable
else:
return self.muststop
-
+
def _update_rate(self):
rate_time = time.time() - self._rate_ts
if not self._rate_ts or rate_time > self.rate_period:
self.rate = self._rate_cnt / rate_time
self._rate_cnt = 0
self._rate_ts = time.time()
-
- def _iter_object(self):
-
-# for line in self._resp.iter_lines():
-# yield line
-# pending = None
-#
-# for chunk in self._resp.iter_content(chunk_size=self.chunk_size, decode_unicode=None):
-#
-# if pending is not None:
-# chunk = pending + chunk
-# lines = chunk.splitlines()
-#
-# if lines and lines[-1] and chunk and lines[-1][-1] == chunk[-1]:
-# pending = lines.pop()
-# else:
-# pending = None
-#
-# for line in lines:
-# yield line
-#
-# if pending is not None:
-# yield pending
-
- pending = None
- has_stopped = False
-
- if self._logger : self._logger.debug("BaseStream _iter_object")
-
- for chunk in self._resp.iter_content(
- chunk_size=self.chunk_size,
- decode_unicode=True):
-
- if self._logger : self._logger.debug("BaseStream _iter_object loop")
- if self.testmuststop():
- has_stopped = True
- break
-
- if pending is not None:
- chunk = pending + chunk
- lines = chunk.split('\r')
-
- if lines and lines[-1] and chunk and lines[-1][-1] == chunk[-1]:
- pending = lines.pop()
- else:
- pending = None
-
- for line in lines:
- yield line.strip('\n')
-
- if self.testmuststop():
- has_stopped = True
- break
-
- if pending is not None:
- yield pending
- if has_stopped:
- raise StopIteration()
-
+
def __iter__(self):
-
+
if self._logger : self._logger.debug("BaseStream __iter__")
if not self.connected:
if self._logger : self._logger.debug("BaseStream __iter__ not connected, connecting")
self._init_conn()
if self._logger : self._logger.debug("BaseStream __iter__ connected")
-
- for line in self._iter_object():
+ has_stopped = False
- if self._logger : self._logger.debug("BaseStream __iter__ line %s " % repr(line))
-
+        # requests' iter_lines() replaces the old hand-rolled _iter_object() chunk splitter
+ for line in self._resp.iter_lines():
+
if not line:
continue
+ if self.testmuststop():
+ has_stopped = True
+ break
+
+
+ if self._logger : self._logger.debug("BaseStream __iter__ line %s " % repr(line))
+
+ text_in_tweet = False
if (self.raw_mode):
tweet = line
+ text_in_tweet = b'text' in tweet
else:
- line = line.decode("utf8")
try:
- tweet = anyjson.deserialize(line)
+ tweet = json.loads(line)
except ValueError:
self.close()
raise ConnectionError("Got invalid data from twitter", details=line)
- if 'text' in tweet:
+ text_in_tweet = 'text' in tweet
+ if text_in_tweet:
self.count += 1
self._rate_cnt += 1
self._update_rate()
yield tweet
-
- def next(self):
- """Return the next available tweet. This call is blocking!"""
- return self._iter.next()
+        if has_stopped:
+            # PEP 479: raising StopIteration inside a generator becomes a
+            # RuntimeError on Python 3.7+, so just end the generator.
+            return
def close(self):
@@ -306,12 +207,12 @@
url = "https://stream.twitter.com/1.1/statuses/filter.json"
def __init__(self, auth, follow=None, locations=None,
- track=None, url=None, raw=False, timeout=None, compressed=False, chunk_size=requests.models.ITER_CHUNK_SIZE, logger=None):
+ track=None, url=None, raw=False, timeout=None, compressed=False, logger=None):
self._follow = follow
self._locations = locations
self._track = track
# remove follow, locations, track
- BaseStream.__init__(self, auth, url=url, raw=raw, timeout=timeout, compressed=compressed, chunk_size=chunk_size, logger=logger)
+ BaseStream.__init__(self, auth, url=url, raw=raw, timeout=timeout, compressed=compressed, logger=logger)
def _get_post_data(self):
postdata = {}
@@ -322,7 +223,7 @@
class SafeStreamWrapper(object):
-
+
def __init__(self, base_stream, logger=None, error_cb=None, max_reconnects=-1, initial_tcp_wait=250, initial_http_wait=5000, max_wait=240000):
self._stream = base_stream
self._logger = logger
@@ -342,12 +243,12 @@
self._error_cb(e)
if self._logger: self._logger.info("stream sleeping for %d ms " % self._retry_wait)
time.sleep(float(self._retry_wait)/1000.0)
-
-
+
+
def __process_tcp_error(self,e):
if self._logger: self._logger.debug("connection error type :" + repr(type(e)))
if self._logger: self._logger.debug("connection error :" + repr(e))
-
+
self._reconnects += 1
if self._max_reconnects >= 0 and self._reconnects > self._max_reconnects:
raise ConnectionError("Too many retries")
@@ -355,10 +256,10 @@
self._retry_wait += self._initial_tcp_wait
if self._retry_wait > self._max_wait:
self._retry_wait = self._max_wait
-
+
self.__post_process_error(e)
-
+
def __process_http_error(self,e):
if self._logger: self._logger.debug("http error type %s" % (repr(type(e))))
if self._logger: self._logger.debug("http error on %s : %s" % (e.response.url,e.message))
@@ -367,9 +268,9 @@
self._retry_wait = 2*self._retry_wait if self._retry_wait > 0 else self._initial_http_wait
if self._retry_wait > self._max_wait:
self._retry_wait = self._max_wait
-
+
self.__post_process_error(e)
-
+
def __iter__(self):
while not self._stream.testmuststop():
self._retry_nb += 1
@@ -385,15 +286,11 @@
yield tweet
except requests.exceptions.HTTPError as e:
if e.response.status_code == 401:
- if self._logger: self._logger.debug("SafeStreamWrapper Connection Error http error on %s : %s" % (e.response.url,e.message))
- raise AuthenticationError("Error connecting to %s : %s : %s - %s" % (e.response.url,e.message, repr(e.response.headers),repr(e.response.text)))
+                    if self._logger: self._logger.debug("SafeStreamWrapper Connection Error http error on %s : %s" % (e.response.url, str(e)))
+                    raise AuthenticationError("Error connecting to %s : %s : %s - %s" % (e.response.url, str(e), repr(e.response.headers), repr(e.response.text)))
if e.response.status_code > 200:
self.__process_http_error(e)
else:
self.__process_tcp_error(e)
except (ConnectionError, requests.exceptions.ConnectionError, requests.exceptions.Timeout, requests.exceptions.RequestException) as e:
self.__process_tcp_error(e)
-
-
-
-
\ No newline at end of file
--- a/script/lib/iri_tweet/iri_tweet/utils.py Wed Jan 02 17:49:19 2019 +0100
+++ b/script/lib/iri_tweet/iri_tweet/utils.py Thu Jan 10 18:36:36 2019 +0100
@@ -57,6 +57,46 @@
get_logger().debug("get_oauth_token : done got %s" % repr(res))
return res
+
+def get_oauth2_token(consumer_key, consumer_secret, token_file_path=None, check_access_token=True, application_name=APPLICATION_NAME):
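+    """Return an OAuth2 (app-only) bearer token for the given app credentials.
+
+    The token is read from token_file_path when available, optionally checked
+    against the application/rate_limit_status endpoint, obtained through
+    twitter.oauth2_dance otherwise, and cached per application_name in
+    CACHE_ACCESS_TOKEN.
+    """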
+
+ global CACHE_ACCESS_TOKEN
+
+ res = CACHE_ACCESS_TOKEN.get(application_name, None)
+
+ if res is None and token_file_path and os.path.exists(token_file_path):
+ get_logger().debug("get_oauth2_token : reading token from file %s" % token_file_path) #@UndefinedVariable
+ res = twitter.oauth2.read_bearer_token_file(token_file_path)
+
+ if res is not None and check_access_token:
+ get_logger().debug("get_oauth2_token : Check oauth tokens") #@UndefinedVariable
+ t = twitter.Twitter(auth=twitter.OAuth2(consumer_key, consumer_secret, res))
+ status = None
+ try:
+ status = t.application.rate_limit_status()
+ except Exception as e:
+ get_logger().debug("get_oauth2_token : error getting rate limit status %s " % repr(e))
+ get_logger().debug("get_oauth2_token : error getting rate limit status %s " % str(e))
+ status = None
+ get_logger().debug("get_oauth2_token : Check oauth tokens : status %s" % repr(status)) #@UndefinedVariable
+ if status is None or status.get("resources",{}).get("account",{}).get('/account/verify_credentials',{}).get('remaining',0) == 0:
+ get_logger().debug("get_oauth2_token : Problem with status %s" % repr(status))
+ res = None
+
+ if res is None:
+ get_logger().debug("get_oauth2_token : doing the oauth dance")
+ res = twitter.oauth2_dance(consumer_key, consumer_secret, token_file_path)
+
+
+ CACHE_ACCESS_TOKEN[application_name] = res
+
+ get_logger().debug("get_oauth_token : done got %s" % repr(res))
+ return res
+
+
def parse_date(date_str):
ts = email.utils.parsedate_tz(date_str) #@UndefinedVariable
return datetime.datetime(*ts[0:7])
@@ -116,12 +156,10 @@
def persists(self, session):
new_args = [arg() if callable(arg) else arg for arg in self.args] if self.args is not None else []
- new_kwargs = dict([(k,v()) if callable(v) else (k,v) for k,v in self.kwargs.iteritems()]) if self.kwargs is not None else {}
+ new_kwargs = dict([(k,v()) if callable(v) else (k,v) for k,v in self.kwargs.items()]) if self.kwargs is not None else {}
- if self.instance is None:
- self.instance = self.klass(*new_args, **new_kwargs)
- else:
- self.instance = self.klass(*new_args, **new_kwargs)
+ self.instance = self.klass(*new_args, **new_kwargs)
+ if self.instance is not None:
self.instance = session.merge(self.instance)
session.add(self.instance)
--- a/script/stream/recorder_stream.py Wed Jan 02 17:49:19 2019 +0100
+++ b/script/stream/recorder_stream.py Thu Jan 10 18:36:36 2019 +0100
@@ -1,33 +1,35 @@
-from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
-from iri_tweet import models, utils
-from iri_tweet.models import TweetSource, TweetLog, ProcessEvent
-from iri_tweet.processor import get_processor
-from multiprocessing import Queue as mQueue, Process, Event
-from sqlalchemy.exc import OperationalError
-from sqlalchemy.orm import scoped_session
-import Queue
-import StringIO
-import anyjson
import argparse
import datetime
import inspect
-import iri_tweet.stream
+import json
import logging
import os
+import queue
import re
-import requests_oauthlib
import shutil
import signal
import socket
-import sqlalchemy.schema
import sys
-import thread
import threading
import time
import traceback
-import urllib2
-socket._fileobject.default_bufsize = 0
+import urllib.request
+from http.server import BaseHTTPRequestHandler, HTTPServer
+from io import StringIO
+from multiprocessing import Event, Process
+from multiprocessing import Queue as mQueue
+import requests_oauthlib
+import sqlalchemy.schema
+import twitter
+from sqlalchemy.exc import OperationalError
+from sqlalchemy.orm import scoped_session
+
+import _thread
+import iri_tweet.stream
+from iri_tweet import models, utils
+from iri_tweet.models import ProcessEvent, TweetLog, TweetSource
+from iri_tweet.processor import get_processor
# columns_tweet = [u'favorited', u'truncated', u'text', u'created_at', u'source', u'in_reply_to_status_id', u'in_reply_to_screen_name', u'in_reply_to_user_id', u'geo', u'id', u'user']
@@ -40,20 +42,17 @@
class Requesthandler(BaseHTTPRequestHandler):
- def __init__(self, request, client_address, server):
- BaseHTTPRequestHandler.__init__(self, request, client_address, server)
-
def do_GET(self):
self.send_response(200)
self.end_headers()
-
+
def log_message(self, format, *args): # @ReservedAssignment
pass
def set_logging(options):
loggers = []
-
+
loggers.append(utils.set_logging(options, logging.getLogger('iri.tweet')))
loggers.append(utils.set_logging(options, logging.getLogger('multiprocessing')))
if options.debug >= 2:
@@ -68,17 +67,14 @@
qlogger.propagate = 0
return qlogger
-def get_auth(options, access_token):
- consumer_key = options.consumer_key
- consumer_secret = options.consumer_secret
- auth = requests_oauthlib.OAuth1(client_key=consumer_key, client_secret=consumer_secret, resource_owner_key=access_token[0], resource_owner_secret=access_token[1], signature_type='auth_header')
- return auth
+def get_auth(consumer_key, consumer_secret, token_key, token_secret):
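+    # The streaming connection signs requests with requests_oauthlib.OAuth1;
+    # REST lookups elsewhere use a twitter.OAuth built from the same credentials.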
+ return requests_oauthlib.OAuth1(client_key=consumer_key, client_secret=consumer_secret, resource_owner_key=token_key, resource_owner_secret=token_secret, signature_type='auth_header')
def add_process_event(event_type, args, session_maker):
session = session_maker()
try:
- evt = ProcessEvent(args=None if args is None else anyjson.serialize(args), type=event_type)
+ evt = ProcessEvent(args=None if args is None else json.dumps(args), type=event_type)
session.add(evt)
session.commit()
finally:
@@ -87,15 +83,14 @@
class BaseProcess(Process):
- def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
+ def __init__(self, session_maker, queue, options, twitter_auth, stop_event, logger_queue, parent_pid):
self.parent_pid = parent_pid
self.session_maker = session_maker
self.queue = queue
self.options = options
self.logger_queue = logger_queue
self.stop_event = stop_event
- self.consumer_token = (options.consumer_key, options.consumer_secret)
- self.access_token = access_token
+ self.twitter_auth = twitter_auth
super(BaseProcess, self).__init__()
@@ -112,10 +107,10 @@
else:
# *ring* Hi mom!
return True
-
+
def __get_process_event_args(self):
- return {'name':self.name, 'pid':self.pid, 'parent_pid':self.parent_pid, 'options':self.options.__dict__, 'access_token':self.access_token}
+ return {'name':self.name, 'pid':self.pid, 'parent_pid':self.parent_pid, 'options':self.options.__dict__}
def run(self):
try:
@@ -123,47 +118,45 @@
self.do_run()
finally:
add_process_event("stop_worker", self.__get_process_event_args(), self.session_maker)
-
+
def do_run(self):
raise NotImplementedError()
class SourceProcess(BaseProcess):
-
- def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
+
+ def __init__(self, session_maker, queue, options, twitter_auth, stop_event, logger_queue, parent_pid):
self.track = options.track
- self.token_filename = options.token_filename
self.timeout = options.timeout
self.stream = None
- super(SourceProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid)
-
+ super(SourceProcess, self).__init__(session_maker, queue, options, twitter_auth, stop_event, logger_queue, parent_pid)
+
def __source_stream_iter(self):
-
+
self.logger.debug("SourceProcess : run ")
-
- self.logger.debug("SourceProcess : get_auth auth with option %s and token %s " %(self.options, self.access_token))
- self.auth = get_auth(self.options, self.access_token)
+
+ self.logger.debug("SourceProcess : get_auth auth with option %s and token %s " %(self.options, self.twitter_auth))
+ self.auth = get_auth(self.twitter_auth.consumer_key, self.twitter_auth.consumer_secret, self.twitter_auth.token, self.twitter_auth.token_secret)
self.logger.debug("SourceProcess : auth set ")
-
track_list = self.track # or raw_input('Keywords to track (comma seperated): ').strip()
self.logger.debug("SourceProcess : track list " + track_list)
-
+
track_list = [k.strip() for k in track_list.split(',')]
- self.logger.debug("SourceProcess : before connecting to stream %s, url : %s, auth : %s" % (repr(track_list), self.options.url, repr(self.auth)))
- self.stream = iri_tweet.stream.FilterStream(self.auth, track=track_list, raw=True, url=self.options.url, timeout=self.timeout, chunk_size=512, logger=self.logger)
+ self.logger.debug("SourceProcess : before connecting to stream %s, url : %s, auth : %s" % (repr(track_list), self.options.url, repr(self.auth)))
+ self.stream = iri_tweet.stream.FilterStream(self.auth, track=track_list, raw=True, url=self.options.url, timeout=self.timeout, logger=self.logger)
self.logger.debug("SourceProcess : after connecting to stream")
- self.stream.muststop = lambda: self.stop_event.is_set()
-
+ self.stream.muststop = lambda: self.stop_event.is_set()
+
stream_wrapper = iri_tweet.stream.SafeStreamWrapper(self.stream, logger=self.logger)
-
+
session = self.session_maker()
-
+
#import pydevd
#pydevd.settrace(suspend=False)
-
+
try:
for tweet in stream_wrapper:
if not self.parent_is_alive():
@@ -184,7 +177,7 @@
self.logger.debug("SourceProcess : Operational Error %s nb %d" % (repr(e), add_retries))
if add_retries == 10:
raise
-
+
source_id = source.id
self.logger.debug("SourceProcess : before queue + source id " + repr(source_id))
self.logger.info("SourceProcess : Tweet count: %d - current rate : %.2f - running : %s" % (self.stream.count, self.stream.rate, int(time.time() - self.stream.starttime)))
@@ -204,13 +197,13 @@
def do_run(self):
-
- self.logger = set_logging_process(self.options, self.logger_queue)
-
+
+ self.logger = set_logging_process(self.options, self.logger_queue)
+
source_stream_iter_thread = threading.Thread(target=self.__source_stream_iter , name="SourceStreamIterThread")
-
+
source_stream_iter_thread.start()
-
+
try:
while not self.stop_event.is_set():
self.logger.debug("SourceProcess : In while after start")
@@ -230,11 +223,11 @@
source_stream_iter_thread.join(30)
-def process_tweet(tweet, source_id, session, consumer_token, access_token, twitter_query_user, token_filename, logger):
+def process_tweet(tweet, source_id, session, twitter_auth, twitter_query_user, logger):
try:
if not tweet.strip():
return
- tweet_obj = anyjson.deserialize(tweet)
+ tweet_obj = json.loads(tweet)
processor_klass = get_processor(tweet_obj)
if not processor_klass:
tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET'])
@@ -244,18 +237,16 @@
json_txt=tweet,
source_id=source_id,
session=session,
- consumer_token=consumer_token,
- access_token=access_token,
- token_filename=token_filename,
+ twitter_auth=twitter_auth,
user_query_twitter=twitter_query_user,
logger=logger)
- logger.info(processor.log_info())
- logger.debug(u"Process_tweet :" + repr(tweet))
+ logger.info(processor.log_info())
+ logger.debug(u"Process_tweet :" + repr(tweet))
processor.process()
-
+
except ValueError as e:
message = u"Value Error %s processing tweet %s" % (repr(e), tweet)
- output = StringIO.StringIO()
+ output = StringIO()
try:
traceback.print_exc(file=output)
error_stack = output.getvalue()
@@ -263,11 +254,11 @@
output.close()
tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET'], error=message, error_stack=error_stack)
session.add(tweet_log)
- session.commit()
+ session.commit()
except Exception as e:
message = u"Error %s processing tweet %s" % (repr(e), tweet)
logger.exception(message)
- output = StringIO.StringIO()
+ output = StringIO()
try:
traceback.print_exc(file=output)
error_stack = output.getvalue()
@@ -278,17 +269,17 @@
session.add(tweet_log)
session.commit()
-
-
+
+
class TweetProcess(BaseProcess):
-
- def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
- super(TweetProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid)
+
+ def __init__(self, session_maker, queue, options, twitter_auth, stop_event, logger_queue, parent_pid):
+ super(TweetProcess, self).__init__(session_maker, queue, options, twitter_auth, stop_event, logger_queue, parent_pid)
self.twitter_query_user = options.twitter_query_user
def do_run(self):
-
+
self.logger = set_logging_process(self.options, self.logger_queue)
session = self.session_maker()
try:
@@ -299,7 +290,7 @@
except Exception as e:
self.logger.debug('Process tweet exception in loop : ' + repr(e))
continue
- process_tweet(tweet_txt, source_id, session, self.consumer_token, self.access_token, self.twitter_query_user, self.options.token_filename, self.logger)
+ process_tweet(tweet_txt, source_id, session, self.twitter_auth, self.twitter_query_user, self.logger)
session.commit()
except KeyboardInterrupt:
self.stop_event.set()
@@ -313,36 +304,36 @@
Session = scoped_session(Session)
return Session, engine, metadata
-
-def process_leftovers(session, consumer_token, access_token, twitter_query_user, token_filename, ask_process_leftovers, logger):
-
+
+def process_leftovers(session, twitter_auth, twitter_query_user, ask_process_leftovers, logger):
+
sources = session.query(TweetSource).outerjoin(TweetLog).filter(TweetLog.id == None)
sources_count = sources.count()
-
+
if sources_count > 10 and ask_process_leftovers:
- resp = raw_input("Do you want to process leftovers (Y/n) ? (%d tweet to process)" % sources_count)
+ resp = input("Do you want to process leftovers (Y/n) ? (%d tweet to process)" % sources_count)
if resp and resp.strip().lower() == "n":
return
logger.info("Process leftovers, %d tweets to process" % (sources_count))
for src in sources:
tweet_txt = src.original_json
- process_tweet(tweet_txt, src.id, session, consumer_token, access_token, twitter_query_user, token_filename, logger)
+ process_tweet(tweet_txt, src.id, session, twitter_auth, twitter_query_user, logger)
session.commit()
-
-
+
+
def process_log(logger_queues, stop_event):
while not stop_event.is_set():
for lqueue in logger_queues:
try:
record = lqueue.get_nowait()
logging.getLogger(record.name).handle(record)
- except Queue.Empty:
+ except queue.Empty:
continue
except IOError:
continue
time.sleep(0.1)
-
+
def get_options():
usage = "usage: %(prog)s [options]"
@@ -385,59 +376,59 @@
def do_run(options, session_maker):
stop_args = {}
-
- consumer_token = (options.consumer_key, options.consumer_secret)
- access_token = utils.get_oauth_token(consumer_key=consumer_token[0], consumer_secret=consumer_token[1], token_file_path=options.token_filename)
-
-
+
+
+ access_token_key, access_token_secret = utils.get_oauth_token(consumer_key=options.consumer_key, consumer_secret=options.consumer_secret, token_file_path=options.token_filename)
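+    # twitter.OAuth argument order is (token, token_secret, consumer_key, consumer_secret)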
+ twitter_auth = twitter.OAuth(access_token_key, access_token_secret, options.consumer_key, options.consumer_secret)
+
session = session_maker()
try:
- process_leftovers(session, consumer_token, access_token, options.twitter_query_user, options.token_filename, options.ask_process_leftovers, utils.get_logger())
+ process_leftovers(session, twitter_auth, options.twitter_query_user, options.ask_process_leftovers, utils.get_logger())
session.commit()
finally:
session.rollback()
session.close()
-
+
if options.process_nb <= 0:
utils.get_logger().debug("Leftovers processed. Exiting.")
return None
queue = mQueue()
stop_event = Event()
-
+
# workaround for bug on using urllib2 and multiprocessing
httpd = HTTPServer(('127.0.0.1',0), Requesthandler)
- thread.start_new_thread(httpd.handle_request, ())
-
- req = urllib2.Request('http://localhost:%d' % httpd.server_port)
+ _thread.start_new_thread(httpd.handle_request, ())
+
+ req = urllib.request.Request('http://localhost:%d' % httpd.server_port)
conn = None
try:
- conn = urllib2.urlopen(req)
+ conn = urllib.request.urlopen(req)
except:
utils.get_logger().debug("could not open localhost")
# donothing
finally:
if conn is not None:
conn.close()
-
+
process_engines = []
logger_queues = []
-
+
SessionProcess, engine_process, _ = get_sessionmaker(conn_str)
process_engines.append(engine_process)
lqueue = mQueue(50)
logger_queues.append(lqueue)
pid = os.getpid()
- sprocess = SourceProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)
-
+ sprocess = SourceProcess(SessionProcess, queue, options, twitter_auth, stop_event, lqueue, pid)
+
tweet_processes = []
-
+
for i in range(options.process_nb - 1):
SessionProcess, engine_process, _ = get_sessionmaker(conn_str)
process_engines.append(engine_process)
lqueue = mQueue(50)
logger_queues.append(lqueue)
- cprocess = TweetProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)
+ cprocess = TweetProcess(SessionProcess, queue, options, twitter_auth, stop_event, lqueue, pid)
tweet_processes.append(cprocess)
log_thread = threading.Thread(target=process_log, name="loggingThread", args=(logger_queues, stop_event,))
@@ -452,18 +443,18 @@
add_process_event("pid", {'main':os.getpid(), 'source':(sprocess.name, sprocess.pid), 'consumers':dict([(p.name, p.pid) for p in tweet_processes])}, session_maker)
if options.duration >= 0:
- end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration)
+ end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration)
def interupt_handler(signum, frame):
utils.get_logger().debug("shutdown asked " + repr(signum) + " " + repr(inspect.getframeinfo(frame, 9)))
stop_args.update({'message': 'interupt', 'signum':signum, 'frameinfo':inspect.getframeinfo(frame, 9)})
stop_event.set()
-
+
signal.signal(signal.SIGINT , interupt_handler)
signal.signal(signal.SIGHUP , interupt_handler)
signal.signal(signal.SIGALRM, interupt_handler)
signal.signal(signal.SIGTERM, interupt_handler)
-
+
while not stop_event.is_set():
if options.duration >= 0 and datetime.datetime.utcnow() >= end_ts:
@@ -484,7 +475,7 @@
utils.get_logger().debug("Pb joining Source Process - terminating")
finally:
sprocess.terminate()
-
+
for i, cprocess in enumerate(tweet_processes):
utils.get_logger().debug("Joining consumer process Nb %d" % (i + 1))
try:
@@ -493,7 +484,7 @@
utils.get_logger().debug("Pb joining consumer process Nb %d - terminating" % (i + 1))
cprocess.terminate()
-
+
utils.get_logger().debug("Close queues")
try:
queue.close()
@@ -502,13 +493,13 @@
except Exception as e:
utils.get_logger().error("error when closing queues %s", repr(e))
# do nothing
-
-
+
+
if options.process_nb > 1:
utils.get_logger().debug("Processing leftovers")
session = session_maker()
try:
- process_leftovers(session, consumer_token, access_token, options.twitter_query_user, options.token_filename, options.ask_process_leftovers, utils.get_logger())
+ process_leftovers(session, twitter_auth, options.twitter_query_user, options.ask_process_leftovers, utils.get_logger())
session.commit()
finally:
session.rollback()
@@ -516,18 +507,18 @@
for pengine in process_engines:
pengine.dispose()
-
+
return stop_args
def main(options):
-
+
global conn_str
-
+
conn_str = options.conn_str.strip()
- if not re.match("^\w+://.+", conn_str):
+ if not re.match(r"^\w+://.+", conn_str):
conn_str = 'sqlite:///' + options.conn_str
-
+
if conn_str.startswith("sqlite") and options.new:
filepath = conn_str[conn_str.find(":///") + 4:]
if os.path.exists(filepath):
@@ -543,7 +534,7 @@
shutil.move(filepath, new_path)
Session, engine, metadata = get_sessionmaker(conn_str)
-
+
if options.new:
check_metadata = sqlalchemy.schema.MetaData(bind=engine)
check_metadata.reflect()
@@ -551,28 +542,28 @@
message = "Database %s not empty exiting" % conn_str
utils.get_logger().error(message)
sys.exit(message)
-
+
metadata.create_all(engine)
session = Session()
try:
models.add_model_version(session)
finally:
session.close()
-
+
stop_args = {}
try:
add_process_event(event_type="start", args={'options':options.__dict__, 'args': [], 'command_line': sys.argv}, session_maker=Session)
stop_args = do_run(options, Session)
except Exception as e:
- utils.get_logger().exception("Error in main thread")
- outfile = StringIO.StringIO()
+ utils.get_logger().exception("Error in main thread")
+ outfile = StringIO()
try:
traceback.print_exc(file=outfile)
stop_args = {'error': repr(e), 'message': repr(e), 'stacktrace':outfile.getvalue()}
finally:
outfile.close()
raise
- finally:
+ finally:
add_process_event(event_type="shutdown", args=stop_args, session_maker=Session)
utils.get_logger().debug("Done. Exiting. " + repr(stop_args))
@@ -582,22 +573,21 @@
if __name__ == '__main__':
options = get_options()
-
+
loggers = set_logging(options)
-
+
utils.get_logger().debug("OPTIONS : " + repr(options))
-
+
if options.daemon:
options.ask_process_leftovers = False
import daemon
-
+
hdlr_preserve = []
for logger in loggers:
hdlr_preserve.extend([h.stream for h in logger.handlers])
-
- context = daemon.DaemonContext(working_directory=os.getcwd(), files_preserve=hdlr_preserve)
+
+ context = daemon.DaemonContext(working_directory=os.getcwd(), files_preserve=hdlr_preserve)
with context:
main(options)
else:
main(options)
-
--- a/script/utils/merge_tweets.py Wed Jan 02 17:49:19 2019 +0100
+++ b/script/utils/merge_tweets.py Thu Jan 10 18:36:36 2019 +0100
@@ -5,6 +5,7 @@
import logging
import re
import sys
+import twitter
from iri_tweet.models import Tweet, TweetLog, TweetSource, setup_database
from iri_tweet.processor import TwitterProcessorStatus
@@ -13,7 +14,7 @@
logger = logging.getLogger(__name__)
def get_option():
-
+
parser = argparse.ArgumentParser(description='Merge tweets databases')
parser.add_argument("-l", "--log", dest="logfile",
@@ -31,23 +32,24 @@
parser.add_argument("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token",
help="Token file name")
-
+
parser.add_argument("source", action="store", nargs=1, type=str, metavar="SOURCE")
parser.add_argument("target", action="store", nargs=1, type=str, metavar="TARGET")
-
+
return parser.parse_args()
if __name__ == "__main__":
-
+
#sys.stdout = codecs.getwriter(sys.stdout.encoding)(sys.stdout)
writer = None
options = get_option()
-
- access_token = None
+
+ twitter_auth = None
if options.query_user:
- access_token = get_oauth_token(options.consumer_key, options.consumer_secret, options.token_filename)
-
+        access_token_key, access_token_secret = get_oauth_token(consumer_key=options.consumer_key, consumer_secret=options.consumer_secret, token_file_path=options.token_filename)
+        twitter_auth = twitter.OAuth(access_token_key, access_token_secret, options.consumer_key, options.consumer_secret)
+
#open source
src_conn_str = options.source[0].strip()
if not re.match(r"^\w+://.+", src_conn_str):
@@ -58,51 +60,51 @@
engine_src, metadata_src, Session_src = setup_database(src_conn_str, echo=((options.verbose-options.quiet)>0), create_all = False)
- engine_tgt, metadata_tgt, Session_tgt = setup_database(tgt_conn_str, echo=((options.verbose-options.quiet)>0), create_all = False)
+ engine_tgt, metadata_tgt, Session_tgt = setup_database(tgt_conn_str, echo=((options.verbose-options.quiet)>0), create_all = False)
conn_src = conn_tgt = session_src = session_tgt = None
-
+
try:
#conn_src = engine_src.connect()
#conn_tgt = engine_tgt.connect()
session_src = Session_src()
session_tgt = Session_tgt()
-
+
count_tw = session_src.query(Tweet).count()
-
+
if count_tw == 0:
print("No tweet to process : exit")
sys.exit()
-
+
query_src = session_src.query(Tweet).join(TweetSource).yield_per(100)
added = 0
-
+
for i,tweet in enumerate(query_src):
-
+
tweet_count = session_tgt.query(Tweet).filter(Tweet.id == tweet.id).count()
-
+
progress_text = u"Process: "
if tweet_count == 0:
added += 1
progress_text = u"Adding : "
tweet_source = tweet.tweet_source.original_json
-
+
tweet_obj = json.loads(tweet_source)
if 'text' not in tweet_obj:
tweet_log = TweetLog(tweet_source_id=tweet.tweet_source.id, status=TweetLog.TWEET_STATUS['NOT_TWEET'])
session_tgt.add(tweet_log)
- else:
- tp = TwitterProcessorStatus(None, tweet_source, None, session_tgt, access_token, options.token_filename, user_query_twitter=options.query_user, logger=logger)
+ else:
+ tp = TwitterProcessorStatus(None, tweet_source, None, session_tgt, twitter_auth=twitter_auth, user_query_twitter=options.query_user, logger=logger)
tp.process()
-
+
session_tgt.flush()
-
+
ptext = progress_text + tweet.text
writer = show_progress(i+1, count_tw, ptext.replace("\n",""), 70, writer)
-
+
session_tgt.commit()
print(u"%d new tweet added" % (added,))
-
+
finally:
if session_tgt is not None:
session_tgt.close()
--- a/script/utils/search_topsy.py Wed Jan 02 17:49:19 2019 +0100
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,181 +0,0 @@
-import argparse
-import logging
-import math
-import re
-import time
-
-from blessings import Terminal
-import requests
-import twitter
-
-from iri_tweet import models, utils
-from iri_tweet.processor import TwitterProcessorStatus
-
-
-logger = logging.getLogger(__name__)
-
-APPLICATION_NAME = "Tweet recorder user"
-
-
-class TopsyResource(object):
-
- def __init__(self, query, **kwargs):
-
- self.options = kwargs
- self.options['q'] = query
- self.url = kwargs.get("url", "http://otter.topsy.com/search.json")
- self.page = 0
- self.req = None
- self.res = {}
-
- def __initialize(self):
-
- params = {}
- params.update(self.options)
- self.req = requests.get(self.url, params=params)
- self.res = self.req.json()
-
- def __next_page(self):
- page = self.res.get("response").get("page") + 1
- params = {}
- params.update(self.options)
- params['page'] = page
- self.req = requests.get(self.url, params=params)
- self.res = self.req.json()
-
- def __iter__(self):
- if not self.req:
- self.__initialize()
- while "response" in self.res and "list" in self.res.get("response") and self.res.get("response").get("list"):
- for item in self.res.get("response").get("list"):
- yield item
- self.__next_page()
-
- def total(self):
- if not self.res:
- return 0
- else:
- return self.res.get("response",{}).get("total",0)
-
-
-
-def get_options():
-
- usage = "usage: %(prog)s [options] <connection_str_or_filepath>"
-
- parser = argparse.ArgumentParser(usage=usage)
-
- parser.add_argument(dest="conn_str",
- help="write tweet to DATABASE. This is a connection string", metavar="CONNECTION_STR")
- parser.add_argument("-Q", dest="query",
- help="query", metavar="QUERY")
- parser.add_argument("-k", "--key", dest="consumer_key",
- help="Twitter consumer key", metavar="CONSUMER_KEY")
- parser.add_argument("-s", "--secret", dest="consumer_secret",
- help="Twitter consumer secret", metavar="CONSUMER_SECRET")
- parser.add_argument("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token",
- help="Token file name")
- parser.add_argument("-T", dest="topsy_apikey", metavar="TOPSY_APIKEY", default=None,
- help="Topsy apikey")
-
- utils.set_logging_options(parser)
-
- return parser.parse_args()
-
-
-
-if __name__ == "__main__":
-
- options = get_options()
-
- utils.set_logging(options);
-
-
- acess_token_key, access_token_secret = utils.get_oauth_token(consumer_key=options.consumer_key, consumer_secret=options.consumer_secret, token_file_path=options.token_filename, application_name=APPLICATION_NAME)
-
- t = twitter.Twitter(domain="api.twitter.com", auth=twitter.OAuth(acess_token_key, access_token_secret, options.consumer_key, options.consumer_secret), secure=True)
- t.secure = True
-
- conn_str = options.conn_str.strip()
- if not re.match("^\w+://.+", conn_str):
- conn_str = 'sqlite:///' + conn_str
-
- engine, metadata, Session = models.setup_database(conn_str, echo=((options.verbose-options.quiet)>0), create_all=True)
- session = None
-
-
- topsy_parameters = {
- 'apikey': options.topsy_apikey,
- 'perpage': 100,
- 'window': 'a',
- 'type': 'tweet',
- 'hidden': True,
- }
-
- term = Terminal()
-
- try:
- session = Session()
-
- results = None
- page = 1
- print options.query
-
- tr = TopsyResource(options.query, **topsy_parameters)
-
- move_up = 0
-
- for i,item in enumerate(tr):
- # get id
- url = item.get("url")
- tweet_id = url.split("/")[-1]
-
- if move_up > 0:
- print((move_up+1)*term.move_up())
- move_up = 0
-
- print ("%d/%d:%03d%% - %s - %r" % (i+1, tr.total(), int(float(i+1)/float(tr.total())*100.0), tweet_id, item.get("content") ) + term.clear_eol())
- move_up += 1
-
- count_tweet = session.query(models.Tweet).filter_by(id_str=tweet_id).count()
-
- if count_tweet:
- continue
- try:
- tweet = t.statuses.show(id=tweet_id, include_entities=True)
- except twitter.api.TwitterHTTPError as e:
- if e.e.code == 404 or e.e.code == 403:
- continue
- else:
- raise
-
- processor = TwitterProcessorStatus(tweet, None, None, session, None, options.token_filename, logger)
- processor.process()
- session.flush()
- session.commit()
-
- print("rate limit remaining %s of %s" % (str(tweet.rate_limit_remaining), str(tweet.headers.getheader('X-Rate-Limit-Limit'))) + term.clear_eol())
- move_up += 1
- rate_limit_limit = int(tweet.headers.getheader('X-Rate-Limit-Limit'))
- rate_limit_remaining = int(tweet.rate_limit_remaining)
-
- if rate_limit_remaining < rate_limit_limit:
- time_to_sleep = 0
- else:
- time_to_sleep = int(math.ceil((tweet.rate_limit_reset - time.mktime(time.gmtime())) / tweet.rate_limit_remaining))
-
- for i in xrange(time_to_sleep):
- if i:
- print(2*term.move_up())
- else:
- move_up += 1
- print(("Sleeping for %d seconds, %d remaining" % (time_to_sleep, time_to_sleep-i)) + term.clear_eol())
- time.sleep(1)
-
- except twitter.api.TwitterHTTPError as e:
- fmt = ("." + e.format) if e.format else ""
- print "Twitter sent status %s for URL: %s%s using parameters: (%s)\ndetails: %s" % (repr(e.e.code), repr(e.uri), repr(fmt), repr(e.uriparts), repr(e.response_data))
-
- finally:
- if session:
- session.close()
--- a/script/utils/search_topsy_scrap.py Wed Jan 02 17:49:19 2019 +0100
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,211 +0,0 @@
-import argparse
-import logging
-import math
-import re
-import time
-import urllib
-
-from blessings import Terminal
-import requests
-import twitter
-
-from iri_tweet import models, utils
-from iri_tweet.processor import TwitterProcessorStatus
-
-from selenium import webdriver
-from selenium.webdriver.common.desired_capabilities import DesiredCapabilities
-from selenium.webdriver.common.by import By
-from selenium.webdriver.support.ui import WebDriverWait
-from selenium.webdriver.support import expected_conditions as EC
-
-from lxml import html
-import json
-
-logger = logging.getLogger(__name__)
-
-APPLICATION_NAME = "Tweet recorder user"
-
-dcap = dict(DesiredCapabilities.PHANTOMJS)
-dcap["phantomjs.page.settings.userAgent"] = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.103 Safari/537.36"
-
-class TopsyResource(object):
-
- def __init__(self, query, **kwargs):
-
- self.options = {}
- self.options['q'] = query
- self.options.update(kwargs)
- self.base_url = "http://topsy.com/s"
- self.driver = webdriver.PhantomJS(desired_capabilities=dcap)
- self.driver.set_window_size(1024, 768)
- self.page = -1
- self.tree = None
-
-
- def __do_request(self, params):
- url = "%s?%s" % (self.base_url, urllib.urlencode(params).replace('+','%20')) #calculate url with urllib
- print('Requesting %s' % url)
- self.driver.get(url)
- try:
- element = WebDriverWait(self.driver, 60).until(
- EC.presence_of_element_located((By.CLASS_NAME, "result-tweet"))
- )
- except Exception as e:
- print('Exception requesting %s : %s' % (url, e))
- self.tree = None
- else:
- self.tree = html.fromstring(self.driver.page_source)
-
- def __check_last(self):
- if self.page < 0:
- return False
- if self.tree is None or len(self.tree.xpath("//*[@id=\"module-pager\"]/div/ul/li[@data-page=\"next\"and @class=\"disabled\"]")):
- return True
- else:
- return False
-
-
- def __next_page(self):
- if self.__check_last():
- return False
- self.page += 1
- params = {}
- params.update(self.options)
- if self.page:
- params['offset'] = self.page*self.options.get('perpage',10)
- self.__do_request(params)
- return self.tree is not None
-
- def __iter__(self):
- result_xpath = "//*[@id=\"results\"]/div"
- while self.__next_page():
- for res_node in self.tree.xpath(result_xpath):
- res_obj = {
- 'user': "".join(res_node.xpath("./div/div/h5/a/text()")),
- 'content': "".join(res_node.xpath("./div/div/div/text()")),
- 'url': "".join(res_node.xpath("./div/div/ul/li[1]/small/a/@href"))
- }
- if res_obj['url']:
- yield res_obj
-
-
-def get_options():
-
- usage = "usage: %(prog)s [options] <connection_str_or_filepath>"
-
- parser = argparse.ArgumentParser(usage=usage)
-
- parser.add_argument(dest="conn_str",
- help="write tweet to DATABASE. This is a connection string", metavar="CONNECTION_STR")
- parser.add_argument("-Q", dest="query",
- help="query", metavar="QUERY")
- parser.add_argument("-k", "--key", dest="consumer_key",
- help="Twitter consumer key", metavar="CONSUMER_KEY")
- parser.add_argument("-s", "--secret", dest="consumer_secret",
- help="Twitter consumer secret", metavar="CONSUMER_SECRET")
- parser.add_argument("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token",
- help="Token file name")
- parser.add_argument("-T", dest="topsy_apikey", metavar="TOPSY_APIKEY", default=None,
- help="Topsy apikey")
-
- utils.set_logging_options(parser)
-
- return parser.parse_args()
-
-
-
-if __name__ == "__main__":
-
- options = get_options()
-
- utils.set_logging(options);
-
-
- acess_token_key, access_token_secret = utils.get_oauth_token(consumer_key=options.consumer_key, consumer_secret=options.consumer_secret, token_file_path=options.token_filename, application_name=APPLICATION_NAME)
-
- t = twitter.Twitter(domain="api.twitter.com", auth=twitter.OAuth(acess_token_key, access_token_secret, options.consumer_key, options.consumer_secret), secure=True)
- t.secure = True
-
- conn_str = options.conn_str.strip()
- if not re.match("^\w+://.+", conn_str):
- conn_str = 'sqlite:///' + conn_str
-
- engine, metadata, Session = models.setup_database(conn_str, echo=((options.verbose-options.quiet)>0), create_all=True)
- session = None
-
-
- topsy_parameters = {
- 'perpage': 10,
- 'window': 'a',
- 'type': 'tweet',
- 'hidden': 1,
- 'sort': 'date'
- }
-
- term = Terminal()
-
- try:
- session = Session()
-
- results = None
- page = 1
- print options.query
-
- tr = TopsyResource(options.query, **topsy_parameters)
-
- move_up = 0
-
- for i,item in enumerate(tr):
- # get id
- url = item.get("url")
- tweet_id = url.split("/")[-1]
-
- if move_up > 0:
- print((move_up+1)*term.move_up())
- move_up = 0
-
- print ("%d: %s - %r" % (i+1, tweet_id, item.get("content") ) + term.clear_eol())
- move_up += 1
-
- count_tweet = session.query(models.Tweet).filter_by(id_str=tweet_id).count()
-
- if count_tweet:
- continue
- try:
- tweet = t.statuses.show(id=tweet_id, include_entities=True)
- except twitter.api.TwitterHTTPError as e:
- if e.e.code == 404 or e.e.code == 403:
- continue
- else:
- raise
-
- processor = TwitterProcessorStatus(tweet, None, None, session, None, options.token_filename, logger)
- processor.process()
- session.flush()
- session.commit()
-
- print("rate limit remaining %s of %s" % (str(tweet.rate_limit_remaining), str(tweet.headers.getheader('X-Rate-Limit-Limit'))) + term.clear_eol())
- move_up += 1
- rate_limit_limit = int(tweet.headers.getheader('X-Rate-Limit-Limit'))
- rate_limit_remaining = int(tweet.rate_limit_remaining)
-
- if rate_limit_remaining < rate_limit_limit:
- time_to_sleep = 0
- else:
- time_to_sleep = int(math.ceil((tweet.rate_limit_reset - time.mktime(time.gmtime())) / tweet.rate_limit_remaining))
-
- for i in xrange(time_to_sleep):
- if i:
- print(2*term.move_up())
- else:
- move_up += 1
- print(("Sleeping for %d seconds, %d remaining" % (time_to_sleep, time_to_sleep-i)) + term.clear_eol())
- time.sleep(1)
-
- except twitter.api.TwitterHTTPError as e:
- fmt = ("." + e.format) if e.format else ""
- print "Twitter sent status %s for URL: %s%s using parameters: (%s)\ndetails: %s" % (repr(e.e.code), repr(e.uri), repr(fmt), repr(e.uriparts), repr(e.response_data))
-
- finally:
- if session:
- session.close()
--- a/script/utils/search_twitter_api.py Wed Jan 02 17:49:19 2019 +0100
+++ b/script/utils/search_twitter_api.py Thu Jan 10 18:36:36 2019 +0100
@@ -1,47 +1,91 @@
import argparse
+import datetime
+import functools
+import json
import logging
import math
import re
import time
-import datetime
import urllib
+from enum import Enum
-from blessings import Terminal
import requests
import twitter
+from blessings import Terminal
from iri_tweet import models, utils
from iri_tweet.processor import TwitterProcessorStatus
-import json
-
logger = logging.getLogger(__name__)
APPLICATION_NAME = "Tweet seach json"
+class SearchType(Enum):
+ standard = 'standard'
+ _30day = '30day'
+ full = 'full'
+
+ def __str__(self):
+ return self.value
+
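
A note on the enum just added: Python identifiers cannot begin with a digit, hence the _30day member name, while the wire value stays '30day'; overriding __str__ makes argparse display and match the bare strings. A minimal sketch of the resulting behaviour, given the SearchType class above:

    >>> str(SearchType._30day)
    '30day'
    >>> SearchType('30day') is SearchType._30day
    True
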
+def pass_kwargs_as_json(f):
+ def kwargs_json_wrapper(*args, **kwargs):
+ normal_kwargs = { k:v for k,v in kwargs.items() if k[0] != "_" }
+ special_kwargs = { k:v for k,v in kwargs.items() if k[0] == "_" }
+ new_kwargs = { **special_kwargs, '_json': normal_kwargs }
+ return f(*args, **new_kwargs)
+ return kwargs_json_wrapper
+
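
pass_kwargs_as_json adapts calling conventions: the premium search endpoints take their parameters as a JSON POST body, whereas the twitter package's fluent API normally turns keyword arguments into query-string parameters. The wrapper moves every ordinary keyword into the _json control argument (which this code relies on for sending the request body) and passes underscore-prefixed arguments such as _method through untouched. A standalone sketch of the repacking, using a stand-in function in place of a real API call:

    def fake_endpoint(**kwargs):
        return kwargs

    wrapped = pass_kwargs_as_json(fake_endpoint)
    print(wrapped(query="some query", maxResults=100, _method="POST"))
    # -> {'_method': 'POST', '_json': {'query': 'some query', 'maxResults': 100}}
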
# TODO: implement some more parameters
# script to "scrap twitter results"

# Shamelessly taken from https://github.com/Jefferson-Henrique/GetOldTweets-python
# pyquery cssselect
class TweetManager:
- def __init__(self, query, twitter_con):
+ def __init__(self, twitter_con, query, search_type, api_env):
self.query = query
- self.max_id = 0
+ self.search_type = search_type
+ self.next = ""
self.t = twitter_con
- pass
+ self.api_env = api_env
+ self.twitter_api = self.get_twitter_api()
+ self.rate_limit_remaining = 0
+ self.rate_limit_limit = 0
+ self.rate_limit_reset = 0
+ self.i = 0
+
+ def get_twitter_api(self):
+ return {
+ SearchType.standard: lambda t: t.search.tweets,
+ SearchType._30day: lambda t: pass_kwargs_as_json(functools.partial(getattr(getattr(t.tweets.search,'30day'),self.api_env), _method="POST")),
+ SearchType.full: lambda t: pass_kwargs_as_json(functools.partial(getattr(t.tweets.search.fullarchive, self.api_env), _method="POST")),
+ }[self.search_type](self.t)
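
get_twitter_api resolves the search type to a concrete endpoint once, at construction time. In the twitter package the attribute chain maps one-to-one onto URL path segments, so the premium endpoints live under tweets/search/30day/<env> and tweets/search/fullarchive/<env>; getattr is unavoidable for the 30-day family because '30day' is not a legal Python attribute name, and _method="POST" plus the JSON wrapper match how those endpoints expect to be called. Roughly, for the full-archive case with an assumed environment label of "dev":

    api = getattr(t.tweets.search.fullarchive, "dev")   # -> tweets/search/fullarchive/dev.json
    api = pass_kwargs_as_json(functools.partial(api, _method="POST"))
    # api(query=..., maxResults=100) now POSTs {"query": ..., "maxResults": 100}
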
def __iter__(self):
while True:
- if self.max_id < 0:
+ if self.next is None:
break
- json = self.get_json_response()
+ self.i = self.i+1
+
+ # with open("json_dump_%s.json" % self.i, 'r') as fp:
+ # jsondata = json.load(fp)
+ jsondata = self.get_json_response()
- next_results = json['search_metadata'].get('next_results', "?")[1:]
- self.max_id = int(urllib.parse.parse_qs(next_results).get('max_id', [-1])[0])
+ self.rate_limit_remaining = jsondata.rate_limit_remaining
+ self.rate_limit_limit = jsondata.rate_limit_limit
+ self.rate_limit_reset = jsondata.rate_limit_reset
+
+ with open("json_dump_%s.json" % self.i, 'w') as fp:
+ json.dump(jsondata, fp)
- tweet_list = json['statuses']
+ if self.search_type == SearchType.standard:
+ next_results = jsondata['search_metadata'].get('next_results', "?")[1:]
+ self.next = urllib.parse.parse_qs(next_results).get('max_id', [None])[0]
+ tweet_list = jsondata['statuses']
+ else:
+ self.next = jsondata.get('next')
+ tweet_list = jsondata['results']
if len(tweet_list) == 0:
break
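
The three rate_limit_* values cached on the manager come from the twitter package's response wrapper, which exposes the X-Rate-Limit-Limit, X-Rate-Limit-Remaining and X-Rate-Limit-Reset reply headers as attributes; storing them here lets the calling loop throttle itself without holding onto the raw response. A hedged sketch of how a caller might use them, tm being a constructed TweetManager:

    import time

    if tm.rate_limit_remaining == 0:
        wait = max(0, tm.rate_limit_reset - time.time())   # reset is epoch seconds
        time.sleep(wait)
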
@@ -50,8 +94,13 @@
yield tweet
def get_json_response(self):
- return self.t.search.tweets(q=self.query, include_entities=True, max_id=self.max_id)
-
+ if self.search_type == SearchType.standard:
+ return self.twitter_api(q=self.query, include_entities=True, max_id=int(self.next) if self.next else 0)
+ else:
+ kwargs = { "query": self.query, "maxResults": 100 }
+ if self.next:
+ kwargs["next"] = self.next
+ return self.twitter_api(**kwargs)
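
Pagination is the other place where the two API families diverge, which is why __iter__ branches on search_type above. Standard search returns a search_metadata block whose next_results field is a ready-made query string; the code strips the leading '?' and extracts max_id from it. The premium endpoints instead return an opaque top-level 'next' cursor that is echoed back verbatim and simply omitted on the last page, leaving self.next as None and ending the iteration. The standard-search parsing in isolation, with a made-up metadata payload:

    import urllib.parse

    search_metadata = {"next_results": "?max_id=1082000000000000000&q=%23example"}
    next_results = search_metadata.get("next_results", "?")[1:]
    max_id = urllib.parse.parse_qs(next_results).get("max_id", [None])[0]
    print(max_id)    # '1082000000000000000'
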
def get_options():
@@ -62,31 +111,37 @@
parser.add_argument(dest="conn_str",
help="write tweet to DATABASE. This is a connection string", metavar="CONNECTION_STR")
parser.add_argument("-Q", dest="query",
- help="query", metavar="QUERY")
+ help="query", metavar="QUERY")
parser.add_argument("-k", "--key", dest="consumer_key",
help="Twitter consumer key", metavar="CONSUMER_KEY")
parser.add_argument("-s", "--secret", dest="consumer_secret",
help="Twitter consumer secret", metavar="CONSUMER_SECRET")
parser.add_argument("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token",
- help="Token file name")
+ help="Token file name")
+ parser.add_argument("-a", dest="search_type", metavar="SEARCH_TYPE", default=SearchType.standard, choices=list(SearchType), type=SearchType,
+ help="Twitter search type ('standard', '30days', 'full')")
+ parser.add_argument("-e", dest="api_env", metavar="API_ENV", default="dev",
+ help="Twitter api dev environment")
+
utils.set_logging_options(parser)
return parser.parse_args()
-
if __name__ == "__main__":
options = get_options()
+ print("the search type is : %s" % options.search_type)
+
utils.set_logging(options)
-
- acess_token_key, access_token_secret = utils.get_oauth_token(consumer_key=options.consumer_key, consumer_secret=options.consumer_secret, token_file_path=options.token_filename, application_name=APPLICATION_NAME)
+ bearer_token = utils.get_oauth2_token(consumer_key=options.consumer_key, consumer_secret=options.consumer_secret, token_file_path=options.token_filename, application_name=APPLICATION_NAME)
+ twitter_auth = twitter.OAuth2(options.consumer_key, options.consumer_secret, bearer_token)
- t = twitter.Twitter(domain="api.twitter.com", auth=twitter.OAuth(acess_token_key, access_token_secret, options.consumer_key, options.consumer_secret), secure=True)
- t.secure = True
+ t = twitter.Twitter(domain="api.twitter.com", auth=twitter_auth, secure=True)
+ t.secure = True
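
The twitter.OAuth2 setup above is the application-only bearer-token flow from the commit message: rather than the interactive per-user OAuth1 dance, the consumer key and secret are exchanged once for a bearer token (get_oauth2_token is also handed the -t token file, presumably to cache it), which is then attached to every request as an Authorization: Bearer header. For reference, the underlying exchange is a single HTTP call, sketched here with requests and assuming valid credentials in consumer_key and consumer_secret:

    import requests

    resp = requests.post(
        "https://api.twitter.com/oauth2/token",
        data={"grant_type": "client_credentials"},
        auth=(consumer_key, consumer_secret),   # HTTP Basic auth
    )
    resp.raise_for_status()
    bearer_token = resp.json()["access_token"]
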
conn_str = options.conn_str.strip()
if not re.match(r"^\w+://.+", conn_str):
@@ -104,7 +159,7 @@
results = None
print(options.query)
- tm = TweetManager(options.query, t)
+ tm = TweetManager(t, options.query, options.search_type, options.api_env)
move_up = 0
@@ -127,7 +182,7 @@
if count_tweet:
continue
- processor = TwitterProcessorStatus(tweet, None, None, session, None, options.token_filename, logger)
+ processor = TwitterProcessorStatus(tweet, None, None, session, twitter_auth=twitter_auth, logger=logger)
processor.process()
session.flush()
session.commit()
--- a/script/utils/search_twitter_json.py Wed Jan 02 17:49:19 2019 +0100
+++ b/script/utils/search_twitter_json.py Thu Jan 10 18:36:36 2019 +0100
@@ -130,8 +130,9 @@
acess_token_key, access_token_secret = utils.get_oauth_token(consumer_key=options.consumer_key, consumer_secret=options.consumer_secret, token_file_path=options.token_filename, application_name=APPLICATION_NAME)
+ twitter_auth = twitter.OAuth(acess_token_key, access_token_secret, options.consumer_key, options.consumer_secret)
- t = twitter.Twitter(domain="api.twitter.com", auth=twitter.OAuth(acess_token_key, access_token_secret, options.consumer_key, options.consumer_secret), secure=True)
+ t = twitter.Twitter(domain="api.twitter.com", auth=twitter_auth, secure=True)
t.secure = True
conn_str = options.conn_str.strip()
@@ -180,7 +181,7 @@
else:
raise
- processor = TwitterProcessorStatus(tweet, None, None, session, None, options.token_filename, logger)
+ processor = TwitterProcessorStatus(tweet, None, None, session, twitter_auth=twitter_auth, logger=logger)
processor.process()
session.flush()
session.commit()
--- a/script/virtualenv/script/res/requirement.txt Wed Jan 02 17:49:19 2019 +0100
+++ b/script/virtualenv/script/res/requirement.txt Thu Jan 10 18:36:36 2019 +0100
@@ -1,23 +1,26 @@
-anyjson==0.3.3
-blessings==1.6
-cssselect==0.9.1
-docutils==0.12
-httplib2==0.9.2
+astroid==2.1.0
+blessings==1.7
+certifi==2018.11.29
+chardet==3.0.4
+cssselect==1.0.3
+docutils==0.14
+idna==2.8
iri-tweet===0.82.0final0
+isort==4.3.4
+lazy-object-proxy==1.3.1
lockfile==0.12.2
-lxml==3.5.0
-oauth2==1.9.0.post1
-oauthlib==1.0.3
-psycopg2==2.6.1
-pyquery==1.2.11
-python-daemon==2.1.1
-python-dateutil==2.5.0
-pytz==2016.1
-requests==2.9.1
-requests-oauthlib==0.6.1
-selenium==2.53.1
-simplejson==3.8.2
-six==1.10.0
-SQLAlchemy==1.0.12
-twitter==1.17.1
-twitter-text-py==2.0.2
+lxml==4.2.5
+mccabe==0.6.1
+oauthlib==2.1.0
+pylint==2.2.2
+pyquery==1.4.0
+python-daemon==2.2.0
+python-dateutil==2.7.5
+requests==2.21.0
+requests-oauthlib==1.1.0
+six==1.12.0
+SQLAlchemy==1.2.15
+twitter==1.18.0
+twitter-text==3.0
+urllib3==1.24.1
+wrapt==1.10.11
Binary file web/images/polemictweet_square.png has changed