# HG changeset patch
# User ymh
# Date 1547141796 -3600
# Node ID 14a9bed2e3cd075b50498ea9bade9bf90d621d1e
# Parent  184372ec27e2be9730217317398a9f7063d45005
Adapt recorder_stream to Python 3
Improve Twitter authentication management
Use OAuth2 where possible
Delete old scripts

diff -r 184372ec27e2 -r 14a9bed2e3cd script/lib/iri_tweet/iri_tweet/processor.py
--- a/script/lib/iri_tweet/iri_tweet/processor.py	Wed Jan 02 17:49:19 2019 +0100
+++ b/script/lib/iri_tweet/iri_tweet/processor.py	Thu Jan 10 18:36:36 2019 +0100
@@ -4,10 +4,10 @@
 @author: ymh
 '''
-from iri_tweet.models import (User, EntityType, adapt_json, MediaType, Media, 
-    EntityMedia, Hashtag, EntityHashtag, EntityUser, EntityUrl, Url, Entity, Tweet, 
+from iri_tweet.models import (User, EntityType, adapt_json, MediaType, Media,
+    EntityMedia, Hashtag, EntityHashtag, EntityUser, EntityUrl, Url, Entity, Tweet,
     TweetSource, TweetLog)
-from iri_tweet.utils import (ObjectsBuffer, adapt_fields, fields_adapter, 
+from iri_tweet.utils import (ObjectsBuffer, adapt_fields, fields_adapter,
     ObjectBufferProxy, get_oauth_token, clean_keys)
 from sqlalchemy.orm import joinedload
 import logging
@@ -20,30 +20,27 @@
     pass

 class TwitterProcessor(object):

-    def __init__(self, json_dict, json_txt, source_id, session, consumer_token, access_token=None, token_filename=None, user_query_twitter=False, logger=None):
+    def __init__(self, json_dict, json_txt, source_id, session, twitter_auth=None, user_query_twitter=False, logger=None):
         if json_dict is None and json_txt is None:
             raise TwitterProcessorException("No json")
-        
+
         if json_dict is None:
             self.json_dict = json.loads(json_txt)
         else:
             self.json_dict = json_dict
-        
+
         if not json_txt:
             self.json_txt = json.dumps(json_dict)
         else:
             self.json_txt = json_txt
-        
+
         if "id" not in self.json_dict:
             raise TwitterProcessorException("No id in json")
-        
+
         self.source_id = source_id
         self.session = session
-        self.consumer_key = consumer_token[0] if consumer_token else None
-        self.consumer_secret = consumer_token[1] if consumer_token else None
-        self.token_filename = token_filename
-        self.access_token = access_token
+        self.twitter_auth = twitter_auth
         self.obj_buffer = ObjectsBuffer()
         self.user_query_twitter = user_query_twitter
         if not logger:
@@ -57,27 +54,26 @@
         self.source_id = tweet_source.id
         self.process_source()
         self.obj_buffer.persists(self.session)
-        
+
     def process_source(self):
         raise NotImplementedError()
-        
+
     def log_info(self):
-        return "Process tweet %s" % repr(self.__class__) 
+        return "Process tweet %s" % repr(self.__class__)

 class TwitterProcessorStatus(TwitterProcessor):
-
-    def __init__(self, json_dict, json_txt, source_id, session, consumer_token, access_token=None, token_filename=None, user_query_twitter=False, logger=None):
-        TwitterProcessor.__init__(self, json_dict, json_txt, source_id, session, consumer_token, access_token=access_token, token_filename=token_filename, user_query_twitter=user_query_twitter, logger=logger)
-
+
     def __get_user(self, user_dict, do_merge):
         self.logger.debug("Get user : " + repr(user_dict)) #@UndefinedVariable
-        
+
         user_dict = adapt_fields(user_dict, fields_adapter["stream"]["user"])
-        
-        user_id = user_dict.get("id",None)
+
+        user_id = user_dict.get("id", user_dict.get("id_str",None))
+        if user_id is not None:
+            user_id = int(user_id)
         user_name = user_dict.get("screen_name", user_dict.get("name", None))
-        
+
         if user_id is None and user_name is None:
             return None

@@ -87,7 +83,7 @@
         else:
             user = self.obj_buffer.get(User, screen_name=user_name)

-        #to do update user
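# The __get_user hunk above now accepts payloads that carry only "id_str"
# and normalizes the value to an int before any lookup. A minimal sketch of
# that normalization (the helper name and sample dicts are illustrative,
# not from the patch):
#
#     def extract_user_id(user_dict):
#         # Prefer the numeric "id", fall back to "id_str", and cast to int
#         # so comparisons against the integer primary key hold.
#         uid = user_dict.get("id", user_dict.get("id_str"))
#         return int(uid) if uid is not None else None
#
#     assert extract_user_id({"id": 42}) == 42
#     assert extract_user_id({"id_str": "42"}) == 42
#     assert extract_user_id({"screen_name": "ymh"}) is None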
id needed if user is not None: user_created_at = None if user.args is not None: @@ -99,31 +95,26 @@ user.args.update(user_dict) return user - #todo : add methpds to objectbuffer to get buffer user + #todo : add methods to objectbuffer to get buffer user user_obj = None if user_id: user_obj = self.session.query(User).filter(User.id == user_id).first() else: user_obj = self.session.query(User).filter(User.screen_name.ilike(user_name)).first() - + #todo update user if needed - if user_obj is not None: + if user_obj is not None: if user_obj.created_at is not None or user_dict.get('created_at', None) is None or not do_merge : user = ObjectBufferProxy(User, None, None, False, user_obj) else: user = self.obj_buffer.add_object(User, None, user_dict, True, user_obj) return user - + user_created_at = user_dict.get("created_at", None) - + if user_created_at is None and self.user_query_twitter: - - if self.access_token is not None: - acess_token_key, access_token_secret = self.access_token - else: - acess_token_key, access_token_secret = get_oauth_token(consumer_key=self.consumer_key, consumer_secret=self.consumer_secret, token_file_path=self.token_filename) - #TODO pass it as argument - t = twitter.Twitter(auth=twitter.OAuth(acess_token_key, access_token_secret, self.consumer_key, self.consumer_secret)) + + t = twitter.Twitter(auth=self.twitter_auth) try: if user_id: user_dict = t.users.show(user_id=user_id) @@ -133,20 +124,20 @@ self.logger.info("get_user : TWITTER ERROR : " + repr(e)) #@UndefinedVariable self.logger.info("get_user : TWITTER ERROR : " + str(e)) #@UndefinedVariable return None - - if "id" not in user_dict: + + if "id" not in user_dict or not user_dict['id']: return None - + #TODO filter get, wrap in proxy user_obj = self.session.query(User).filter(User.id == user_dict["id"]).first() - + if user_obj is not None and not do_merge: return ObjectBufferProxy(User, None, None, False, user_obj) - else: - return self.obj_buffer.add_object(User, None, user_dict, True) + else: + return self.obj_buffer.add_object(User, None, user_dict, True) def __get_or_create_object(self, klass, filter_by_kwargs, filter_arg, creation_kwargs, must_flush, do_merge): - + obj_proxy = self.obj_buffer.get(klass, **filter_by_kwargs) if obj_proxy is None: query = self.session.query(klass) @@ -167,11 +158,11 @@ def __process_entity(self, ind, ind_type): self.logger.debug("Process_entity : " + repr(ind) + " : " + repr(ind_type)) #@UndefinedVariable - + ind = clean_keys(ind) - + entity_type = self.__get_or_create_object(EntityType, {'label':ind_type}, None, {'label':ind_type}, True, False) - + entity_dict = { "indice_start" : ind["indices"][0], "indice_end" : ind["indices"][1], @@ -181,19 +172,19 @@ } def process_medias(): - + media_id = ind.get('id', None) if media_id is None: return None, None - + type_str = ind.get("type", "photo") media_type = self.__get_or_create_object(MediaType, {'label': type_str}, None, {'label':type_str}, True, False) media_ind = adapt_fields(ind, fields_adapter["entities"]["medias"]) if "type" in media_ind: del(media_ind["type"]) - media_ind['type_id'] = media_type.id + media_ind['type_id'] = media_type.id media = self.__get_or_create_object(Media, {'id':media_id}, None, media_ind, True, False) - + entity_dict['media_id'] = media.id return EntityMedia, entity_dict @@ -204,8 +195,8 @@ ind['text'] = text hashtag = self.__get_or_create_object(Hashtag, {'text':text}, Hashtag.text.ilike(text), ind, True, False) entity_dict['hashtag_id'] = hashtag.id - return EntityHashtag, entity_dict - + return 
EntityHashtag, entity_dict + def process_user_mentions(): user_mention = self.__get_user(ind, False) if user_mention is None: @@ -213,71 +204,79 @@ else: entity_dict['user_id'] = user_mention.id return EntityUser, entity_dict - + def process_urls(): url = self.__get_or_create_object(Url, {'url':ind["url"]}, None, ind, True, False) entity_dict['url_id'] = url.id return EntityUrl, entity_dict - + #{'': lambda } - entity_klass, entity_dict = { + entity_klass, entity_dict = { 'hashtags': process_hashtags, 'user_mentions' : process_user_mentions, 'urls' : process_urls, 'media': process_medias, }.get(ind_type, lambda: (Entity, entity_dict))() - + self.logger.debug("Process_entity entity_dict: " + repr(entity_dict)) #@UndefinedVariable if entity_klass: self.obj_buffer.add_object(entity_klass, None, entity_dict, False) def __process_twitter(self): - + tweet_nb = self.session.query(Tweet).filter(Tweet.id == self.json_dict["id"]).count() if tweet_nb > 0: return - - ts_copy = adapt_fields(self.json_dict, fields_adapter["stream"]["tweet"]) - + + ts_copy = None + # Takes into account the extended tweets + if self.json_dict.get('truncated') and self.json_dict.get('extended_tweet'): + ts_copy = { **self.json_dict, **self.json_dict['extended_tweet'], "text": self.json_dict['extended_tweet']['full_text'] } + else: + ts_copy = { **self.json_dict } + + + ts_copy = adapt_fields(ts_copy, fields_adapter["stream"]["tweet"]) + # get or create user - user = self.__get_user(self.json_dict["user"], True) + user = self.__get_user(ts_copy["user"], True) if user is None: - self.logger.warning("USER not found " + repr(self.json_dict["user"])) #@UndefinedVariable + self.logger.warning("USER not found " + repr(ts_copy["user"])) #@UndefinedVariable ts_copy["user_id"] = None else: ts_copy["user_id"] = user.id - + del(ts_copy['user']) ts_copy["tweet_source_id"] = self.source_id - + self.tweet = self.obj_buffer.add_object(Tweet, None, ts_copy, True) - - self.__process_entities() + + self.__process_entities(ts_copy) - def __process_entities(self): - if "entities" in self.json_dict: - for ind_type, entity_list in self.json_dict["entities"].items(): + def __process_entities(self, json_dict): + if "entities" in json_dict: + for ind_type, entity_list in json_dict["entities"].items(): for ind in entity_list: self.__process_entity(ind, ind_type) else: - + text = self.tweet.text extractor = twitter_text.Extractor(text) for ind in extractor.extract_hashtags_with_indices(): self.__process_entity(ind, "hashtags") - + for ind in extractor.extract_urls_with_indices(): self.__process_entity(ind, "urls") - + for ind in extractor.extract_mentioned_screen_names_with_indices(): self.__process_entity(ind, "user_mentions") def process_source(self): - + status_id = self.json_dict["id"] log = self.session.query(TweetLog).filter(TweetLog.status_id==status_id).first() if(log): @@ -307,24 +306,22 @@ } } """ - def __init__(self, json_dict, json_txt, source_id, session, consumer_token, access_token=None, token_filename=None, user_query_twitter=False, logger=None): - TwitterProcessor.__init__(self, json_dict, json_txt, source_id, session, consumer_token, access_token=access_token, token_filename=token_filename, user_query_twitter=user_query_twitter, logger=logger) def process(self): - + #find tweet tweet_id = self.json_dict.get('delete',{}).get('status',{}).get('id',None) if tweet_id: t = self.session.query(Tweet).options(joinedload(Tweet.tweet_source)).filter(Tweet.id == tweet_id).first() if t: - tsource = t.tweet_source + tsource = t.tweet_source 
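The __process_twitter hunk above folds Twitter's extended tweets back into a
flat status: when a streamed entry is truncated, the full text arrives under
extended_tweet.full_text, and the patch merges the extended fields over the
top-level keys with dict unpacking. A minimal sketch of the same merge, with
an invented helper name and sample payload:

    def flatten_extended(status):
        # Merge extended_tweet over the top-level status and promote
        # full_text to the canonical "text" key.
        if status.get('truncated') and status.get('extended_tweet'):
            return {**status,
                    **status['extended_tweet'],
                    'text': status['extended_tweet']['full_text']}
        return dict(status)

    status = {'truncated': True, 'text': 'short preview...',
              'extended_tweet': {'full_text': 'the complete 280-character text'}}
    assert flatten_extended(status)['text'] == 'the complete 280-character text'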
self.session.delete(t) self.session.query(TweetLog).filter(TweetLog.tweet_source_id == tsource.id).delete() self.session.delete(tsource) self.obj_buffer.add_object(TweetLog, None, {'tweet_source_id':self.source_id, 'status':TweetLog.TWEET_STATUS['DELETE']}, True) else: self.obj_buffer.add_object(TweetLog, None, {'tweet_source_id':self.source_id, 'status_id': tweet_id,'status':TweetLog.TWEET_STATUS['DELETE_PENDING']}, True) - + def log_info(self): status_del = self.json_dict.get('delete', {}).get("status",{}) return u"Process delete for %s : %s" % (status_del.get('user_id_str',u""), status_del.get('id_str',u"")) @@ -341,10 +338,7 @@ } """ - def __init__(self, json_dict, json_txt, source_id, session, consumer_token, access_token=None, token_filename=None, user_query_twitter=False, logger=None): - TwitterProcessor.__init__(self, json_dict, json_txt, source_id, session, consumer_token, access_token=access_token, token_filename=token_filename, user_query_twitter=user_query_twitter, logger=logger) - - def process_source(self): + def process_source(self): up_to_status_id = self.json_dict.get("scrub_geo", {}).get("up_to_status_id", None) if not up_to_status_id: return @@ -357,7 +351,7 @@ tsource_dict["geo"] = None self.obj_buffer.add_object(TweetSource, tsource, {'original_json': json.dumps(tsource_dict)}, False) self.obj_buffer.add_object(TweetLog, None, {'tweet_source_id':self.source_id, 'status':TweetLog.TWEET_STATUS['SCRUB_GEO']}, True) - + def log_info(self): return u"Process scrub geo for %s : %s" % (self.json_dict["scrub_geo"].get('user_id_str',u""), self.json_dict["scrub_geo"].get('id_str',u"")) @@ -369,19 +363,17 @@ "track":1234 } } - """ - def __init__(self, json_dict, json_txt, source_id, session, consumer_token, access_token=None, token_filename=None, user_query_twitter=False, logger=None): - TwitterProcessor.__init__(self, json_dict, json_txt, source_id, session, consumer_token, access_token=access_token, token_filename=token_filename, user_query_twitter=user_query_twitter, logger=logger) + """ def process_source(self): """ do nothing, just log the information - """ + """ self.obj_buffer.add_object(TweetLog, None, {'tweet_source_id':self.source_id, 'status':TweetLog.TWEET_STATUS['LIMIT'], 'error':self.json_txt}, True) - + def log_info(self): return u"Process limit %d " % self.json_dict.get("limit", {}).get('track', 0) - + class TwitterProcessorStatusWithheld(TwitterProcessor): """ { @@ -392,31 +384,27 @@ } } """ - def __init__(self, json_dict, json_txt, source_id, session, consumer_token, access_token=None, token_filename=None, user_query_twitter=False, logger=None): - TwitterProcessor.__init__(self, json_dict, json_txt, source_id, session, consumer_token, access_token=access_token, token_filename=token_filename, user_query_twitter=user_query_twitter, logger=logger) - + def process_source(self): """ do nothing, just log the information """ self.obj_buffer.add_object(TweetLog, None, {'tweet_source_id':self.source_id, 'status':TweetLog.TWEET_STATUS['STATUS_WITHHELD'], 'error':self.json_txt}, True) - + def log_info(self): status_withheld = self.json_dict.get("status_withheld",{}) return u"Process status withheld status id %d from user %d in countries %s" %(status_withheld.get("id",0), status_withheld.get("user_id",0), u",".join(status_withheld.get("withheld_in_countries",[]))) class TwitterProcessorUserWithheld(TwitterProcessor): """ - { + { "user_withheld":{ "id":123456, "withheld_in_countries":["DE","AR"] } } """ - def __init__(self, json_dict, json_txt, source_id, session, 
consumer_token, access_token=None, token_filename=None, user_query_twitter=False, logger=None): - TwitterProcessor.__init__(self, json_dict, json_txt, source_id, session, consumer_token, access_token=access_token, token_filename=token_filename, user_query_twitter=user_query_twitter, logger=logger) - + def process_source(self): """ do nothing, just log the information @@ -438,9 +426,7 @@ } } """ - def __init__(self, json_dict, json_txt, source_id, session, consumer_token, access_token=None, token_filename=None, user_query_twitter=False, logger=None): - TwitterProcessor.__init__(self, json_dict, json_txt, source_id, session, consumer_token, access_token=access_token, token_filename=token_filename, user_query_twitter=user_query_twitter, logger=logger) - + def process_source(self): """ do nothing, just log the information @@ -461,8 +447,6 @@ } } """ - def __init__(self, json_dict, json_txt, source_id, session, consumer_token, access_token=None, token_filename=None, user_query_twitter=False, logger=None): - TwitterProcessor.__init__(self, json_dict, json_txt, source_id, session, consumer_token, access_token=access_token, token_filename=token_filename, user_query_twitter=user_query_twitter, logger=logger) def process_source(self): """ @@ -482,7 +466,7 @@ 'status_withheld': TwitterProcessorStatusWithheld, 'user_withheld': TwitterProcessorUserWithheld, 'disconnect': TwitterProcessorDisconnect, - 'warning': TwitterProcessorStallWarning + 'warning': TwitterProcessorStallWarning } def get_processor(tweet_dict): diff -r 184372ec27e2 -r 14a9bed2e3cd script/lib/iri_tweet/iri_tweet/stream.py --- a/script/lib/iri_tweet/iri_tweet/stream.py Wed Jan 02 17:49:19 2019 +0100 +++ b/script/lib/iri_tweet/iri_tweet/stream.py Thu Jan 10 18:36:36 2019 +0100 @@ -7,62 +7,15 @@ Module directly inspired by tweetstream ''' +import json +import select import time + import requests from requests.utils import stream_decode_response_unicode -import anyjson -import select - -from . import USER_AGENT, ConnectionError, AuthenticationError - -def iter_content_non_blocking(req, max_chunk_size=4096, decode_unicode=False, timeout=-1): - - if req._content_consumed: - raise RuntimeError( - 'The content for this response was already consumed' - ) - - req.raw._fp.fp._sock.setblocking(False) - - def generate(): - chunk_size = 1 - while True: - if timeout < 0: - rlist,_,_ = select.select([req.raw._fp.fp._sock], [], []) - else: - rlist,_,_ = select.select([req.raw._fp.fp._sock], [], [], timeout) - - if not rlist: - continue - - try: - chunk = req.raw.read(chunk_size, decode_content=True) - if not chunk: - break - if len(chunk) >= chunk_size and chunk_size < max_chunk_size: - chunk_size = min(chunk_size*2, max_chunk_size) - elif len(chunk) < chunk_size/2 and chunk_size < max_chunk_size: - chunk_size = max(chunk_size/2,1) - yield chunk - except requests.exceptions.SSLError as e: - if e.errno == 2: - # Apparently this means there was nothing in the socket buf - pass - else: - raise - - req._content_consumed = True +from . 
import USER_AGENT, AuthenticationError, ConnectionError - gen = generate() - - if decode_unicode: - gen = stream_decode_response_unicode(gen, req) - - return gen - - - class BaseStream(object): @@ -121,7 +74,7 @@ """ def __init__(self, auth, - raw=False, timeout=-1, url=None, compressed=False, chunk_size=4096, logger=None): + raw=False, timeout=-1, url=None, compressed=False, logger=None): self._conn = None self._rate_ts = None self._rate_cnt = 0 @@ -136,14 +89,13 @@ self.count = 0 self.rate = 0 self.user_agent = USER_AGENT - self.chunk_size = chunk_size if url: self.url = url - + self.muststop = False self._logger = logger - + self._iter = self.__iter__() - + def __enter__(self): return self @@ -154,24 +106,24 @@ def _init_conn(self): """Open the connection to the twitter server""" - + if self._logger : self._logger.debug("BaseStream Open the connection to the twitter server") - + headers = {'User-Agent': self.user_agent} - + if self._compressed: headers['Accept-Encoding'] = "deflate, gzip" postdata = self._get_post_data() or {} postdata['stall_warnings'] = 'true' - + if self._logger : self._logger.debug("BaseStream init connection url " + repr(self.url)) if self._logger : self._logger.debug("BaseStream init connection headers " + repr(headers)) if self._logger : self._logger.debug("BaseStream init connection data " + repr(postdata)) - + self._resp = requests.post(self.url, auth=self._auth, headers=headers, data=postdata, stream=True) if self._logger : self._logger.debug("BaseStream init connection " + repr(self._resp)) - + self._resp.raise_for_status() self.connected = True @@ -189,110 +141,59 @@ def testmuststop(self): if callable(self.muststop): - return self.muststop() + return self.muststop() # pylint: disable=not-callable else: return self.muststop - + def _update_rate(self): rate_time = time.time() - self._rate_ts if not self._rate_ts or rate_time > self.rate_period: self.rate = self._rate_cnt / rate_time self._rate_cnt = 0 self._rate_ts = time.time() - - def _iter_object(self): - -# for line in self._resp.iter_lines(): -# yield line -# pending = None -# -# for chunk in self._resp.iter_content(chunk_size=self.chunk_size, decode_unicode=None): -# -# if pending is not None: -# chunk = pending + chunk -# lines = chunk.splitlines() -# -# if lines and lines[-1] and chunk and lines[-1][-1] == chunk[-1]: -# pending = lines.pop() -# else: -# pending = None -# -# for line in lines: -# yield line -# -# if pending is not None: -# yield pending - - pending = None - has_stopped = False - - if self._logger : self._logger.debug("BaseStream _iter_object") - - for chunk in self._resp.iter_content( - chunk_size=self.chunk_size, - decode_unicode=True): - - if self._logger : self._logger.debug("BaseStream _iter_object loop") - if self.testmuststop(): - has_stopped = True - break - - if pending is not None: - chunk = pending + chunk - lines = chunk.split('\r') - - if lines and lines[-1] and chunk and lines[-1][-1] == chunk[-1]: - pending = lines.pop() - else: - pending = None - - for line in lines: - yield line.strip('\n') - - if self.testmuststop(): - has_stopped = True - break - - if pending is not None: - yield pending - if has_stopped: - raise StopIteration() - + def __iter__(self): - + if self._logger : self._logger.debug("BaseStream __iter__") if not self.connected: if self._logger : self._logger.debug("BaseStream __iter__ not connected, connecting") self._init_conn() if self._logger : self._logger.debug("BaseStream __iter__ connected") - - for line in self._iter_object(): + has_stopped = False - 
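The rewritten __iter__ below replaces the hand-rolled chunk reassembly with
requests' iter_lines(), which yields complete lines and lets the blank
keep-alives be skipped. A minimal sketch of that consumption pattern, with a
placeholder URL argument and stop hook. One caveat: under PEP 479, on by
default since Python 3.7, the `raise StopIteration()` the hunk below keeps
inside a generator surfaces as RuntimeError; a plain `return` is the safe
way to end the generator:

    import json
    import requests

    def iter_stream(url, auth, muststop=lambda: False):
        resp = requests.post(url, auth=auth, stream=True)
        resp.raise_for_status()
        for line in resp.iter_lines():    # complete lines, reassembled by requests
            if muststop():
                return                    # safe generator exit, not StopIteration
            if not line:                  # keep-alive newline from the stream
                continue
            yield json.loads(line)        # json.loads accepts bytes on Python 3.6+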
if self._logger : self._logger.debug("BaseStream __iter__ line %s " % repr(line)) - + # for line in self._iter_object(): + for line in self._resp.iter_lines(): + if not line: continue + if self.testmuststop(): + has_stopped = True + break + + + if self._logger : self._logger.debug("BaseStream __iter__ line %s " % repr(line)) + + text_in_tweet = False if (self.raw_mode): tweet = line + text_in_tweet = b'text' in tweet else: - line = line.decode("utf8") try: - tweet = anyjson.deserialize(line) + tweet = json.loads(line) except ValueError: self.close() raise ConnectionError("Got invalid data from twitter", details=line) - if 'text' in tweet: + text_in_tweet = 'text' in tweet + if text_in_tweet: self.count += 1 self._rate_cnt += 1 self._update_rate() yield tweet - - def next(self): - """Return the next available tweet. This call is blocking!""" - return self._iter.next() + if has_stopped: + raise StopIteration() def close(self): @@ -306,12 +207,12 @@ url = "https://stream.twitter.com/1.1/statuses/filter.json" def __init__(self, auth, follow=None, locations=None, - track=None, url=None, raw=False, timeout=None, compressed=False, chunk_size=requests.models.ITER_CHUNK_SIZE, logger=None): + track=None, url=None, raw=False, timeout=None, compressed=False, logger=None): self._follow = follow self._locations = locations self._track = track # remove follow, locations, track - BaseStream.__init__(self, auth, url=url, raw=raw, timeout=timeout, compressed=compressed, chunk_size=chunk_size, logger=logger) + BaseStream.__init__(self, auth, url=url, raw=raw, timeout=timeout, compressed=compressed, logger=logger) def _get_post_data(self): postdata = {} @@ -322,7 +223,7 @@ class SafeStreamWrapper(object): - + def __init__(self, base_stream, logger=None, error_cb=None, max_reconnects=-1, initial_tcp_wait=250, initial_http_wait=5000, max_wait=240000): self._stream = base_stream self._logger = logger @@ -342,12 +243,12 @@ self._error_cb(e) if self._logger: self._logger.info("stream sleeping for %d ms " % self._retry_wait) time.sleep(float(self._retry_wait)/1000.0) - - + + def __process_tcp_error(self,e): if self._logger: self._logger.debug("connection error type :" + repr(type(e))) if self._logger: self._logger.debug("connection error :" + repr(e)) - + self._reconnects += 1 if self._max_reconnects >= 0 and self._reconnects > self._max_reconnects: raise ConnectionError("Too many retries") @@ -355,10 +256,10 @@ self._retry_wait += self._initial_tcp_wait if self._retry_wait > self._max_wait: self._retry_wait = self._max_wait - + self.__post_process_error(e) - + def __process_http_error(self,e): if self._logger: self._logger.debug("http error type %s" % (repr(type(e)))) if self._logger: self._logger.debug("http error on %s : %s" % (e.response.url,e.message)) @@ -367,9 +268,9 @@ self._retry_wait = 2*self._retry_wait if self._retry_wait > 0 else self._initial_http_wait if self._retry_wait > self._max_wait: self._retry_wait = self._max_wait - + self.__post_process_error(e) - + def __iter__(self): while not self._stream.testmuststop(): self._retry_nb += 1 @@ -385,15 +286,11 @@ yield tweet except requests.exceptions.HTTPError as e: if e.response.status_code == 401: - if self._logger: self._logger.debug("SafeStreamWrapper Connection Error http error on %s : %s" % (e.response.url,e.message)) - raise AuthenticationError("Error connecting to %s : %s : %s - %s" % (e.response.url,e.message, repr(e.response.headers),repr(e.response.text))) + if self._logger: self._logger.debug("SafeStreamWrapper Connection Error http error on 
%s : %s" % (e.response.url,e.strerror)) + raise AuthenticationError("Error connecting to %s : %s : %s - %s" % (e.response.url,e.strerror, repr(e.response.headers),repr(e.response.text))) if e.response.status_code > 200: self.__process_http_error(e) else: self.__process_tcp_error(e) except (ConnectionError, requests.exceptions.ConnectionError, requests.exceptions.Timeout, requests.exceptions.RequestException) as e: self.__process_tcp_error(e) - - - - \ No newline at end of file diff -r 184372ec27e2 -r 14a9bed2e3cd script/lib/iri_tweet/iri_tweet/utils.py --- a/script/lib/iri_tweet/iri_tweet/utils.py Wed Jan 02 17:49:19 2019 +0100 +++ b/script/lib/iri_tweet/iri_tweet/utils.py Thu Jan 10 18:36:36 2019 +0100 @@ -57,6 +57,46 @@ get_logger().debug("get_oauth_token : done got %s" % repr(res)) return res + +def get_oauth2_token(consumer_key, consumer_secret, token_file_path=None, check_access_token=True, application_name=APPLICATION_NAME): + + global CACHE_ACCESS_TOKEN + + if 'ACCESS_TOKEN_KEY' in globals() and 'ACCESS_TOKEN_SECRET' in globals() and ACCESS_TOKEN_KEY and ACCESS_TOKEN_SECRET: + return ACCESS_TOKEN_KEY,ACCESS_TOKEN_SECRET + + res = CACHE_ACCESS_TOKEN.get(application_name, None) + + if res is None and token_file_path and os.path.exists(token_file_path): + get_logger().debug("get_oauth2_token : reading token from file %s" % token_file_path) #@UndefinedVariable + res = twitter.oauth2.read_bearer_token_file(token_file_path) + + if res is not None and check_access_token: + get_logger().debug("get_oauth2_token : Check oauth tokens") #@UndefinedVariable + t = twitter.Twitter(auth=twitter.OAuth2(consumer_key, consumer_secret, res)) + status = None + try: + status = t.application.rate_limit_status() + except Exception as e: + get_logger().debug("get_oauth2_token : error getting rate limit status %s " % repr(e)) + get_logger().debug("get_oauth2_token : error getting rate limit status %s " % str(e)) + status = None + get_logger().debug("get_oauth2_token : Check oauth tokens : status %s" % repr(status)) #@UndefinedVariable + if status is None or status.get("resources",{}).get("account",{}).get('/account/verify_credentials',{}).get('remaining',0) == 0: + get_logger().debug("get_oauth2_token : Problem with status %s" % repr(status)) + res = None + + if res is None: + get_logger().debug("get_oauth2_token : doing the oauth dance") + res = twitter.oauth2_dance(consumer_key, consumer_secret, token_file_path) + + + CACHE_ACCESS_TOKEN[application_name] = res + + get_logger().debug("get_oauth_token : done got %s" % repr(res)) + return res + + def parse_date(date_str): ts = email.utils.parsedate_tz(date_str) #@UndefinedVariable return datetime.datetime(*ts[0:7]) @@ -116,12 +156,10 @@ def persists(self, session): new_args = [arg() if callable(arg) else arg for arg in self.args] if self.args is not None else [] - new_kwargs = dict([(k,v()) if callable(v) else (k,v) for k,v in self.kwargs.iteritems()]) if self.kwargs is not None else {} + new_kwargs = dict([(k,v()) if callable(v) else (k,v) for k,v in self.kwargs.items()]) if self.kwargs is not None else {} - if self.instance is None: - self.instance = self.klass(*new_args, **new_kwargs) - else: - self.instance = self.klass(*new_args, **new_kwargs) + self.instance = self.klass(*new_args, **new_kwargs) + if self.instance is not None: self.instance = session.merge(self.instance) session.add(self.instance) diff -r 184372ec27e2 -r 14a9bed2e3cd script/stream/recorder_stream.py --- a/script/stream/recorder_stream.py Wed Jan 02 17:49:19 2019 +0100 +++ 
b/script/stream/recorder_stream.py Thu Jan 10 18:36:36 2019 +0100 @@ -1,33 +1,35 @@ -from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer -from iri_tweet import models, utils -from iri_tweet.models import TweetSource, TweetLog, ProcessEvent -from iri_tweet.processor import get_processor -from multiprocessing import Queue as mQueue, Process, Event -from sqlalchemy.exc import OperationalError -from sqlalchemy.orm import scoped_session -import Queue -import StringIO -import anyjson import argparse import datetime import inspect -import iri_tweet.stream +import json import logging import os +import queue import re -import requests_oauthlib import shutil import signal import socket -import sqlalchemy.schema import sys -import thread import threading import time import traceback -import urllib2 -socket._fileobject.default_bufsize = 0 +import urllib +from http.server import BaseHTTPRequestHandler, HTTPServer +from io import StringIO +from multiprocessing import Event, Process +from multiprocessing import Queue as mQueue +import requests_oauthlib +import sqlalchemy.schema +import twitter +from sqlalchemy.exc import OperationalError +from sqlalchemy.orm import scoped_session + +import _thread +import iri_tweet.stream +from iri_tweet import models, utils +from iri_tweet.models import ProcessEvent, TweetLog, TweetSource +from iri_tweet.processor import get_processor # columns_tweet = [u'favorited', u'truncated', u'text', u'created_at', u'source', u'in_reply_to_status_id', u'in_reply_to_screen_name', u'in_reply_to_user_id', u'geo', u'id', u'user'] @@ -40,20 +42,17 @@ class Requesthandler(BaseHTTPRequestHandler): - def __init__(self, request, client_address, server): - BaseHTTPRequestHandler.__init__(self, request, client_address, server) - def do_GET(self): self.send_response(200) self.end_headers() - + def log_message(self, format, *args): # @ReservedAssignment pass def set_logging(options): loggers = [] - + loggers.append(utils.set_logging(options, logging.getLogger('iri.tweet'))) loggers.append(utils.set_logging(options, logging.getLogger('multiprocessing'))) if options.debug >= 2: @@ -68,17 +67,14 @@ qlogger.propagate = 0 return qlogger -def get_auth(options, access_token): - consumer_key = options.consumer_key - consumer_secret = options.consumer_secret - auth = requests_oauthlib.OAuth1(client_key=consumer_key, client_secret=consumer_secret, resource_owner_key=access_token[0], resource_owner_secret=access_token[1], signature_type='auth_header') - return auth +def get_auth(consumer_key, consumer_secret, token_key, token_secret): + return requests_oauthlib.OAuth1(client_key=consumer_key, client_secret=consumer_secret, resource_owner_key=token_key, resource_owner_secret=token_secret, signature_type='auth_header') def add_process_event(event_type, args, session_maker): session = session_maker() try: - evt = ProcessEvent(args=None if args is None else anyjson.serialize(args), type=event_type) + evt = ProcessEvent(args=None if args is None else json.dumps(args), type=event_type) session.add(evt) session.commit() finally: @@ -87,15 +83,14 @@ class BaseProcess(Process): - def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid): + def __init__(self, session_maker, queue, options, twitter_auth, stop_event, logger_queue, parent_pid): self.parent_pid = parent_pid self.session_maker = session_maker self.queue = queue self.options = options self.logger_queue = logger_queue self.stop_event = stop_event - self.consumer_token = (options.consumer_key, 
options.consumer_secret) - self.access_token = access_token + self.twitter_auth = twitter_auth super(BaseProcess, self).__init__() @@ -112,10 +107,10 @@ else: # *ring* Hi mom! return True - + def __get_process_event_args(self): - return {'name':self.name, 'pid':self.pid, 'parent_pid':self.parent_pid, 'options':self.options.__dict__, 'access_token':self.access_token} + return {'name':self.name, 'pid':self.pid, 'parent_pid':self.parent_pid, 'options':self.options.__dict__} def run(self): try: @@ -123,47 +118,45 @@ self.do_run() finally: add_process_event("stop_worker", self.__get_process_event_args(), self.session_maker) - + def do_run(self): raise NotImplementedError() class SourceProcess(BaseProcess): - - def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid): + + def __init__(self, session_maker, queue, options, twitter_auth, stop_event, logger_queue, parent_pid): self.track = options.track - self.token_filename = options.token_filename self.timeout = options.timeout self.stream = None - super(SourceProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid) - + super(SourceProcess, self).__init__(session_maker, queue, options, twitter_auth, stop_event, logger_queue, parent_pid) + def __source_stream_iter(self): - + self.logger.debug("SourceProcess : run ") - - self.logger.debug("SourceProcess : get_auth auth with option %s and token %s " %(self.options, self.access_token)) - self.auth = get_auth(self.options, self.access_token) + + self.logger.debug("SourceProcess : get_auth auth with option %s and token %s " %(self.options, self.twitter_auth)) + self.auth = get_auth(self.twitter_auth.consumer_key, self.twitter_auth.consumer_secret, self.twitter_auth.token, self.twitter_auth.token_secret) self.logger.debug("SourceProcess : auth set ") - track_list = self.track # or raw_input('Keywords to track (comma seperated): ').strip() self.logger.debug("SourceProcess : track list " + track_list) - + track_list = [k.strip() for k in track_list.split(',')] - self.logger.debug("SourceProcess : before connecting to stream %s, url : %s, auth : %s" % (repr(track_list), self.options.url, repr(self.auth))) - self.stream = iri_tweet.stream.FilterStream(self.auth, track=track_list, raw=True, url=self.options.url, timeout=self.timeout, chunk_size=512, logger=self.logger) + self.logger.debug("SourceProcess : before connecting to stream %s, url : %s, auth : %s" % (repr(track_list), self.options.url, repr(self.auth))) + self.stream = iri_tweet.stream.FilterStream(self.auth, track=track_list, raw=True, url=self.options.url, timeout=self.timeout, logger=self.logger) self.logger.debug("SourceProcess : after connecting to stream") - self.stream.muststop = lambda: self.stop_event.is_set() - + self.stream.muststop = lambda: self.stop_event.is_set() + stream_wrapper = iri_tweet.stream.SafeStreamWrapper(self.stream, logger=self.logger) - + session = self.session_maker() - + #import pydevd #pydevd.settrace(suspend=False) - + try: for tweet in stream_wrapper: if not self.parent_is_alive(): @@ -184,7 +177,7 @@ self.logger.debug("SourceProcess : Operational Error %s nb %d" % (repr(e), add_retries)) if add_retries == 10: raise - + source_id = source.id self.logger.debug("SourceProcess : before queue + source id " + repr(source_id)) self.logger.info("SourceProcess : Tweet count: %d - current rate : %.2f - running : %s" % (self.stream.count, self.stream.rate, int(time.time() - self.stream.starttime))) @@ -204,13 +197,13 @@ def 
do_run(self): - - self.logger = set_logging_process(self.options, self.logger_queue) - + + self.logger = set_logging_process(self.options, self.logger_queue) + source_stream_iter_thread = threading.Thread(target=self.__source_stream_iter , name="SourceStreamIterThread") - + source_stream_iter_thread.start() - + try: while not self.stop_event.is_set(): self.logger.debug("SourceProcess : In while after start") @@ -230,11 +223,11 @@ source_stream_iter_thread.join(30) -def process_tweet(tweet, source_id, session, consumer_token, access_token, twitter_query_user, token_filename, logger): +def process_tweet(tweet, source_id, session, twitter_auth, twitter_query_user, logger): try: if not tweet.strip(): return - tweet_obj = anyjson.deserialize(tweet) + tweet_obj = json.loads(tweet) processor_klass = get_processor(tweet_obj) if not processor_klass: tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET']) @@ -244,18 +237,16 @@ json_txt=tweet, source_id=source_id, session=session, - consumer_token=consumer_token, - access_token=access_token, - token_filename=token_filename, + twitter_auth=twitter_auth, user_query_twitter=twitter_query_user, logger=logger) - logger.info(processor.log_info()) - logger.debug(u"Process_tweet :" + repr(tweet)) + logger.info(processor.log_info()) + logger.debug(u"Process_tweet :" + repr(tweet)) processor.process() - + except ValueError as e: message = u"Value Error %s processing tweet %s" % (repr(e), tweet) - output = StringIO.StringIO() + output = StringIO() try: traceback.print_exc(file=output) error_stack = output.getvalue() @@ -263,11 +254,11 @@ output.close() tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET'], error=message, error_stack=error_stack) session.add(tweet_log) - session.commit() + session.commit() except Exception as e: message = u"Error %s processing tweet %s" % (repr(e), tweet) logger.exception(message) - output = StringIO.StringIO() + output = StringIO() try: traceback.print_exc(file=output) error_stack = output.getvalue() @@ -278,17 +269,17 @@ session.add(tweet_log) session.commit() - - + + class TweetProcess(BaseProcess): - - def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid): - super(TweetProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid) + + def __init__(self, session_maker, queue, options, twitter_auth, stop_event, logger_queue, parent_pid): + super(TweetProcess, self).__init__(session_maker, queue, options, twitter_auth, stop_event, logger_queue, parent_pid) self.twitter_query_user = options.twitter_query_user def do_run(self): - + self.logger = set_logging_process(self.options, self.logger_queue) session = self.session_maker() try: @@ -299,7 +290,7 @@ except Exception as e: self.logger.debug('Process tweet exception in loop : ' + repr(e)) continue - process_tweet(tweet_txt, source_id, session, self.consumer_token, self.access_token, self.twitter_query_user, self.options.token_filename, self.logger) + process_tweet(tweet_txt, source_id, session, self.twitter_auth, self.twitter_query_user, self.logger) session.commit() except KeyboardInterrupt: self.stop_event.set() @@ -313,36 +304,36 @@ Session = scoped_session(Session) return Session, engine, metadata - -def process_leftovers(session, consumer_token, access_token, twitter_query_user, token_filename, ask_process_leftovers, logger): - + +def process_leftovers(session, twitter_auth, twitter_query_user, 
ask_process_leftovers, logger): + sources = session.query(TweetSource).outerjoin(TweetLog).filter(TweetLog.id == None) sources_count = sources.count() - + if sources_count > 10 and ask_process_leftovers: - resp = raw_input("Do you want to process leftovers (Y/n) ? (%d tweet to process)" % sources_count) + resp = input("Do you want to process leftovers (Y/n) ? (%d tweet to process)" % sources_count) if resp and resp.strip().lower() == "n": return logger.info("Process leftovers, %d tweets to process" % (sources_count)) for src in sources: tweet_txt = src.original_json - process_tweet(tweet_txt, src.id, session, consumer_token, access_token, twitter_query_user, token_filename, logger) + process_tweet(tweet_txt, src.id, session, twitter_auth, twitter_query_user, logger) session.commit() - - + + def process_log(logger_queues, stop_event): while not stop_event.is_set(): for lqueue in logger_queues: try: record = lqueue.get_nowait() logging.getLogger(record.name).handle(record) - except Queue.Empty: + except queue.Empty: continue except IOError: continue time.sleep(0.1) - + def get_options(): usage = "usage: %(prog)s [options]" @@ -385,59 +376,59 @@ def do_run(options, session_maker): stop_args = {} - - consumer_token = (options.consumer_key, options.consumer_secret) - access_token = utils.get_oauth_token(consumer_key=consumer_token[0], consumer_secret=consumer_token[1], token_file_path=options.token_filename) - - + + + access_token_key, access_token_secret = utils.get_oauth_token(consumer_key=options.consumer_key, consumer_secret=options.consumer_secret, token_file_path=options.token_filename) + twitter_auth = twitter.OAuth(access_token_key, access_token_secret, options.consumer_key, options.consumer_secret) + session = session_maker() try: - process_leftovers(session, consumer_token, access_token, options.twitter_query_user, options.token_filename, options.ask_process_leftovers, utils.get_logger()) + process_leftovers(session, twitter_auth, options.twitter_query_user, options.ask_process_leftovers, utils.get_logger()) session.commit() finally: session.rollback() session.close() - + if options.process_nb <= 0: utils.get_logger().debug("Leftovers processed. 
Exiting.") return None queue = mQueue() stop_event = Event() - + # workaround for bug on using urllib2 and multiprocessing httpd = HTTPServer(('127.0.0.1',0), Requesthandler) - thread.start_new_thread(httpd.handle_request, ()) - - req = urllib2.Request('http://localhost:%d' % httpd.server_port) + _thread.start_new_thread(httpd.handle_request, ()) + + req = urllib.request.Request('http://localhost:%d' % httpd.server_port) conn = None try: - conn = urllib2.urlopen(req) + conn = urllib.request.urlopen(req) except: utils.get_logger().debug("could not open localhost") # donothing finally: if conn is not None: conn.close() - + process_engines = [] logger_queues = [] - + SessionProcess, engine_process, _ = get_sessionmaker(conn_str) process_engines.append(engine_process) lqueue = mQueue(50) logger_queues.append(lqueue) pid = os.getpid() - sprocess = SourceProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid) - + sprocess = SourceProcess(SessionProcess, queue, options, twitter_auth, stop_event, lqueue, pid) + tweet_processes = [] - + for i in range(options.process_nb - 1): SessionProcess, engine_process, _ = get_sessionmaker(conn_str) process_engines.append(engine_process) lqueue = mQueue(50) logger_queues.append(lqueue) - cprocess = TweetProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid) + cprocess = TweetProcess(SessionProcess, queue, options, twitter_auth, stop_event, lqueue, pid) tweet_processes.append(cprocess) log_thread = threading.Thread(target=process_log, name="loggingThread", args=(logger_queues, stop_event,)) @@ -452,18 +443,18 @@ add_process_event("pid", {'main':os.getpid(), 'source':(sprocess.name, sprocess.pid), 'consumers':dict([(p.name, p.pid) for p in tweet_processes])}, session_maker) if options.duration >= 0: - end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration) + end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration) def interupt_handler(signum, frame): utils.get_logger().debug("shutdown asked " + repr(signum) + " " + repr(inspect.getframeinfo(frame, 9))) stop_args.update({'message': 'interupt', 'signum':signum, 'frameinfo':inspect.getframeinfo(frame, 9)}) stop_event.set() - + signal.signal(signal.SIGINT , interupt_handler) signal.signal(signal.SIGHUP , interupt_handler) signal.signal(signal.SIGALRM, interupt_handler) signal.signal(signal.SIGTERM, interupt_handler) - + while not stop_event.is_set(): if options.duration >= 0 and datetime.datetime.utcnow() >= end_ts: @@ -484,7 +475,7 @@ utils.get_logger().debug("Pb joining Source Process - terminating") finally: sprocess.terminate() - + for i, cprocess in enumerate(tweet_processes): utils.get_logger().debug("Joining consumer process Nb %d" % (i + 1)) try: @@ -493,7 +484,7 @@ utils.get_logger().debug("Pb joining consumer process Nb %d - terminating" % (i + 1)) cprocess.terminate() - + utils.get_logger().debug("Close queues") try: queue.close() @@ -502,13 +493,13 @@ except Exception as e: utils.get_logger().error("error when closing queues %s", repr(e)) # do nothing - - + + if options.process_nb > 1: utils.get_logger().debug("Processing leftovers") session = session_maker() try: - process_leftovers(session, consumer_token, access_token, options.twitter_query_user, options.token_filename, options.ask_process_leftovers, utils.get_logger()) + process_leftovers(session, twitter_auth, options.twitter_query_user, options.ask_process_leftovers, utils.get_logger()) session.commit() finally: session.rollback() @@ -516,18 +507,18 
@@ for pengine in process_engines: pengine.dispose() - + return stop_args def main(options): - + global conn_str - + conn_str = options.conn_str.strip() - if not re.match("^\w+://.+", conn_str): + if not re.match(r"^\w+://.+", conn_str): conn_str = 'sqlite:///' + options.conn_str - + if conn_str.startswith("sqlite") and options.new: filepath = conn_str[conn_str.find(":///") + 4:] if os.path.exists(filepath): @@ -543,7 +534,7 @@ shutil.move(filepath, new_path) Session, engine, metadata = get_sessionmaker(conn_str) - + if options.new: check_metadata = sqlalchemy.schema.MetaData(bind=engine) check_metadata.reflect() @@ -551,28 +542,28 @@ message = "Database %s not empty exiting" % conn_str utils.get_logger().error(message) sys.exit(message) - + metadata.create_all(engine) session = Session() try: models.add_model_version(session) finally: session.close() - + stop_args = {} try: add_process_event(event_type="start", args={'options':options.__dict__, 'args': [], 'command_line': sys.argv}, session_maker=Session) stop_args = do_run(options, Session) except Exception as e: - utils.get_logger().exception("Error in main thread") - outfile = StringIO.StringIO() + utils.get_logger().exception("Error in main thread") + outfile = StringIO() try: traceback.print_exc(file=outfile) stop_args = {'error': repr(e), 'message': repr(e), 'stacktrace':outfile.getvalue()} finally: outfile.close() raise - finally: + finally: add_process_event(event_type="shutdown", args=stop_args, session_maker=Session) utils.get_logger().debug("Done. Exiting. " + repr(stop_args)) @@ -582,22 +573,21 @@ if __name__ == '__main__': options = get_options() - + loggers = set_logging(options) - + utils.get_logger().debug("OPTIONS : " + repr(options)) - + if options.daemon: options.ask_process_leftovers = False import daemon - + hdlr_preserve = [] for logger in loggers: hdlr_preserve.extend([h.stream for h in logger.handlers]) - - context = daemon.DaemonContext(working_directory=os.getcwd(), files_preserve=hdlr_preserve) + + context = daemon.DaemonContext(working_directory=os.getcwd(), files_preserve=hdlr_preserve) with context: main(options) else: main(options) - diff -r 184372ec27e2 -r 14a9bed2e3cd script/utils/merge_tweets.py --- a/script/utils/merge_tweets.py Wed Jan 02 17:49:19 2019 +0100 +++ b/script/utils/merge_tweets.py Thu Jan 10 18:36:36 2019 +0100 @@ -5,6 +5,7 @@ import logging import re import sys +import twitter from iri_tweet.models import Tweet, TweetLog, TweetSource, setup_database from iri_tweet.processor import TwitterProcessorStatus @@ -13,7 +14,7 @@ logger = logging.getLogger(__name__) def get_option(): - + parser = argparse.ArgumentParser(description='Merge tweets databases') parser.add_argument("-l", "--log", dest="logfile", @@ -31,23 +32,24 @@ parser.add_argument("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token", help="Token file name") - + parser.add_argument("source", action="store", nargs=1, type=str, metavar="SOURCE") parser.add_argument("target", action="store", nargs=1, type=str, metavar="TARGET") - + return parser.parse_args() if __name__ == "__main__": - + #sys.stdout = codecs.getwriter(sys.stdout.encoding)(sys.stdout) writer = None options = get_option() - - access_token = None + + twitter_auth = None if options.query_user: - access_token = get_oauth_token(options.consumer_key, options.consumer_secret, options.token_filename) - + acess_token_key, access_token_secret = get_oauth_token(consumer_key=options.consumer_key, consumer_secret=options.consumer_secret, 
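# The merge script's auth setup below builds a twitter.OAuth object from the
# cached token pair. A minimal sketch of the two auth paths this patch
# standardizes on (helper names come from iri_tweet.utils and the `twitter`
# package as used elsewhere in the patch; KEY, SECRET and the token file
# paths are placeholders):
#
#     import twitter
#     from iri_tweet.utils import get_oauth_token, get_oauth2_token
#
#     # User-context OAuth1: one interactive dance, token pair cached on disk.
#     token_key, token_secret = get_oauth_token(
#         consumer_key=KEY, consumer_secret=SECRET,
#         token_file_path=".oauth_token")
#     auth1 = twitter.OAuth(token_key, token_secret, KEY, SECRET)
#
#     # App-only OAuth2: a single bearer token, no user context.
#     bearer = get_oauth2_token(KEY, SECRET, token_file_path=".oauth2_token")
#     auth2 = twitter.OAuth2(KEY, SECRET, bearer)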
token_file_path=options.token_filename) + twitter_auth = twitter.OAuth(acess_token_key, access_token_secret, options.consumer_key, options.consumer_secret) + #open source src_conn_str = options.source[0].strip() if not re.match(r"^\w+://.+", src_conn_str): @@ -58,51 +60,51 @@ engine_src, metadata_src, Session_src = setup_database(src_conn_str, echo=((options.verbose-options.quiet)>0), create_all = False) - engine_tgt, metadata_tgt, Session_tgt = setup_database(tgt_conn_str, echo=((options.verbose-options.quiet)>0), create_all = False) + engine_tgt, metadata_tgt, Session_tgt = setup_database(tgt_conn_str, echo=((options.verbose-options.quiet)>0), create_all = False) conn_src = conn_tgt = session_src = session_tgt = None - + try: #conn_src = engine_src.connect() #conn_tgt = engine_tgt.connect() session_src = Session_src() session_tgt = Session_tgt() - + count_tw = session_src.query(Tweet).count() - + if count_tw == 0: print("No tweet to process : exit") sys.exit() - + query_src = session_src.query(Tweet).join(TweetSource).yield_per(100) added = 0 - + for i,tweet in enumerate(query_src): - + tweet_count = session_tgt.query(Tweet).filter(Tweet.id == tweet.id).count() - + progress_text = u"Process: " if tweet_count == 0: added += 1 progress_text = u"Adding : " tweet_source = tweet.tweet_source.original_json - + tweet_obj = json.loads(tweet_source) if 'text' not in tweet_obj: tweet_log = TweetLog(tweet_source_id=tweet.tweet_source.id, status=TweetLog.TWEET_STATUS['NOT_TWEET']) session_tgt.add(tweet_log) - else: - tp = TwitterProcessorStatus(None, tweet_source, None, session_tgt, access_token, options.token_filename, user_query_twitter=options.query_user, logger=logger) + else: + tp = TwitterProcessorStatus(None, tweet_source, None, session_tgt, twitter_auth=twitter_auth, user_query_twitter=options.query_user, logger=logger) tp.process() - + session_tgt.flush() - + ptext = progress_text + tweet.text writer = show_progress(i+1, count_tw, ptext.replace("\n",""), 70, writer) - + session_tgt.commit() print(u"%d new tweet added" % (added,)) - + finally: if session_tgt is not None: session_tgt.close() diff -r 184372ec27e2 -r 14a9bed2e3cd script/utils/search_topsy.py --- a/script/utils/search_topsy.py Wed Jan 02 17:49:19 2019 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,181 +0,0 @@ -import argparse -import logging -import math -import re -import time - -from blessings import Terminal -import requests -import twitter - -from iri_tweet import models, utils -from iri_tweet.processor import TwitterProcessorStatus - - -logger = logging.getLogger(__name__) - -APPLICATION_NAME = "Tweet recorder user" - - -class TopsyResource(object): - - def __init__(self, query, **kwargs): - - self.options = kwargs - self.options['q'] = query - self.url = kwargs.get("url", "http://otter.topsy.com/search.json") - self.page = 0 - self.req = None - self.res = {} - - def __initialize(self): - - params = {} - params.update(self.options) - self.req = requests.get(self.url, params=params) - self.res = self.req.json() - - def __next_page(self): - page = self.res.get("response").get("page") + 1 - params = {} - params.update(self.options) - params['page'] = page - self.req = requests.get(self.url, params=params) - self.res = self.req.json() - - def __iter__(self): - if not self.req: - self.__initialize() - while "response" in self.res and "list" in self.res.get("response") and self.res.get("response").get("list"): - for item in self.res.get("response").get("list"): - yield item - self.__next_page() - - def total(self): - if 
not self.res:
-            return 0
-        else:
-            return self.res.get("response",{}).get("total",0)
-
-
-
-def get_options():
-
-    usage = "usage: %(prog)s [options] "
-
-    parser = argparse.ArgumentParser(usage=usage)
-
-    parser.add_argument(dest="conn_str",
-                        help="write tweet to DATABASE. This is a connection string", metavar="CONNECTION_STR")
-    parser.add_argument("-Q", dest="query",
-                        help="query", metavar="QUERY")
-    parser.add_argument("-k", "--key", dest="consumer_key",
-                        help="Twitter consumer key", metavar="CONSUMER_KEY")
-    parser.add_argument("-s", "--secret", dest="consumer_secret",
-                        help="Twitter consumer secret", metavar="CONSUMER_SECRET")
-    parser.add_argument("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token",
-                        help="Token file name")
-    parser.add_argument("-T", dest="topsy_apikey", metavar="TOPSY_APIKEY", default=None,
-                        help="Topsy apikey")
-
-    utils.set_logging_options(parser)
-
-    return parser.parse_args()
-
-
-
-if __name__ == "__main__":
-
-    options = get_options()
-
-    utils.set_logging(options);
-
-
-    acess_token_key, access_token_secret = utils.get_oauth_token(consumer_key=options.consumer_key, consumer_secret=options.consumer_secret, token_file_path=options.token_filename, application_name=APPLICATION_NAME)
-
-    t = twitter.Twitter(domain="api.twitter.com", auth=twitter.OAuth(acess_token_key, access_token_secret, options.consumer_key, options.consumer_secret), secure=True)
-    t.secure = True
-
-    conn_str = options.conn_str.strip()
-    if not re.match("^\w+://.+", conn_str):
-        conn_str = 'sqlite:///' + conn_str
-
-    engine, metadata, Session = models.setup_database(conn_str, echo=((options.verbose-options.quiet)>0), create_all=True)
-    session = None
-
-
-    topsy_parameters = {
-        'apikey': options.topsy_apikey,
-        'perpage': 100,
-        'window': 'a',
-        'type': 'tweet',
-        'hidden': True,
-    }
-
-    term = Terminal()
-
-    try:
-        session = Session()
-
-        results = None
-        page = 1
-        print options.query
-
-        tr = TopsyResource(options.query, **topsy_parameters)
-
-        move_up = 0
-
-        for i,item in enumerate(tr):
-            # get id
-            url = item.get("url")
-            tweet_id = url.split("/")[-1]
-
-            if move_up > 0:
-                print((move_up+1)*term.move_up())
-            move_up = 0
-
-            print ("%d/%d:%03d%% - %s - %r" % (i+1, tr.total(), int(float(i+1)/float(tr.total())*100.0), tweet_id, item.get("content") ) + term.clear_eol())
-            move_up += 1
-
-            count_tweet = session.query(models.Tweet).filter_by(id_str=tweet_id).count()
-
-            if count_tweet:
-                continue
-            try:
-                tweet = t.statuses.show(id=tweet_id, include_entities=True)
-            except twitter.api.TwitterHTTPError as e:
-                if e.e.code == 404 or e.e.code == 403:
-                    continue
-                else:
-                    raise
-
-            processor = TwitterProcessorStatus(tweet, None, None, session, None, options.token_filename, logger)
-            processor.process()
-            session.flush()
-            session.commit()
-
-            print("rate limit remaining %s of %s" % (str(tweet.rate_limit_remaining), str(tweet.headers.getheader('X-Rate-Limit-Limit'))) + term.clear_eol())
-            move_up += 1
-            rate_limit_limit = int(tweet.headers.getheader('X-Rate-Limit-Limit'))
-            rate_limit_remaining = int(tweet.rate_limit_remaining)
-
-            if rate_limit_remaining < rate_limit_limit:
-                time_to_sleep = 0
-            else:
-                time_to_sleep = int(math.ceil((tweet.rate_limit_reset - time.mktime(time.gmtime())) / tweet.rate_limit_remaining))
-
-            for i in xrange(time_to_sleep):
-                if i:
-                    print(2*term.move_up())
-                else:
-                    move_up += 1
-                print(("Sleeping for %d seconds, %d remaining" % (time_to_sleep, time_to_sleep-i)) + term.clear_eol())
-                time.sleep(1)
-
-    except twitter.api.TwitterHTTPError as e:
-        fmt = ("." + e.format) if e.format else ""
-        print "Twitter sent status %s for URL: %s%s using parameters: (%s)\ndetails: %s" % (repr(e.e.code), repr(e.uri), repr(fmt), repr(e.uriparts), repr(e.response_data))
-
-    finally:
-        if session:
-            session.close()
diff -r 184372ec27e2 -r 14a9bed2e3cd script/utils/search_topsy_scrap.py
--- a/script/utils/search_topsy_scrap.py	Wed Jan 02 17:49:19 2019 +0100
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,211 +0,0 @@
-import argparse
-import logging
-import math
-import re
-import time
-import urllib
-
-from blessings import Terminal
-import requests
-import twitter
-
-from iri_tweet import models, utils
-from iri_tweet.processor import TwitterProcessorStatus
-
-from selenium import webdriver
-from selenium.webdriver.common.desired_capabilities import DesiredCapabilities
-from selenium.webdriver.common.by import By
-from selenium.webdriver.support.ui import WebDriverWait
-from selenium.webdriver.support import expected_conditions as EC
-
-from lxml import html
-import json
-
-logger = logging.getLogger(__name__)
-
-APPLICATION_NAME = "Tweet recorder user"
-
-dcap = dict(DesiredCapabilities.PHANTOMJS)
-dcap["phantomjs.page.settings.userAgent"] = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.103 Safari/537.36"
-
-class TopsyResource(object):
-
-    def __init__(self, query, **kwargs):
-
-        self.options = {}
-        self.options['q'] = query
-        self.options.update(kwargs)
-        self.base_url = "http://topsy.com/s"
-        self.driver = webdriver.PhantomJS(desired_capabilities=dcap)
-        self.driver.set_window_size(1024, 768)
-        self.page = -1
-        self.tree = None
-
-
-    def __do_request(self, params):
-        url = "%s?%s" % (self.base_url, urllib.urlencode(params).replace('+','%20')) #calculate url with urllib
-        print('Requesting %s' % url)
-        self.driver.get(url)
-        try:
-            element = WebDriverWait(self.driver, 60).until(
-                EC.presence_of_element_located((By.CLASS_NAME, "result-tweet"))
-            )
-        except Exception as e:
-            print('Exception requesting %s : %s' % (url, e))
-            self.tree = None
-        else:
-            self.tree = html.fromstring(self.driver.page_source)
-
-    def __check_last(self):
-        if self.page < 0:
-            return False
-        if self.tree is None or len(self.tree.xpath("//*[@id=\"module-pager\"]/div/ul/li[@data-page=\"next\"and @class=\"disabled\"]")):
-            return True
-        else:
-            return False
-
-
-    def __next_page(self):
-        if self.__check_last():
-            return False
-        self.page += 1
-        params = {}
-        params.update(self.options)
-        if self.page:
-            params['offset'] = self.page*self.options.get('perpage',10)
-        self.__do_request(params)
-        return self.tree is not None
-
-    def __iter__(self):
-        result_xpath = "//*[@id=\"results\"]/div"
-        while self.__next_page():
-            for res_node in self.tree.xpath(result_xpath):
-                res_obj = {
-                    'user': "".join(res_node.xpath("./div/div/h5/a/text()")),
-                    'content': "".join(res_node.xpath("./div/div/div/text()")),
-                    'url': "".join(res_node.xpath("./div/div/ul/li[1]/small/a/@href"))
-                }
-                if res_obj['url']:
-                    yield res_obj
-
-
-def get_options():
-
-    usage = "usage: %(prog)s [options] "
-
-    parser = argparse.ArgumentParser(usage=usage)
-
-    parser.add_argument(dest="conn_str",
-                        help="write tweet to DATABASE. This is a connection string", metavar="CONNECTION_STR")
-    parser.add_argument("-Q", dest="query",
-                        help="query", metavar="QUERY")
-    parser.add_argument("-k", "--key", dest="consumer_key",
-                        help="Twitter consumer key", metavar="CONSUMER_KEY")
-    parser.add_argument("-s", "--secret", dest="consumer_secret",
-                        help="Twitter consumer secret", metavar="CONSUMER_SECRET")
-    parser.add_argument("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token",
-                        help="Token file name")
-    parser.add_argument("-T", dest="topsy_apikey", metavar="TOPSY_APIKEY", default=None,
-                        help="Topsy apikey")
-
-    utils.set_logging_options(parser)
-
-    return parser.parse_args()
-
-
-
-if __name__ == "__main__":
-
-    options = get_options()
-
-    utils.set_logging(options);
-
-
-    acess_token_key, access_token_secret = utils.get_oauth_token(consumer_key=options.consumer_key, consumer_secret=options.consumer_secret, token_file_path=options.token_filename, application_name=APPLICATION_NAME)
-
-    t = twitter.Twitter(domain="api.twitter.com", auth=twitter.OAuth(acess_token_key, access_token_secret, options.consumer_key, options.consumer_secret), secure=True)
-    t.secure = True
-
-    conn_str = options.conn_str.strip()
-    if not re.match("^\w+://.+", conn_str):
-        conn_str = 'sqlite:///' + conn_str
-
-    engine, metadata, Session = models.setup_database(conn_str, echo=((options.verbose-options.quiet)>0), create_all=True)
-    session = None
-
-
-    topsy_parameters = {
-        'perpage': 10,
-        'window': 'a',
-        'type': 'tweet',
-        'hidden': 1,
-        'sort': 'date'
-    }
-
-    term = Terminal()
-
-    try:
-        session = Session()
-
-        results = None
-        page = 1
-        print options.query
-
-        tr = TopsyResource(options.query, **topsy_parameters)
-
-        move_up = 0
-
-        for i,item in enumerate(tr):
-            # get id
-            url = item.get("url")
-            tweet_id = url.split("/")[-1]
-
-            if move_up > 0:
-                print((move_up+1)*term.move_up())
-            move_up = 0
-
-            print ("%d: %s - %r" % (i+1, tweet_id, item.get("content") ) + term.clear_eol())
-            move_up += 1
-
-            count_tweet = session.query(models.Tweet).filter_by(id_str=tweet_id).count()
-
-            if count_tweet:
-                continue
-            try:
-                tweet = t.statuses.show(id=tweet_id, include_entities=True)
-            except twitter.api.TwitterHTTPError as e:
-                if e.e.code == 404 or e.e.code == 403:
-                    continue
-                else:
-                    raise
-
-            processor = TwitterProcessorStatus(tweet, None, None, session, None, options.token_filename, logger)
-            processor.process()
-            session.flush()
-            session.commit()
-
-            print("rate limit remaining %s of %s" % (str(tweet.rate_limit_remaining), str(tweet.headers.getheader('X-Rate-Limit-Limit'))) + term.clear_eol())
-            move_up += 1
-            rate_limit_limit = int(tweet.headers.getheader('X-Rate-Limit-Limit'))
-            rate_limit_remaining = int(tweet.rate_limit_remaining)
-
-            if rate_limit_remaining < rate_limit_limit:
-                time_to_sleep = 0
-            else:
-                time_to_sleep = int(math.ceil((tweet.rate_limit_reset - time.mktime(time.gmtime())) / tweet.rate_limit_remaining))
-
-            for i in xrange(time_to_sleep):
-                if i:
-                    print(2*term.move_up())
-                else:
-                    move_up += 1
-                print(("Sleeping for %d seconds, %d remaining" % (time_to_sleep, time_to_sleep-i)) + term.clear_eol())
-                time.sleep(1)
-
-    except twitter.api.TwitterHTTPError as e:
-        fmt = ("." + e.format) if e.format else ""
-        print "Twitter sent status %s for URL: %s%s using parameters: (%s)\ndetails: %s" % (repr(e.e.code), repr(e.uri), repr(fmt), repr(e.uriparts), repr(e.response_data))
-
-    finally:
-        if session:
-            session.close()
diff -r 184372ec27e2 -r 14a9bed2e3cd script/utils/search_twitter_api.py
--- a/script/utils/search_twitter_api.py	Wed Jan 02 17:49:19 2019 +0100
+++ b/script/utils/search_twitter_api.py	Thu Jan 10 18:36:36 2019 +0100
@@ -1,47 +1,91 @@
 import argparse
+import datetime
+import functools
+import json
 import logging
 import math
 import re
 import time
-import datetime
 import urllib
+from enum import Enum
 
-from blessings import Terminal
 import requests
 import twitter
+from blessings import Terminal
 
 from iri_tweet import models, utils
 from iri_tweet.processor import TwitterProcessorStatus
 
-import json
-
 logger = logging.getLogger(__name__)
 
 APPLICATION_NAME = "Tweet seach json"
 
+class SearchType(Enum):
+    standard = 'standard'
+    _30day = '30day'
+    full = 'full'
+
+    def __str__(self):
+        return self.value
+
+def pass_kwargs_as_json(f):
+    def kwargs_json_wrapper(*args, **kwargs):
+        normal_kwargs = { k:v for k,v in kwargs.items() if k[0] != "_" }
+        special_kwargs = { k:v for k,v in kwargs.items() if k[0] == "_" }
+        new_kwargs = { **special_kwargs, '_json': normal_kwargs }
+        return f(*args, **new_kwargs)
+    return kwargs_json_wrapper
+
 # TODO: implement some more parameters
 # script to "scrap twitter results"
 # Shamelessly taken from https://github.com/Jefferson-Henrique/GetOldTweets-python
 # pyquery cssselect
 class TweetManager:
-    def __init__(self, query, twitter_con):
+    def __init__(self, twitter_con, query, search_type, api_env):
         self.query = query
-        self.max_id = 0
+        self.search_type = search_type
+        self.next = ""
         self.t = twitter_con
-        pass
+        self.api_env = api_env
+        self.twitter_api = self.get_twitter_api()
+        self.rate_limit_remaining = 0
+        self.rate_limit_limit = 0
+        self.rate_limit_reset = 0
+        self.i = 0
+
+    def get_twitter_api(self):
+        return {
+            SearchType.standard: lambda t: t.search.tweets,
+            SearchType._30day: lambda t: pass_kwargs_as_json(functools.partial(getattr(getattr(t.tweets.search,'30day'),self.api_env), _method="POST")),
+            SearchType.full: lambda t: pass_kwargs_as_json(functools.partial(getattr(t.tweets.search.fullarchive, self.api_env), _method="POST")),
+        }[self.search_type](self.t)
 
     def __iter__(self):
         while True:
-            if self.max_id < 0:
+            if self.next is None:
                 break
-            json = self.get_json_response()
+            self.i = self.i+1
+
+            # with open("json_dump_%s.json" % self.i, 'r') as fp:
+            #     jsondata = json.load(fp)
+            jsondata = self.get_json_response()
 
-            next_results = json['search_metadata'].get('next_results', "?")[1:]
-            self.max_id = int(urllib.parse.parse_qs(next_results).get('max_id', [-1])[0])
+            self.rate_limit_remaining = jsondata.rate_limit_remaining
+            self.rate_limit_limit = jsondata.rate_limit_limit
+            self.rate_limit_reset = jsondata.rate_limit_reset
+
+            with open("json_dump_%s.json" % self.i, 'w') as fp:
+                json.dump(jsondata, fp)
 
-            tweet_list = json['statuses']
+            if self.search_type == SearchType.standard:
+                next_results = jsondata['search_metadata'].get('next_results', "?")[1:]
+                self.next = urllib.parse.parse_qs(next_results).get('max_id', [None])[0]
+                tweet_list = jsondata['statuses']
+            else:
+                self.next = jsondata.get('next')
+                tweet_list = jsondata['results']
 
             if len(tweet_list) == 0:
                 break
@@ -50,8 +94,13 @@
             yield tweet
 
     def get_json_response(self):
-        return self.t.search.tweets(q=self.query, include_entities=True, max_id=self.max_id)
-
+        if self.search_type == SearchType.standard:
+            return self.twitter_api(q=self.query, include_entities=True, max_id=int(self.next) if self.next else 0)
+        else:
+            kwargs = { "query": self.query, "maxResults": 100 }
+            if self.next:
+                kwargs["next"] = self.next
+            return self.twitter_api(**kwargs)
 
 def get_options():
@@ -62,31 +111,37 @@
     parser.add_argument(dest="conn_str",
                         help="write tweet to DATABASE. This is a connection string", metavar="CONNECTION_STR")
     parser.add_argument("-Q", dest="query",
-                        help="query", metavar="QUERY")
+                        help="query", metavar="QUERY")
     parser.add_argument("-k", "--key", dest="consumer_key",
                         help="Twitter consumer key", metavar="CONSUMER_KEY")
     parser.add_argument("-s", "--secret", dest="consumer_secret",
                         help="Twitter consumer secret", metavar="CONSUMER_SECRET")
    parser.add_argument("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token",
-                        help="Token file name")
+                        help="Token file name")
+    parser.add_argument("-a", dest="search_type", metavar="SEARCH_TYPE", default=SearchType.standard, choices=list(SearchType), type=SearchType,
+                        help="Twitter search type ('standard', '30day', 'full')")
+    parser.add_argument("-e", dest="api_env", metavar="API_ENV", default="dev",
+                        help="Twitter API dev environment")
+
     utils.set_logging_options(parser)
 
     return parser.parse_args()
 
-
 if __name__ == "__main__":
 
     options = get_options()
 
+    print("The search type is: %s" % options.search_type)
+
     utils.set_logging(options)
-
-    acess_token_key, access_token_secret = utils.get_oauth_token(consumer_key=options.consumer_key, consumer_secret=options.consumer_secret, token_file_path=options.token_filename, application_name=APPLICATION_NAME)
+    bearer_token = utils.get_oauth2_token(consumer_key=options.consumer_key, consumer_secret=options.consumer_secret, token_file_path=options.token_filename, application_name=APPLICATION_NAME)
+    twitter_auth = twitter.OAuth2(options.consumer_key, options.consumer_secret, bearer_token)
 
-    t = twitter.Twitter(domain="api.twitter.com", auth=twitter.OAuth(acess_token_key, access_token_secret, options.consumer_key, options.consumer_secret), secure=True)
-    t.secure = True
+    t = twitter.Twitter(domain="api.twitter.com", auth=twitter_auth, secure=True)
+    t.secure = True
 
     conn_str = options.conn_str.strip()
     if not re.match(r"^\w+://.+", conn_str):
@@ -104,7 +159,7 @@
     results = None
     print(options.query)
 
-    tm = TweetManager(options.query, t)
+    tm = TweetManager(t, options.query, options.search_type, options.api_env)
 
     move_up = 0
 
@@ -127,7 +182,7 @@
         if count_tweet:
             continue
 
-        processor = TwitterProcessorStatus(tweet, None, None, session, None, options.token_filename, logger)
+        processor = TwitterProcessorStatus(tweet, None, None, session, twitter_auth=twitter_auth, logger=logger)
         processor.process()
         session.flush()
         session.commit()
diff -r 184372ec27e2 -r 14a9bed2e3cd script/utils/search_twitter_json.py
--- a/script/utils/search_twitter_json.py	Wed Jan 02 17:49:19 2019 +0100
+++ b/script/utils/search_twitter_json.py	Thu Jan 10 18:36:36 2019 +0100
@@ -130,8 +130,9 @@
 
     acess_token_key, access_token_secret = utils.get_oauth_token(consumer_key=options.consumer_key, consumer_secret=options.consumer_secret, token_file_path=options.token_filename, application_name=APPLICATION_NAME)
+    twitter_auth = twitter.OAuth(acess_token_key, access_token_secret, options.consumer_key, options.consumer_secret)
 
-    t = twitter.Twitter(domain="api.twitter.com", auth=twitter.OAuth(acess_token_key, access_token_secret, options.consumer_key, options.consumer_secret), secure=True)
+    t = twitter.Twitter(domain="api.twitter.com", auth=twitter_auth, secure=True)
     t.secure = True
 
     conn_str = options.conn_str.strip()
@@ -180,7 +181,7 @@
         else:
             raise
 
-        processor = TwitterProcessorStatus(tweet, None, None, session, None, options.token_filename, logger)
+        processor = TwitterProcessorStatus(tweet, None, None, session, twitter_auth=twitter_auth, logger=logger)
         processor.process()
         session.flush()
         session.commit()
diff -r 184372ec27e2 -r 14a9bed2e3cd script/virtualenv/script/res/requirement.txt
--- a/script/virtualenv/script/res/requirement.txt	Wed Jan 02 17:49:19 2019 +0100
+++ b/script/virtualenv/script/res/requirement.txt	Thu Jan 10 18:36:36 2019 +0100
@@ -1,23 +1,26 @@
-anyjson==0.3.3
-blessings==1.6
-cssselect==0.9.1
-docutils==0.12
-httplib2==0.9.2
+astroid==2.1.0
+blessings==1.7
+certifi==2018.11.29
+chardet==3.0.4
+cssselect==1.0.3
+docutils==0.14
+idna==2.8
 iri-tweet===0.82.0final0
+isort==4.3.4
+lazy-object-proxy==1.3.1
 lockfile==0.12.2
-lxml==3.5.0
-oauth2==1.9.0.post1
-oauthlib==1.0.3
-psycopg2==2.6.1
-pyquery==1.2.11
-python-daemon==2.1.1
-python-dateutil==2.5.0
-pytz==2016.1
-requests==2.9.1
-requests-oauthlib==0.6.1
-selenium==2.53.1
-simplejson==3.8.2
-six==1.10.0
-SQLAlchemy==1.0.12
-twitter==1.17.1
-twitter-text-py==2.0.2
+lxml==4.2.5
+mccabe==0.6.1
+oauthlib==2.1.0
+pylint==2.2.2
+pyquery==1.4.0
+python-daemon==2.2.0
+python-dateutil==2.7.5
+requests==2.21.0
+requests-oauthlib==1.1.0
+six==1.12.0
+SQLAlchemy==1.2.15
+twitter==1.18.0
+twitter-text==3.0
+urllib3==1.24.1
+wrapt==1.10.11
diff -r 184372ec27e2 -r 14a9bed2e3cd web/images/polemictweet_square.png
Binary file web/images/polemictweet_square.png has changed
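
Note on the OAuth2 change: search_twitter_api.py now authenticates with an application-only bearer token (twitter.OAuth2) instead of a per-user OAuth1 token, since search does not need a user context; search_twitter_json.py stays on twitter.OAuth, which matches the changeset's "use OAuth2 where possible". A minimal sketch of the same flow, assuming the oauth2_dance helper shipped with the pinned twitter==1.18.0 package behaves like utils.get_oauth2_token (i.e. exchanges a consumer key/secret pair for a bearer token); the credentials below are placeholders:

    import twitter
    from twitter import oauth2_dance

    # Hypothetical placeholder credentials; substitute a real application's
    # consumer key and secret.
    CONSUMER_KEY = "xxxxxxxx"
    CONSUMER_SECRET = "yyyyyyyy"

    # App-only auth: fetch a bearer token for the consumer credentials, then
    # sign every request with twitter.OAuth2, as the patched script does via
    # utils.get_oauth2_token (assumed to wrap a similar exchange).
    bearer_token = oauth2_dance(CONSUMER_KEY, CONSUMER_SECRET)
    twitter_auth = twitter.OAuth2(CONSUMER_KEY, CONSUMER_SECRET, bearer_token)

    t = twitter.Twitter(domain="api.twitter.com", auth=twitter_auth, secure=True)
    results = t.search.tweets(q="polemictweet", include_entities=True)
    print(len(results["statuses"]))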
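
Note on pass_kwargs_as_json: the 30-day and full-archive endpoints are called with _method="POST" and expect their parameters as a JSON body, while the standard endpoint takes plain query kwargs; the decorator repackages every non-underscore keyword argument under the special _json key that the twitter library turns into the request body, so TweetManager can call all three endpoints the same way. This is also why pagination differs: standard search pulls max_id out of search_metadata.next_results, while the premium endpoints return an opaque 'next' token that is passed through unchanged. A self-contained sketch of the repackaging, where fake_endpoint is a hypothetical stand-in for a twitter.Twitter attribute:

    import functools

    def pass_kwargs_as_json(f):
        # Keyword arguments starting with "_" steer the twitter library
        # (e.g. _method="POST"); all others are bundled into the JSON body.
        def kwargs_json_wrapper(*args, **kwargs):
            normal_kwargs = {k: v for k, v in kwargs.items() if k[0] != "_"}
            special_kwargs = {k: v for k, v in kwargs.items() if k[0] == "_"}
            new_kwargs = {**special_kwargs, "_json": normal_kwargs}
            return f(*args, **new_kwargs)
        return kwargs_json_wrapper

    # Stand-in endpoint that just echoes what it receives.
    def fake_endpoint(**kwargs):
        return kwargs

    wrapped = pass_kwargs_as_json(functools.partial(fake_endpoint, _method="POST"))
    print(wrapped(query="polemictweet", maxResults=100))
    # {'_method': 'POST', '_json': {'query': 'polemictweet', 'maxResults': 100}}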