# HG changeset patch # User Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com> # Date 1367945874 -7200 # Node ID 6fc6637d8403fb0ca8678aaa8d43d7637c64bf14 # Parent 503f9a7b7d6c3ef3085187868ac432ab081e0ebf update listener. add support for twitter regulation messages. update virtualenv diff -r 503f9a7b7d6c -r 6fc6637d8403 .pydevproject --- a/.pydevproject Sun Apr 21 21:55:06 2013 +0200 +++ b/.pydevproject Tue May 07 18:57:54 2013 +0200 @@ -3,6 +3,9 @@ python_tl python 2.7 -/tweet_live/script/lib +/tweet_live/script/lib/iri_tweet +/tweet_live/script/stream +/tweet_live/script/rest +/tweet_live/script/utils diff -r 503f9a7b7d6c -r 6fc6637d8403 script/lib/iri_tweet/iri_tweet/models.py --- a/script/lib/iri_tweet/iri_tweet/models.py Sun Apr 21 21:55:06 2013 +0200 +++ b/script/lib/iri_tweet/iri_tweet/models.py Tue May 07 18:57:54 2013 +0200 @@ -1,5 +1,5 @@ from sqlalchemy import (Boolean, Column, Enum, BigInteger, Integer, String, - ForeignKey, DateTime, create_engine) + ForeignKey, DateTime, create_engine, event) from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import relationship, sessionmaker import anyjson @@ -11,12 +12,10 @@ Base = declarative_base() APPLICATION_NAME = "IRI_TWITTER" -CONSUMER_KEY = "54ThDZhpEjokcMgHJOMnQA" -CONSUMER_SECRET = "wUoL9UL2T87tfc97R0Dff2EaqRzpJ5XGdmaN2XK3udA" +#CONSUMER_KEY = "54ThDZhpEjokcMgHJOMnQA" +#CONSUMER_SECRET = "wUoL9UL2T87tfc97R0Dff2EaqRzpJ5XGdmaN2XK3udA" ACCESS_TOKEN_KEY = None ACCESS_TOKEN_SECRET = None -#ACCESS_TOKEN_KEY= "47312923-LiNTtz0I18YXMVIrFeTuhmH7bOvYsK6p3Ln2Dc" -#ACCESS_TOKEN_SECRET = "r3LoXVcjImNAElUpWqTu2SG2xCdWFHkva7xeQoncA" def adapt_date(date_str): ts = email.utils.parsedate_tz(date_str) #@UndefinedVariable @@ -82,6 +81,14 @@ 'OK' : 1, 'ERROR' : 2, 'NOT_TWEET': 3, + 'DELETE': 4, + 'SCRUB_GEO': 5, + 'LIMIT': 6, + 'STATUS_WITHHELD': 7, + 'USER_WITHHELD': 8, + 'DISCONNECT': 9, + 'STALL_WARNING': 10, + 'DELETE_PENDING': 11 } __metaclass__ = TweetMeta @@ -90,6 +97,7 @@ ts = Column(DateTime, default=datetime.datetime.utcnow, index=True) tweet_source_id = Column(Integer, ForeignKey('tweet_tweet_source.id')) tweet_source = relationship("TweetSource", backref="logs") + status_id = Column(BigInteger, index=True, nullable=True, default=None) status = Column(Integer) error = Column(String) error_stack = Column(String) @@ -121,7 +129,7 @@ user = relationship("User", backref="tweets") tweet_source_id = Column(Integer, ForeignKey('tweet_tweet_source.id')) tweet_source = relationship("TweetSource", backref="tweet") - entity_list = relationship(Entity, backref='tweet') + entity_list = relationship(Entity, backref='tweet', cascade="all, delete-orphan") received_at = Column(DateTime, default=datetime.datetime.utcnow, index=True) @@ -269,6 +277,14 @@ kwargs_ce = dict((k, v) for k,v in kwargs.items() if (k not in session_argname and k != "create_all")) engine = create_engine(*args, **kwargs_ce) + + if engine.name == "sqlite": + @event.listens_for(engine, "connect") + def set_sqlite_pragma(dbapi_connection, connection_record): + cursor = dbapi_connection.cursor() + cursor.execute("PRAGMA foreign_keys=ON") + cursor.close() + metadata = Base.metadata kwargs_sm = {'bind': engine} diff -r 503f9a7b7d6c -r 6fc6637d8403 script/lib/iri_tweet/iri_tweet/processor.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/script/lib/iri_tweet/iri_tweet/processor.py Tue May 07 18:57:54 2013 +0200 @@ -0,0 +1,516 @@ +# -*- coding: utf-8 -*- +''' +Created on Apr 29, 2013 + +@author: ymh +''' +from 
iri_tweet.models import (User, EntityType, adapt_json, MediaType, Media, + EntityMedia, Hashtag, EntityHashtag, EntityUser, EntityUrl, Url, Entity, Tweet, + TweetSource, TweetLog) +from iri_tweet.utils import (ObjectsBuffer, adapt_fields, fields_adapter, + ObjectBufferProxy, get_oauth_token, clean_keys) +from sqlalchemy.orm import joinedload +import anyjson +import logging +import twitter +import twitter_text + + +class TwitterProcessorException(Exception): + pass + +class TwitterProcessor(object): + def __init__(self, json_dict, json_txt, source_id, session, consumer_token, access_token=None, token_filename=None, user_query_twitter=False, logger=None): + + if json_dict is None and json_txt is None: + raise TwitterProcessorException("No json") + + if json_dict is None: + self.json_dict = anyjson.deserialize(json_txt) + else: + self.json_dict = json_dict + + if not json_txt: + self.json_txt = anyjson.serialize(json_dict) + else: + self.json_txt = json_txt + + self.source_id = source_id + self.session = session + self.consumer_key = consumer_token[0] + self.consumer_secret = consumer_token[1] + self.token_filename = token_filename + self.access_token = access_token + self.obj_buffer = ObjectsBuffer() + self.user_query_twitter = user_query_twitter + if not logger: + self.logger = logging.getLogger(__name__) + else: + self.logger = logger + + def process(self): + if self.source_id is None: + tweet_source = self.obj_buffer.add_object(TweetSource, None, {'original_json':self.json_txt}, True) + self.source_id = tweet_source.id + self.process_source() + self.obj_buffer.persists(self.session) + + def process_source(self): + raise NotImplementedError() + + def log_info(self): + return "Process tweet %s" % repr(self.__class__) + + +class TwitterProcessorStatus(TwitterProcessor): + + def __get_user(self, user_dict, do_merge): + self.logger.debug("Get user : " + repr(user_dict)) #@UndefinedVariable + + user_dict = adapt_fields(user_dict, fields_adapter["stream"]["user"]) + + user_id = user_dict.get("id",None) + user_name = user_dict.get("screen_name", user_dict.get("name", None)) + + if user_id is None and user_name is None: + return None + + user = None + if user_id: + user = self.obj_buffer.get(User, id=user_id) + else: + user = self.obj_buffer.get(User, screen_name=user_name) + + #todo : update user if needed + if user is not None: + user_created_at = None + if user.args is not None: + user_created_at = user.args.get('created_at', None) + if user_created_at is None and user_dict.get('created_at', None) is not None and do_merge: + if user.args is None: + user.args = user_dict + else: + user.args.update(user_dict) + return user + + #todo : add methods to objectbuffer to get buffered user + user_obj = None + if user_id: + user_obj = self.session.query(User).filter(User.id == user_id).first() + else: + user_obj = self.session.query(User).filter(User.screen_name.ilike(user_name)).first() + + #todo update user if needed + if user_obj is not None: + if user_obj.created_at is not None or user_dict.get('created_at', None) is None or not do_merge : + user = ObjectBufferProxy(User, None, None, False, user_obj) + else: + user = self.obj_buffer.add_object(User, None, user_dict, True, user_obj) + return user + + user_created_at = user_dict.get("created_at", None) + + if user_created_at is None and self.user_query_twitter: + + if self.access_token is not None: + acess_token_key, access_token_secret = self.access_token + else: 
acess_token_key, access_token_secret = get_oauth_token(consumer_key=self.consumer_key, consumer_secret=self.consumer_secret, token_file_path=self.token_filename) + #TODO pass it as argument + t = twitter.Twitter(auth=twitter.OAuth(acess_token_key, access_token_secret, self.consumer_key, self.consumer_secret)) + try: + if user_id: + user_dict = t.users.show(user_id=user_id) + else: + user_dict = t.users.show(screen_name=user_name) + except Exception as e: + self.logger.info("get_user : TWITTER ERROR : " + repr(e)) #@UndefinedVariable + self.logger.info("get_user : TWITTER ERROR : " + str(e)) #@UndefinedVariable + return None + + if "id" not in user_dict: + return None + + #TODO filter get, wrap in proxy + user_obj = self.session.query(User).filter(User.id == user_dict["id"]).first() + + if user_obj is not None and not do_merge: + return ObjectBufferProxy(User, None, None, False, user_obj) + else: + return self.obj_buffer.add_object(User, None, user_dict, True) + + def __get_or_create_object(self, klass, filter_by_kwargs, filter_arg, creation_kwargs, must_flush, do_merge): + + obj_proxy = self.obj_buffer.get(klass, **filter_by_kwargs) + if obj_proxy is None: + query = self.session.query(klass) + if filter_arg is not None: + query = query.filter(filter_arg) + else: + query = query.filter_by(**filter_by_kwargs) + obj_instance = query.first() + if obj_instance is not None: + if not do_merge: + obj_proxy = ObjectBufferProxy(klass, None, None, False, obj_instance) + else: + obj_proxy = self.obj_buffer.add_object(klass, None, creation_kwargs, must_flush, obj_instance) + if obj_proxy is None: + obj_proxy = self.obj_buffer.add_object(klass, None, creation_kwargs, must_flush) + return obj_proxy + + + def __process_entity(self, ind, ind_type): + self.logger.debug("Process_entity : " + repr(ind) + " : " + repr(ind_type)) #@UndefinedVariable + + ind = clean_keys(ind) + + entity_type = self.__get_or_create_object(EntityType, {'label':ind_type}, None, {'label':ind_type}, True, False) + + entity_dict = { + "indice_start" : ind["indices"][0], + "indice_end" : ind["indices"][1], + "tweet_id" : self.tweet.id, + "entity_type_id" : entity_type.id, + "source" : adapt_json(ind) + } + + def process_medias(): + + media_id = ind.get('id', None) + if media_id is None: + return None, None + + type_str = ind.get("type", "photo") + media_type = self.__get_or_create_object(MediaType, {'label': type_str}, None, {'label':type_str}, True, False) + media_ind = adapt_fields(ind, fields_adapter["entities"]["medias"]) + if "type" in media_ind: + del(media_ind["type"]) + media_ind['type_id'] = media_type.id + media = self.__get_or_create_object(Media, {'id':media_id}, None, media_ind, True, False) + + entity_dict['media_id'] = media.id + return EntityMedia, entity_dict + + def process_hashtags(): + text = ind.get("text", ind.get("hashtag", None)) + if text is None: + return None, None + ind['text'] = text + hashtag = self.__get_or_create_object(Hashtag, {'text':text}, Hashtag.text.ilike(text), ind, True, False) + entity_dict['hashtag_id'] = hashtag.id + return EntityHashtag, entity_dict + + def process_user_mentions(): + user_mention = self.__get_user(ind, False) + if user_mention is None: + entity_dict['user_id'] = None + else: + entity_dict['user_id'] = user_mention.id + return EntityUser, entity_dict + + def process_urls(): + url = self.__get_or_create_object(Url, {'url':ind["url"]}, None, ind, True, False) + entity_dict['url_id'] = url.id + return EntityUrl, entity_dict + + #{'': lambda } + entity_klass, entity_dict = { + 
'hashtags': process_hashtags, + 'user_mentions' : process_user_mentions, + 'urls' : process_urls, + 'media': process_medias, + }.get(ind_type, lambda: (Entity, entity_dict))() + + self.logger.debug("Process_entity entity_dict: " + repr(entity_dict)) #@UndefinedVariable + if entity_klass: + self.obj_buffer.add_object(entity_klass, None, entity_dict, False) + + + def __process_twitter_stream(self): + + tweet_nb = self.session.query(Tweet).filter(Tweet.id == self.json_dict["id"]).count() + if tweet_nb > 0: + return + + ts_copy = adapt_fields(self.json_dict, fields_adapter["stream"]["tweet"]) + + # get or create user + user = self.__get_user(self.json_dict["user"], True) + if user is None: + self.logger.warning("USER not found " + repr(self.json_dict["user"])) #@UndefinedVariable + ts_copy["user_id"] = None + else: + ts_copy["user_id"] = user.id + + del(ts_copy['user']) + ts_copy["tweet_source_id"] = self.source_id + + self.tweet = self.obj_buffer.add_object(Tweet, None, ts_copy, True) + + self.__process_entities() + + + def __process_entities(self): + if "entities" in self.json_dict: + for ind_type, entity_list in self.json_dict["entities"].items(): + for ind in entity_list: + self.__process_entity(ind, ind_type) + else: + + text = self.tweet.text + extractor = twitter_text.Extractor(text) + for ind in extractor.extract_hashtags_with_indices(): + self.__process_entity(ind, "hashtags") + + for ind in extractor.extract_urls_with_indices(): + self.__process_entity(ind, "urls") + + for ind in extractor.extract_mentioned_screen_names_with_indices(): + self.__process_entity(ind, "user_mentions") + + def __process_twitter_rest(self): + tweet_nb = self.session.query(Tweet).filter(Tweet.id == self.json_dict["id"]).count() + if tweet_nb > 0: + return + + + tweet_fields = { + 'created_at': self.json_dict["created_at"], + 'favorited': False, + 'id': self.json_dict["id"], + 'id_str': self.json_dict["id_str"], + #'in_reply_to_screen_name': ts["to_user"], + 'in_reply_to_user_id': self.json_dict.get("in_reply_to_user_id",None), + 'in_reply_to_user_id_str': self.json_dict.get("in_reply_to_user_id_str", None), + #'place': ts["place"], + 'source': self.json_dict["source"], + 'text': self.json_dict["text"], + 'truncated': False, + 'tweet_source_id' : self.source_id, + } + + #user + + user_fields = { + 'lang' : self.json_dict.get('iso_language_code',None), + 'profile_image_url' : self.json_dict["profile_image_url"], + 'screen_name' : self.json_dict["from_user"], + 'id' : self.json_dict["from_user_id"], + 'id_str' : self.json_dict["from_user_id_str"], + 'name' : self.json_dict['from_user_name'], + } + + user = self.__get_user(user_fields, do_merge=False) + if user is None: + self.logger.warning("USER not found " + repr(user_fields)) #@UndefinedVariable + tweet_fields["user_id"] = None + else: + tweet_fields["user_id"] = user.id + + tweet_fields = adapt_fields(tweet_fields, fields_adapter["rest"]["tweet"]) + self.tweet = self.obj_buffer.add_object(Tweet, None, tweet_fields, True) + + self.__process_entities() + + + + def process_source(self): + + if "id" not in self.json_dict: + raise TwitterProcessorException("No id in json") + + status_id = self.json_dict["id"] + log = self.session.query(TweetLog).filter(TweetLog.status_id==status_id).first() + if log: + self.obj_buffer.add_object(TweetLog, log, {'status': TweetLog.TWEET_STATUS['DELETE'], 'status_id': None}) + self.session.query(TweetSource).filter(TweetSource.id==self.source_id).delete() + else: + if "metadata" in self.json_dict: + self.__process_twitter_rest() + else: + self.__process_twitter_stream() + + self.obj_buffer.add_object(TweetLog, 
None, {'tweet_source_id':self.source_id, 'status':TweetLog.TWEET_STATUS['OK']}, True) + + def log_info(self): + screen_name = self.json_dict.get("user",{}).get("screen_name","") + return u"Process Tweet from %s : %s" % (screen_name, self.json_dict.get('text',u"")) + + + +class TwitterProcessorDelete(TwitterProcessor): + """ + { + "delete":{ + "status":{ + "id":1234, + "id_str":"1234", + "user_id":3, + "user_id_str":"3" + } + } + } + """ + + def process_source(self): + + #find tweet + tweet_id = self.json_dict.get('delete',{}).get('status',{}).get('id',None) + if tweet_id: + t = self.session.query(Tweet).options(joinedload(Tweet.tweet_source)).filter(Tweet.id == tweet_id).first() + if t: + tsource = t.tweet_source + self.session.delete(t) + self.session.query(TweetLog).filter(TweetLog.tweet_source_id == tsource.id).delete() + self.session.delete(tsource) + self.obj_buffer.add_object(TweetLog, None, {'tweet_source_id':self.source_id, 'status':TweetLog.TWEET_STATUS['DELETE']}, True) + else: + self.obj_buffer.add_object(TweetLog, None, {'tweet_source_id':self.source_id, 'status_id': tweet_id,'status':TweetLog.TWEET_STATUS['DELETE_PENDING']}, True) + + def log_info(self): + status_del = self.json_dict.get('delete', {}).get("status",{}) + return u"Process delete for %s : %s" % (status_del.get('user_id_str',u""), status_del.get('id_str',u"")) + +class TwitterProcessorScrubGeo(TwitterProcessor): + """ + { + "scrub_geo":{ + "user_id":14090452, + "user_id_str":"14090452", + "up_to_status_id":23260136625, + "up_to_status_id_str":"23260136625" + } + } + """ + + def process_source(self): + up_to_status_id = self.json_dict.get("scrub_geo", {}).get("up_to_status_id", None) + if not up_to_status_id: + return + tweets = self.session.query(Tweet).options(joinedload(Tweet.tweet_source)).filter(Tweet.id <= up_to_status_id) + for t in tweets: + self.obj_buffer.add_object(Tweet, t, {'geo': None}) + tsource = t.tweet_source + tsource_dict = anyjson.deserialize(tsource.original_json) + if tsource_dict.get("geo", None): + tsource_dict["geo"] = None + self.obj_buffer.add_object(TweetSource, tsource, {'original_json': anyjson.serialize(tsource_dict)}) + self.obj_buffer.add_object(TweetLog, None, {'tweet_source_id':self.source_id, 'status':TweetLog.TWEET_STATUS['SCRUB_GEO']}, True) + + def log_info(self): + return u"Process scrub geo for %s : %s" % (self.json_dict["scrub_geo"].get('user_id_str',u""), self.json_dict["scrub_geo"].get('up_to_status_id_str',u"")) + +class TwitterProcessorLimit(TwitterProcessor): + """ + { + "limit":{ + "track":1234 + } + } + """ + def process_source(self): + """ + do nothing, just log the information + """ + self.obj_buffer.add_object(TweetLog, None, {'tweet_source_id':self.source_id, 'status':TweetLog.TWEET_STATUS['LIMIT'], 'error':self.json_txt}, True) + + def log_info(self): + return u"Process limit %d " % self.json_dict.get("limit", {}).get('track', 0) + +class TwitterProcessorStatusWithheld(TwitterProcessor): + """ + { + "status_withheld":{ + "id":1234567890, + "user_id":123456, + "withheld_in_countries":["DE", "AR"] + } + } + """ + def process_source(self): + """ + do nothing, just log the information + """ + self.obj_buffer.add_object(TweetLog, None, {'tweet_source_id':self.source_id, 'status':TweetLog.TWEET_STATUS['STATUS_WITHHELD'], 'error':self.json_txt}, True) + + def log_info(self): + status_withheld = self.json_dict.get("status_withheld",{}) + return u"Process status withheld status id %d from user %d in countries %s" %(status_withheld.get("id",0), status_withheld.get("user_id",0), 
u",".join(status_withheld.get("withheld_in_countries",[]))) + +class TwitterProcessorUserWithheld(TwitterProcessor): + """ + { + "user_withheld":{ + "id":123456, + "withheld_in_countries":["DE","AR"] + } + } + """ + def process_source(self): + """ + do nothing, just log the information + """ + self.obj_buffer.add_object(TweetLog, None, {'tweet_source_id':self.source_id, 'status':TweetLog.TWEET_STATUS['USER_WITHHELD'], 'error':self.json_txt}, True) + + + def log_info(self): + user_withheld = self.json_dict.get("user_withheld", {}) + return u"Process user withheld %d in countries %s" % (user_withheld.get("id",0), u"".join(user_withheld.get("withheld_in_countries",[]))) + +class TwitterProcessorDisconnect(TwitterProcessor): + """ + { + "disconnect":{ + "code": 4, + "stream_name":"< A stream identifier >", + "reason":"< Human readable status message >" + } + } + """ + def process_source(self): + """ + do nothing, just log the information + """ + self.obj_buffer.add_object(TweetLog, None, {'tweet_source_id':self.source_id, 'status':TweetLog.TWEET_STATUS['DISCONNECT'], 'error':self.json_txt}, True) + + def log_info(self): + disconnect = self.json_dict.get("disconnect",{}) + return u"Process disconnect stream %s code %d reason %s" % (disconnect.get("stream_name",""), disconnect.get("code",0), disconnect.get("reason","")) + +class TwitterProcessorStallWarning(TwitterProcessor): + """ + { + "warning":{ + "code":"FALLING_BEHIND", + "message":"Your connection is falling behind and messages are being queued for delivery to you. Your queue is now over 60% full. You will be disconnected when the queue is full.", + "percent_full": 60 + } + } + """ + def process_source(self): + """ + do nothing, just log the information + """ + self.obj_buffer.add_object(TweetLog, None, {'tweet_source_id':self.source_id, 'status':TweetLog.TWEET_STATUS['STALL_WARNING'], 'error':self.json_txt}, True) + + def log_info(self): + warning = self.json_dict.get("warning",{}) + return u"Process stall warning %d%% code %s, message %s" % (warning.get("percent_full",0),warning.get("code",u""), warning.get("message", u"")) + +TWEET_PROCESSOR_MAP = { + 'text': TwitterProcessorStatus, + 'delete': TwitterProcessorDelete, + 'scrub_geo': TwitterProcessorScrubGeo, + 'limit': TwitterProcessorLimit, + 'status_withheld': TwitterProcessorStatusWithheld, + 'user_withheld': TwitterProcessorUserWithheld, + 'disconnect': TwitterProcessorDisconnect, + 'warning': TwitterProcessorStallWarning +} + +def get_processor(tweet_dict): + for processor_key,processor_klass in TWEET_PROCESSOR_MAP.iteritems(): + if processor_key in tweet_dict: + return processor_klass + return None diff -r 503f9a7b7d6c -r 6fc6637d8403 script/lib/iri_tweet/iri_tweet/stream.py --- a/script/lib/iri_tweet/iri_tweet/stream.py Sun Apr 21 21:55:06 2013 +0200 +++ b/script/lib/iri_tweet/iri_tweet/stream.py Tue May 07 18:57:54 2013 +0200 @@ -121,12 +121,11 @@ """ def __init__(self, auth, - catchup=None, raw=False, timeout=-1, url=None, compressed=False, chunk_size=4096, logger=None): + raw=False, timeout=-1, url=None, compressed=False, chunk_size=4096, logger=None): self._conn = None self._rate_ts = None self._rate_cnt = 0 self._auth = auth - self._catchup_count = catchup self.raw_mode = raw self.timeout = timeout self._compressed = compressed @@ -165,8 +164,6 @@ postdata = self._get_post_data() or {} postdata['stall_warnings'] = 'true' - if self._catchup_count: - postdata["count"] = self._catchup_count if self._logger : self._logger.debug("BaseStream init connection url " + 
repr(self.url)) if self._logger : self._logger.debug("BaseStream init connection headers " + repr(headers)) @@ -204,37 +201,58 @@ self._rate_ts = time.time() def _iter_object(self): + +# for line in self._resp.iter_lines(): +# yield line +# pending = None +# +# for chunk in self._resp.iter_content(chunk_size=self.chunk_size, decode_unicode=None): +# +# if pending is not None: +# chunk = pending + chunk +# lines = chunk.splitlines() +# +# if lines and lines[-1] and chunk and lines[-1][-1] == chunk[-1]: +# pending = lines.pop() +# else: +# pending = None +# +# for line in lines: +# yield line +# +# if pending is not None: +# yield pending + pending = None has_stopped = False - -# for chunk in iter_content_non_blocking(self._resp, -# max_chunk_size=self.chunk_size, -# decode_unicode=False, -# timeout=self.timeout): + + if self._logger : self._logger.debug("BaseStream _iter_object") + for chunk in self._resp.iter_content( chunk_size=self.chunk_size, - decode_unicode=False): - + decode_unicode=None): + + if self._logger : self._logger.debug("BaseStream _iter_object loop") if self.testmuststop(): has_stopped = True break - + if pending is not None: chunk = pending + chunk lines = chunk.split('\r') - - if chunk and lines[-1] and lines[-1][-1] == chunk[-1]: + + if lines and lines[-1] and chunk and lines[-1][-1] == chunk[-1]: pending = lines.pop() else: pending = None - + for line in lines: yield line.strip('\n') - + if self.testmuststop(): has_stopped = True break - + if pending is not None: yield pending if has_stopped: @@ -248,11 +266,14 @@ self._init_conn() if self._logger : self._logger.debug("BaseStream __iter__ connected") + for line in self._iter_object(): + if self._logger : self._logger.debug("BaseStream __iter__ line %s " % repr(line)) + if not line: continue - + if (self.raw_mode): tweet = line else: @@ -285,12 +306,12 @@ url = "https://stream.twitter.com/1.1/statuses/filter.json" def __init__(self, auth, follow=None, locations=None, - track=None, catchup=None, url=None, raw=False, timeout=None, compressed=False, chunk_size=4096, logger=None): + track=None, url=None, raw=False, timeout=None, compressed=False, chunk_size=requests.models.ITER_CHUNK_SIZE, logger=None): self._follow = follow self._locations = locations self._track = track # remove follow, locations, track - BaseStream.__init__(self, auth, url=url, raw=raw, catchup=catchup, timeout=timeout, compressed=compressed, chunk_size=chunk_size, logger=logger) + BaseStream.__init__(self, auth, url=url, raw=raw, timeout=timeout, compressed=compressed, chunk_size=chunk_size, logger=logger) def _get_post_data(self): postdata = {} @@ -358,16 +379,14 @@ if self._logger: self._logger.debug("tweet : " + repr(tweet)) self._reconnects = 0 self._retry_wait = 0 - if "warning" in tweet: - if self._logger: self._logger.warning("Tweet warning received : %s" % repr(tweet)) - continue if not tweet.strip(): if self._logger: self._logger.debug("Empty Tweet received : PING") continue yield tweet except requests.exceptions.HTTPError as e: if e.response.status_code == 401: - raise AuthenticationError("Error connecting to %s : %s" % (e.response.url,e.message)) + if self._logger: self._logger.debug("SafeStreamWrapper Connection Error http error on %s : %s" % (e.response.url,e.message)) + raise AuthenticationError("Error connecting to %s : %s : %s - %s" % (e.response.url,e.message, repr(e.response.headers),repr(e.response.text))) if e.response.status_code > 200: self.__process_http_error(e) else: diff -r 503f9a7b7d6c -r 6fc6637d8403 
script/lib/iri_tweet/iri_tweet/tests.py --- a/script/lib/iri_tweet/iri_tweet/tests.py Sun Apr 21 21:55:06 2013 +0200 +++ b/script/lib/iri_tweet/iri_tweet/tests.py Tue May 07 18:57:54 2013 +0200 @@ -3,10 +3,14 @@ from sqlalchemy.orm import relationship, backref import unittest #@UnresolvedImport from sqlalchemy.orm import sessionmaker -from iri_tweet.utils import ObjectsBuffer, TwitterProcessor +from iri_tweet.utils import ObjectsBuffer +from iri_tweet.processor import TwitterProcessorStatus from iri_tweet import models import tempfile #@UnresolvedImport import os +import logging + +logger = logging.getLogger(__name__) Base = declarative_base() @@ -129,12 +133,12 @@ def setUp(self): self.engine, self.metadata, sessionMaker = models.setup_database('sqlite:///:memory:', echo=True) self.session = sessionMaker() - file, self.tmpfilepath = tempfile.mkstemp() - os.close(file) + tmpfile, self.tmpfilepath = tempfile.mkstemp() + os.close(tmpfile) def testTwitterProcessor(self): - tp = TwitterProcessor(None, original_json, None, self.session, self.tmpfilepath) + tp = TwitterProcessorStatus(None, original_json, None, self.session, (None, None), token_filename=self.tmpfilepath, logger=logger) tp.process() self.session.commit() @@ -154,7 +158,7 @@ def testTwitterProcessorMedia(self): - tp = TwitterProcessor(None, original_json_media, None, self.session, self.tmpfilepath) + tp = TwitterProcessorStatus(None, original_json_media, None, self.session, (None, None), token_filename=self.tmpfilepath, logger=logger) tp.process() self.session.commit() @@ -174,7 +178,7 @@ def testTwitterProcessorMediaOthers(self): - tp = TwitterProcessor(None, original_json_media_others, None, self.session, self.tmpfilepath) + tp = TwitterProcessorStatus(None, original_json_media_others, None, self.session, (None, None), token_filename=self.tmpfilepath, logger=logger) tp.process() self.session.commit() diff -r 503f9a7b7d6c -r 6fc6637d8403 script/lib/iri_tweet/iri_tweet/utils.py --- a/script/lib/iri_tweet/iri_tweet/utils.py Sun Apr 21 21:55:06 2013 +0200 +++ b/script/lib/iri_tweet/iri_tweet/utils.py Tue May 07 18:57:54 2013 +0200 @@ -1,25 +1,22 @@ -from models import (Tweet, User, Hashtag, EntityHashtag, EntityUser, Url, #@UnresolvedImport - EntityUrl, CONSUMER_KEY, CONSUMER_SECRET, APPLICATION_NAME, ACCESS_TOKEN_KEY,#@UnresolvedImport - ACCESS_TOKEN_SECRET, adapt_date, adapt_json, TweetSource, TweetLog, MediaType, #@UnresolvedImport - Media, EntityMedia, Entity, EntityType) #@UnresolvedImport -from sqlalchemy.sql import select, or_ #@UnresolvedImport -import Queue #@UnresolvedImport -import anyjson #@UnresolvedImport +from models import (Tweet, User, Hashtag, EntityHashtag, APPLICATION_NAME, ACCESS_TOKEN_SECRET, adapt_date, adapt_json, + ACCESS_TOKEN_KEY) +from sqlalchemy.sql import select, or_ +import Queue import codecs import datetime import email.utils import logging import math import os.path +import socket import sys -import twitter.oauth #@UnresolvedImport -import twitter.oauth_dance #@UnresolvedImport -import twitter_text #@UnresolvedImport +import twitter.oauth +import twitter.oauth_dance CACHE_ACCESS_TOKEN = {} -def get_oauth_token(token_file_path=None, check_access_token=True, application_name=APPLICATION_NAME, consumer_key=CONSUMER_KEY, consumer_secret=CONSUMER_SECRET): +def get_oauth_token(consumer_key, consumer_secret, token_file_path=None, check_access_token=True, application_name=APPLICATION_NAME): global CACHE_ACCESS_TOKEN @@ -34,25 +31,27 @@ if res is not None and check_access_token: get_logger().debug("get_oauth_token : Check oauth tokens") #@UndefinedVariable - t = twitter.Twitter(auth=twitter.OAuth(res[0], res[1], 
CONSUMER_KEY, CONSUMER_SECRET)) + t = twitter.Twitter(auth=twitter.OAuth(res[0], res[1], consumer_key, consumer_secret)) status = None try: - status = t.account.rate_limit_status() + status = t.application.rate_limit_status(resources="account") except Exception as e: get_logger().debug("get_oauth_token : error getting rate limit status %s " % repr(e)) get_logger().debug("get_oauth_token : error getting rate limit status %s " % str(e)) status = None get_logger().debug("get_oauth_token : Check oauth tokens : status %s" % repr(status)) #@UndefinedVariable - if status is None or status['remaining_hits'] == 0: + if status is None or status.get("resources",{}).get("account",{}).get('/account/verify_credentials',{}).get('remaining',0) == 0: get_logger().debug("get_oauth_token : Problem with status %s" % repr(status)) res = None if res is None: get_logger().debug("get_oauth_token : doing the oauth dance") res = twitter.oauth_dance(application_name, consumer_key, consumer_secret, token_file_path) + CACHE_ACCESS_TOKEN[application_name] = res + get_logger().debug("get_oauth_token : done got %s" % repr(res)) return res def parse_date(date_str): @@ -169,301 +168,6 @@ return proxy return None -class TwitterProcessorException(Exception): - pass - -class TwitterProcessor(object): - - def __init__(self, json_dict, json_txt, source_id, session, access_token=None, token_filename=None, user_query_twitter=False): - - if json_dict is None and json_txt is None: - raise TwitterProcessorException("No json") - - if json_dict is None: - self.json_dict = anyjson.deserialize(json_txt) - else: - self.json_dict = json_dict - - if not json_txt: - self.json_txt = anyjson.serialize(json_dict) - else: - self.json_txt = json_txt - - if "id" not in self.json_dict: - raise TwitterProcessorException("No id in json") - - self.source_id = source_id - self.session = session - self.token_filename = token_filename - self.access_token = access_token - self.obj_buffer = ObjectsBuffer() - self.user_query_twitter = user_query_twitter - - - - def __get_user(self, user_dict, do_merge): - get_logger().debug("Get user : " + repr(user_dict)) #@UndefinedVariable - - user_dict = adapt_fields(user_dict, fields_adapter["stream"]["user"]) - - user_id = user_dict.get("id",None) - user_name = user_dict.get("screen_name", user_dict.get("name", None)) - - if user_id is None and user_name is None: - return None - - user = None - if user_id: - user = self.obj_buffer.get(User, id=user_id) - else: - user = self.obj_buffer.get(User, screen_name=user_name) - - #to do update user id needed - if user is not None: - user_created_at = None - if user.args is not None: - user_created_at = user.args.get('created_at', None) - if user_created_at is None and user_dict.get('created_at', None) is not None and do_merge: - if user.args is None: - user.args = user_dict - else: - user.args.update(user_dict) - return user - - #todo : add methpds to objectbuffer to get buffer user - user_obj = None - if user_id: - user_obj = self.session.query(User).filter(User.id == user_id).first() - else: - user_obj = self.session.query(User).filter(User.screen_name.ilike(user_name)).first() - - #todo update user if needed - if user_obj is not None: - if user_obj.created_at is not None or user_dict.get('created_at', None) is None or not do_merge : - user = ObjectBufferProxy(User, None, None, False, user_obj) - else: - user = self.obj_buffer.add_object(User, None, user_dict, True, user_obj) - return user - - user_created_at = user_dict.get("created_at", None) - - if user_created_at is 
None and self.user_query_twitter: - - if self.access_token is not None: - acess_token_key, access_token_secret = self.access_token - else: - acess_token_key, access_token_secret = get_oauth_token(self.token_filename) - t = twitter.Twitter(auth=twitter.OAuth(acess_token_key, access_token_secret, CONSUMER_KEY, CONSUMER_SECRET)) - try: - if user_id: - user_dict = t.users.show(user_id=user_id) - else: - user_dict = t.users.show(screen_name=user_name) - except Exception as e: - get_logger().info("get_user : TWITTER ERROR : " + repr(e)) #@UndefinedVariable - get_logger().info("get_user : TWITTER ERROR : " + str(e)) #@UndefinedVariable - return None - - if "id" not in user_dict: - return None - - #TODO filter get, wrap in proxy - user_obj = self.session.query(User).filter(User.id == user_dict["id"]).first() - - if user_obj is not None and not do_merge: - return ObjectBufferProxy(User, None, None, False, user_obj) - else: - return self.obj_buffer.add_object(User, None, user_dict, True) - - def __get_or_create_object(self, klass, filter_by_kwargs, filter_arg, creation_kwargs, must_flush, do_merge): - - obj_proxy = self.obj_buffer.get(klass, **filter_by_kwargs) - if obj_proxy is None: - query = self.session.query(klass) - if filter_arg is not None: - query = query.filter(filter_arg) - else: - query = query.filter_by(**filter_by_kwargs) - obj_instance = query.first() - if obj_instance is not None: - if not do_merge: - obj_proxy = ObjectBufferProxy(klass, None, None, False, obj_instance) - else: - obj_proxy = self.obj_buffer.add_object(klass, None, creation_kwargs, must_flush, obj_instance) - if obj_proxy is None: - obj_proxy = self.obj_buffer.add_object(klass, None, creation_kwargs, must_flush) - return obj_proxy - - - def __process_entity(self, ind, ind_type): - get_logger().debug("Process_entity : " + repr(ind) + " : " + repr(ind_type)) #@UndefinedVariable - - ind = clean_keys(ind) - - entity_type = self.__get_or_create_object(EntityType, {'label':ind_type}, None, {'label':ind_type}, True, False) - - entity_dict = { - "indice_start" : ind["indices"][0], - "indice_end" : ind["indices"][1], - "tweet_id" : self.tweet.id, - "entity_type_id" : entity_type.id, - "source" : adapt_json(ind) - } - - def process_medias(): - - media_id = ind.get('id', None) - if media_id is None: - return None, None - - type_str = ind.get("type", "photo") - media_type = self.__get_or_create_object(MediaType, {'label': type_str}, None, {'label':type_str}, True, False) - media_ind = adapt_fields(ind, fields_adapter["entities"]["medias"]) - if "type" in media_ind: - del(media_ind["type"]) - media_ind['type_id'] = media_type.id - media = self.__get_or_create_object(Media, {'id':media_id}, None, media_ind, True, False) - - entity_dict['media_id'] = media.id - return EntityMedia, entity_dict - - def process_hashtags(): - text = ind.get("text", ind.get("hashtag", None)) - if text is None: - return None, None - ind['text'] = text - hashtag = self.__get_or_create_object(Hashtag, {'text':text}, Hashtag.text.ilike(text), ind, True, False) - entity_dict['hashtag_id'] = hashtag.id - return EntityHashtag, entity_dict - - def process_user_mentions(): - user_mention = self.__get_user(ind, False) - if user_mention is None: - entity_dict['user_id'] = None - else: - entity_dict['user_id'] = user_mention.id - return EntityUser, entity_dict - - def process_urls(): - url = self.__get_or_create_object(Url, {'url':ind["url"]}, None, ind, True, False) - entity_dict['url_id'] = url.id - return EntityUrl, entity_dict - - #{'': lambda } - entity_klass, 
entity_dict = { - 'hashtags': process_hashtags, - 'user_mentions' : process_user_mentions, - 'urls' : process_urls, - 'media': process_medias, - }.get(ind_type, lambda: (Entity, entity_dict))() - - get_logger().debug("Process_entity entity_dict: " + repr(entity_dict)) #@UndefinedVariable - if entity_klass: - self.obj_buffer.add_object(entity_klass, None, entity_dict, False) - - - def __process_twitter_stream(self): - - tweet_nb = self.session.query(Tweet).filter(Tweet.id == self.json_dict["id"]).count() - if tweet_nb > 0: - return - - ts_copy = adapt_fields(self.json_dict, fields_adapter["stream"]["tweet"]) - - # get or create user - user = self.__get_user(self.json_dict["user"], True) - if user is None: - get_logger().warning("USER not found " + repr(self.json_dict["user"])) #@UndefinedVariable - ts_copy["user_id"] = None - else: - ts_copy["user_id"] = user.id - - del(ts_copy['user']) - ts_copy["tweet_source_id"] = self.source_id - - self.tweet = self.obj_buffer.add_object(Tweet, None, ts_copy, True) - - self.__process_entities() - - - def __process_entities(self): - if "entities" in self.json_dict: - for ind_type, entity_list in self.json_dict["entities"].items(): - for ind in entity_list: - self.__process_entity(ind, ind_type) - else: - - text = self.tweet.text - extractor = twitter_text.Extractor(text) - for ind in extractor.extract_hashtags_with_indices(): - self.__process_entity(ind, "hashtags") - - for ind in extractor.extract_urls_with_indices(): - self.__process_entity(ind, "urls") - - for ind in extractor.extract_mentioned_screen_names_with_indices(): - self.__process_entity(ind, "user_mentions") - - def __process_twitter_rest(self): - tweet_nb = self.session.query(Tweet).filter(Tweet.id == self.json_dict["id"]).count() - if tweet_nb > 0: - return - - - tweet_fields = { - 'created_at': self.json_dict["created_at"], - 'favorited': False, - 'id': self.json_dict["id"], - 'id_str': self.json_dict["id_str"], - #'in_reply_to_screen_name': ts["to_user"], - 'in_reply_to_user_id': self.json_dict.get("in_reply_to_user_id",None), - 'in_reply_to_user_id_str': self.json_dict.get("in_reply_to_user_id_str", None), - #'place': ts["place"], - 'source': self.json_dict["source"], - 'text': self.json_dict["text"], - 'truncated': False, - 'tweet_source_id' : self.source_id, - } - - #user - - user_fields = { - 'lang' : self.json_dict.get('iso_language_code',None), - 'profile_image_url' : self.json_dict["profile_image_url"], - 'screen_name' : self.json_dict["from_user"], - 'id' : self.json_dict["from_user_id"], - 'id_str' : self.json_dict["from_user_id_str"], - 'name' : self.json_dict['from_user_name'], - } - - user = self.__get_user(user_fields, do_merge=False) - if user is None: - get_logger().warning("USER not found " + repr(user_fields)) #@UndefinedVariable - tweet_fields["user_id"] = None - else: - tweet_fields["user_id"] = user.id - - tweet_fields = adapt_fields(tweet_fields, fields_adapter["rest"]["tweet"]) - self.tweet = self.obj_buffer.add_object(Tweet, None, tweet_fields, True) - - self.__process_entities() - - - - def process(self): - - if self.source_id is None: - tweet_source = self.obj_buffer.add_object(TweetSource, None, {'original_json':self.json_txt}, True) - self.source_id = tweet_source.id - - if "metadata" in self.json_dict: - self.__process_twitter_rest() - else: - self.__process_twitter_stream() - - self.obj_buffer.add_object(TweetLog, None, {'tweet_source_id':self.source_id, 'status':TweetLog.TWEET_STATUS['OK']}, True) - - self.obj_buffer.persists(self.session) def 
set_logging(options, plogger=None, queue=None): @@ -508,12 +212,12 @@ return logger def set_logging_options(parser): - parser.add_option("-l", "--log", dest="logfile", + parser.add_argument("-l", "--log", dest="logfile", help="log to file", metavar="LOG", default="stderr") - parser.add_option("-v", dest="verbose", action="count", - help="verbose", metavar="VERBOSE", default=0) - parser.add_option("-q", dest="quiet", action="count", - help="quiet", metavar="QUIET", default=0) + parser.add_argument("-v", dest="verbose", action="count", + help="verbose", default=0) + parser.add_argument("-q", dest="quiet", action="count", + help="quiet", default=0) def get_base_query(session, query, start_date, end_date, hashtags, tweet_exclude_table, user_whitelist): @@ -591,8 +295,10 @@ if ei: dummy = self.format(record) # just to get traceback text into record.exc_text record.exc_info = None # not needed any more - if not self.ignore_full or not self.queue.full(): + if not self.ignore_full or (not self.queue.full()): self.queue.put_nowait(record) + except AssertionError: + pass except Queue.Full: if self.ignore_full: pass @@ -626,3 +332,10 @@ return writer +def get_unused_port(): + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.bind(('localhost', 0)) + _, port = s.getsockname() + s.close() + return port + diff -r 503f9a7b7d6c -r 6fc6637d8403 script/rest/export_twitter.py --- a/script/rest/export_twitter.py Sun Apr 21 21:55:06 2013 +0200 +++ b/script/rest/export_twitter.py Tue May 07 18:57:54 2013 +0200 @@ -1,16 +1,15 @@ #!/usr/bin/env python # coding=utf-8 -from sqlite3 import * +from sqlite3 import register_adapter, register_converter, connect, Row import datetime, time import email.utils from optparse import OptionParser import os.path -import os -import sys from lxml import etree import uuid import re +import simplejson def parse_date(date_str): ts = email.utils.parsedate_tz(date_str) @@ -20,10 +19,10 @@ return time.mktime(ts.timetuple()) def adapt_geo(geo): - return simplejson.dumps(geo) - + return simplejson.dumps(geo) + def convert_geo(s): - return simplejson.loads(s) + return simplejson.loads(s) register_adapter(datetime.datetime, adapt_datetime) @@ -73,7 +72,7 @@ ts = int(parse_date(options.start_date)) if options.end_date: - te = int(parse_date(options.end_date)) + te = int(parse_date(options.end_date)) else: te = ts + options.duration diff -r 503f9a7b7d6c -r 6fc6637d8403 script/stream/recorder_stream.py --- a/script/stream/recorder_stream.py Sun Apr 21 21:55:06 2013 +0200 +++ b/script/stream/recorder_stream.py Tue May 07 18:57:54 2013 +0200 @@ -1,14 +1,14 @@ -from getpass import getpass +from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer from iri_tweet import models, utils from iri_tweet.models import TweetSource, TweetLog, ProcessEvent -from multiprocessing import (Queue as mQueue, JoinableQueue, Process, Event, - get_logger) -from optparse import OptionParser +from iri_tweet.processor import get_processor +from multiprocessing import Queue as mQueue, Process, Event from sqlalchemy.exc import OperationalError from sqlalchemy.orm import scoped_session import Queue import StringIO import anyjson +import argparse import datetime import inspect import iri_tweet.stream @@ -21,6 +21,7 @@ import socket import sqlalchemy.schema import sys +import thread import threading import time import traceback @@ -35,7 +36,20 @@ columns_user = [u'follow_request_sent', u'profile_use_background_image', u'id', u'verified', u'profile_sidebar_fill_color', u'profile_text_color', u'followers_count', 
u'protected', u'location', u'profile_background_color', u'id_str', u'utc_offset', u'statuses_count', u'description', u'friends_count', u'profile_link_color', u'profile_image_url', u'notifications', u'show_all_inline_media', u'geo_enabled', u'profile_background_image_url', u'name', u'lang', u'following', u'profile_background_tile', u'favourites_count', u'screen_name', u'url', u'created_at', u'contributors_enabled', u'time_zone', u'profile_sidebar_border_color', u'is_translator', u'listed_count'] # just put it in a sqlite3 tqble -DEFAULT_TIMEOUT = 5 +DEFAULT_TIMEOUT = 3 + +class Requesthandler(BaseHTTPRequestHandler): + + def __init__(self, request, client_address, server): + BaseHTTPRequestHandler.__init__(self, request, client_address, server) + + def do_GET(self): + self.send_response(200) + self.end_headers() + + def log_message(self, format, *args): # @ReservedAssignment + pass + def set_logging(options): loggers = [] @@ -55,19 +69,16 @@ return qlogger def get_auth(options, access_token): - if options.username and options.password: - auth = requests.auth.BasicAuthHandler(options.username, options.password) - else: - consumer_key = models.CONSUMER_KEY - consumer_secret = models.CONSUMER_SECRET - auth = requests_oauthlib.OAuth1(access_token[0], access_token[1], consumer_key, consumer_secret, signature_type='auth_header') + consumer_key = options.consumer_key + consumer_secret = options.consumer_secret + auth = requests_oauthlib.OAuth1(client_key=consumer_key, client_secret=consumer_secret, resource_owner_key=access_token[0], resource_owner_secret=access_token[1], signature_type='auth_header') return auth -def add_process_event(type, args, session_maker): +def add_process_event(event_type, args, session_maker): session = session_maker() try: - evt = ProcessEvent(args=None if args is None else anyjson.serialize(args), type=type) + evt = ProcessEvent(args=None if args is None else anyjson.serialize(args), type=event_type) session.add(evt) session.commit() finally: @@ -83,6 +94,7 @@ self.options = options self.logger_queue = logger_queue self.stop_event = stop_event + self.consumer_token = (options.consumer_key, options.consumer_secret) self.access_token = access_token super(BaseProcess, self).__init__() @@ -122,16 +134,15 @@ def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid): self.track = options.track self.token_filename = options.token_filename - self.catchup = options.catchup self.timeout = options.timeout self.stream = None super(SourceProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid) def __source_stream_iter(self): - - self.logger = set_logging_process(self.options, self.logger_queue) + self.logger.debug("SourceProcess : run ") + self.logger.debug("SourceProcess : get_auth auth with option %s and token %s " %(self.options, self.access_token)) self.auth = get_auth(self.options, self.access_token) self.logger.debug("SourceProcess : auth set ") @@ -140,8 +151,8 @@ track_list = [k.strip() for k in track_list.split(',')] - self.logger.debug("SourceProcess : before connecting to stream " + repr(track_list)) - self.stream = iri_tweet.stream.FilterStream(self.auth, track=track_list, raw=True, url=self.options.url, catchup=self.catchup, timeout=self.timeout, chunk_size=1, logger=self.logger) + self.logger.debug("SourceProcess : before connecting to stream %s, url : %s, auth : %s" % (repr(track_list), self.options.url, repr(self.auth))) + self.stream = 
iri_tweet.stream.FilterStream(self.auth, track=track_list, raw=True, url=self.options.url, timeout=self.timeout, chunk_size=512, logger=self.logger) self.logger.debug("SourceProcess : after connecting to stream") self.stream.muststop = lambda: self.stop_event.is_set() @@ -149,11 +160,14 @@ session = self.session_maker() + #import pydevd + #pydevd.settrace(suspend=False) + + try: for tweet in stream_wrapper: if not self.parent_is_alive(): self.stop_event.set() - stop_thread.join(5) sys.exit() self.logger.debug("SourceProcess : tweet " + repr(tweet)) source = TweetSource(original_json=tweet) @@ -193,41 +207,52 @@ def do_run(self): - # import pydevd - # pydevd.settrace(suspend=False) + self.logger = set_logging_process(self.options, self.logger_queue) source_stream_iter_thread = threading.Thread(target=self.__source_stream_iter , name="SourceStreamIterThread") source_stream_iter_thread.start() - while not self.stop_event.is_set(): - self.logger.debug("SourceProcess : In while after start") - self.stop_event.wait(DEFAULT_TIMEOUT) - if self.stop_event.is_set() and self.stream: - self.stream.close() - elif not self.stop_event.is_set() and not source_stream_iter_thread.is_alive: - self.stop_event.set() + try: + while not self.stop_event.is_set(): + self.logger.debug("SourceProcess : In while after start") + self.stop_event.wait(DEFAULT_TIMEOUT) + except KeyboardInterrupt: + self.stop_event.set() + pass + if self.stop_event.is_set() and self.stream: + self.stream.close() + elif not self.stop_event.is_set() and not source_stream_iter_thread.is_alive(): + self.stop_event.set() + self.logger.info("SourceProcess : join") source_stream_iter_thread.join(30) -def process_tweet(tweet, source_id, session, access_token, twitter_query_user, logger): +def process_tweet(tweet, source_id, session, consumer_token, access_token, twitter_query_user, token_filename, logger): try: if not tweet.strip(): return tweet_obj = anyjson.deserialize(tweet) - if 'text' not in tweet_obj: + processor_klass = get_processor(tweet_obj) + if not processor_klass: tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET']) session.add(tweet_log) return - screen_name = "" - if 'user' in tweet_obj and 'screen_name' in tweet_obj['user']: - screen_name = tweet_obj['user']['screen_name'] - logger.info(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text'])) - logger.debug(u"Process_tweet :" + repr(tweet)) - processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, access_token, None, twitter_query_user) + processor = processor_klass(json_dict=tweet_obj, + json_txt=tweet, + source_id=source_id, + session=session, + consumer_token=consumer_token, + access_token=access_token, + token_filename=token_filename, + user_query_twitter=twitter_query_user, + logger=logger) + logger.info(processor.log_info()) + logger.debug(u"Process_tweet :" + repr(tweet)) processor.process() + except ValueError as e: message = u"Value Error %s processing tweet %s" % (repr(e), tweet) output = StringIO.StringIO() @@ -274,8 +299,10 @@ except Exception as e: self.logger.debug('Process tweet exception in loop : ' + repr(e)) continue - process_tweet(tweet_txt, source_id, session, self.access_token, self.twitter_query_user, self.logger) + process_tweet(tweet_txt, source_id, session, self.consumer_token, self.access_token, self.twitter_query_user, self.options.token_filename, self.logger) session.commit() + except KeyboardInterrupt: + self.stop_event.set() finally: session.rollback() session.close() @@ -287,15 +314,20 @@ 
return Session, engine, metadata -def process_leftovers(session, access_token, twitter_query_user, logger): +def process_leftovers(session, consumer_token, access_token, twitter_query_user, token_filename, ask_process_leftovers, logger): sources = session.query(TweetSource).outerjoin(TweetLog).filter(TweetLog.id == None) + sources_count = sources.count() + if sources_count > 10 and ask_process_leftovers: + resp = raw_input("Do you want to process leftovers (Y/n) ? (%d tweets to process)" % sources_count) + if resp and resp.strip().lower() == "n": + return + logger.info("Process leftovers, %d tweets to process" % (sources_count)) for src in sources: tweet_txt = src.original_json - process_tweet(tweet_txt, src.id, session, access_token, twitter_query_user, logger) + process_tweet(tweet_txt, src.id, session, consumer_token, access_token, twitter_query_user, token_filename, logger) session.commit() - - # get tweet source that do not match any message @@ -315,38 +347,36 @@ def get_options(): - usage = "usage: %prog [options]" + usage = "usage: %(prog)s [options]" - parser = OptionParser(usage=usage) + parser = argparse.ArgumentParser(usage=usage) - parser.add_option("-f", "--file", dest="conn_str", - help="write tweet to DATABASE. This is a connection string", metavar="CONNECTION_STR", default="enmi2010_twitter.db") - parser.add_option("-u", "--user", dest="username", - help="Twitter user", metavar="USER", default=None) - parser.add_option("-w", "--password", dest="password", - help="Twitter password", metavar="PASSWORD", default=None) - parser.add_option("-T", "--track", dest="track", - help="Twitter track", metavar="TRACK") - parser.add_option("-n", "--new", dest="new", action="store_true", - help="new database", default=False) - parser.add_option("-D", "--daemon", dest="daemon", action="store_true", - help="launch daemon", default=False) - parser.add_option("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token", - help="Token file name") - parser.add_option("-d", "--duration", dest="duration", - help="Duration of recording in seconds", metavar="DURATION", default= -1, type='int') - parser.add_option("-N", "--nb-process", dest="process_nb", - help="number of process.\nIf 0, only the lefovers of the database are processed.\nIf 1, no postprocessing is done on the tweets.", metavar="PROCESS_NB", default=2, type='int') - parser.add_option("--url", dest="url", - help="The twitter url to connect to.", metavar="URL", default=iri_tweet.stream.FilterStream.url) - parser.add_option("--query-user", dest="twitter_query_user", action="store_true", - help="Query twitter for users", default=False, metavar="QUERY_USER") - parser.add_option("--catchup", dest="catchup", - help="catchup count for tweets", default=None, metavar="CATCHUP", type='int') - parser.add_option("--timeout", dest="timeout", - help="timeout for connecting in seconds", default=60, metavar="TIMEOUT", type='int') - - + parser.add_argument("-f", "--file", dest="conn_str", + help="write tweet to DATABASE. 
This is a connection string", metavar="CONNECTION_STR", default="enmi2010_twitter.db") + parser.add_argument("-T", "--track", dest="track", + help="Twitter track", metavar="TRACK") + parser.add_argument("-k", "--key", dest="consumer_key", + help="Twitter consumer key", metavar="CONSUMER_KEY", required=True) + parser.add_argument("-s", "--secret", dest="consumer_secret", + help="Twitter consumer secret", metavar="CONSUMER_SECRET", required=True) + parser.add_argument("-n", "--new", dest="new", action="store_true", + help="new database", default=False) + parser.add_argument("-D", "--daemon", dest="daemon", action="store_true", + help="launch daemon", default=False) + parser.add_argument("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token", + help="Token file name") + parser.add_argument("-d", "--duration", dest="duration", + help="Duration of recording in seconds", metavar="DURATION", default= -1, type=int) + parser.add_argument("-N", "--nb-process", dest="process_nb", + help="number of process.\nIf 0, only the lefovers of the database are processed.\nIf 1, no postprocessing is done on the tweets.", metavar="PROCESS_NB", default=2, type=int) + parser.add_argument("--url", dest="url", + help="The twitter url to connect to.", metavar="URL", default=iri_tweet.stream.FilterStream.url) + parser.add_argument("--query-user", dest="twitter_query_user", action="store_true", + help="Query twitter for users", default=False) + parser.add_argument("--timeout", dest="timeout", + help="timeout for connecting in seconds", default=60, metavar="TIMEOUT", type=int) + parser.add_argument("--ask-process-leftovers", dest="ask_process_leftovers", action="store_false", + help="ask process leftover", default=True) utils.set_logging_options(parser) @@ -357,14 +387,14 @@ def do_run(options, session_maker): stop_args = {} - - access_token = None - if not options.username or not options.password: - access_token = utils.get_oauth_token(options.token_filename) + + consumer_token = (options.consumer_key, options.consumer_secret) + access_token = utils.get_oauth_token(consumer_key=consumer_token[0], consumer_secret=consumer_token[1], token_file_path=options.token_filename) + session = session_maker() try: - process_leftovers(session, access_token, options.twitter_query_user, utils.get_logger()) + process_leftovers(session, consumer_token, access_token, options.twitter_query_user, options.token_filename, options.ask_process_leftovers, utils.get_logger()) session.commit() finally: session.rollback() @@ -378,7 +408,10 @@ stop_event = Event() # workaround for bug on using urllib2 and multiprocessing - req = urllib2.Request('http://localhost') + httpd = HTTPServer(('127.0.0.1',0), Requesthandler) + thread.start_new_thread(httpd.handle_request, ()) + + req = urllib2.Request('http://localhost:%d' % httpd.server_port) conn = None try: conn = urllib2.urlopen(req) @@ -392,7 +425,7 @@ process_engines = [] logger_queues = [] - SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str) + SessionProcess, engine_process, _ = get_sessionmaker(conn_str) process_engines.append(engine_process) lqueue = mQueue(50) logger_queues.append(lqueue) @@ -402,7 +435,7 @@ tweet_processes = [] for i in range(options.process_nb - 1): - SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str) + SessionProcess, engine_process, _ = get_sessionmaker(conn_str) process_engines.append(engine_process) lqueue = mQueue(50) logger_queues.append(lqueue) @@ -462,21 +495,13 @@ cprocess.terminate() - 
utils.get_logger().debug("Close queues") - try: - queue.close() - for lqueue in logger_queues: - lqueue.close() - except exception as e: - utils.get_logger().error("error when closing queues %s", repr(e)) - # do nothing - + utils.get_logger().debug("Close queues") if options.process_nb > 1: utils.get_logger().debug("Processing leftovers") session = session_maker() try: - process_leftovers(session, access_token, options.twitter_query_user, utils.get_logger()) + process_leftovers(session, consumer_token, access_token, options.twitter_query_user, options.token_filename, options.ask_process_leftovers, utils.get_logger()) session.commit() finally: session.rollback() @@ -484,11 +509,19 @@ for pengine in process_engines: pengine.dispose() + + try: + queue.close() + for lqueue in logger_queues: + lqueue.close() + except Exception as e: + utils.get_logger().error("error when closing queues %s", repr(e)) + # do nothing return stop_args -def main(options, args): +def main(options): global conn_str @@ -513,7 +546,8 @@ Session, engine, metadata = get_sessionmaker(conn_str) if options.new: - check_metadata = sqlalchemy.schema.MetaData(bind=engine, reflect=True) + check_metadata = sqlalchemy.schema.MetaData(bind=engine) + check_metadata.reflect() if len(check_metadata.sorted_tables) > 0: message = "Database %s not empty exiting" % conn_str utils.get_logger().error(message) @@ -528,7 +562,7 @@ stop_args = {} try: - add_process_event(type="start", args={'options':options.__dict__, 'args': args, 'command_line': sys.argv}, session_maker=Session) + add_process_event(event_type="start", args={'options':options.__dict__, 'args': [], 'command_line': sys.argv}, session_maker=Session) stop_args = do_run(options, Session) except Exception as e: utils.get_logger().exception("Error in main thread") @@ -540,7 +574,7 @@ outfile.close() raise finally: - add_process_event(type="shutdown", args=stop_args, session_maker=Session) + add_process_event(event_type="shutdown", args=stop_args, session_maker=Session) utils.get_logger().debug("Done. Exiting. 
" + repr(stop_args)) @@ -548,15 +582,15 @@ if __name__ == '__main__': - (options, args) = get_options() + options = get_options() loggers = set_logging(options) utils.get_logger().debug("OPTIONS : " + repr(options)) if options.daemon: + options.ask_process_leftovers = False import daemon - import lockfile hdlr_preserve = [] for logger in loggers: @@ -564,7 +598,7 @@ context = daemon.DaemonContext(working_directory=os.getcwd(), files_preserve=hdlr_preserve) with context: - main(options, args) + main(options) else: - main(options, args) - + main(options) + diff -r 503f9a7b7d6c -r 6fc6637d8403 script/utils/export_twitter_alchemy.py --- a/script/utils/export_twitter_alchemy.py Sun Apr 21 21:55:06 2013 +0200 +++ b/script/utils/export_twitter_alchemy.py Tue May 07 18:57:54 2013 +0200 @@ -416,7 +416,7 @@ get_logger().debug("write http " + repr(project)) #@UndefinedVariable r = requests.put(content_file_write, data=anyjson.dumps(project), headers={'content-type':'application/json'}, params=post_param); get_logger().debug("write http " + repr(r) + " content " + r.text) #@UndefinedVariable - if r.status_code != requests.codes.ok: + if r.status_code != requests.codes.ok: # @UndefinedVariable r.raise_for_status() else: if content_file_write and os.path.exists(content_file_write): diff -r 503f9a7b7d6c -r 6fc6637d8403 script/utils/merge_tweets.py --- a/script/utils/merge_tweets.py Sun Apr 21 21:55:06 2013 +0200 +++ b/script/utils/merge_tweets.py Tue May 07 18:57:54 2013 +0200 @@ -1,12 +1,15 @@ #from models import setup_database from iri_tweet.models import setup_database, TweetSource, Tweet, TweetLog -from iri_tweet.utils import TwitterProcessor, get_oauth_token, show_progress +from iri_tweet.processor import TwitterProcessorStatus +from iri_tweet.utils import get_oauth_token, show_progress +import anyjson import argparse -import sys +import codecs +import logging import re -import anyjson -import math -import codecs +import sys + +logger = logging.getLogger(__name__) def get_option(): @@ -16,6 +19,10 @@ help="log to file", metavar="LOG", default="stderr") parser.add_argument("-v", dest="verbose", action="count", help="verbose", default=0) + parser.add_option("-k", "--key", dest="consumer_key", + help="Twitter consumer key", metavar="CONSUMER_KEY") + parser.add_option("-s", "--secret", dest="consumer_secret", + help="Twitter consumer secret", metavar="CONSUMER_SECRET") parser.add_argument("-q", dest="quiet", action="count", help="quiet", default=0) parser.add_argument("--query-user", dest="query_user", action="store_true", @@ -38,7 +45,7 @@ access_token = None if options.query_user: - access_token = get_oauth_token(options.token_filename) + access_token = get_oauth_token(options.consumer_key, options.consumer_secret, options.token_filename) #open source src_conn_str = options.source[0].strip() @@ -60,7 +67,7 @@ session_src = Session_src() session_tgt = Session_tgt() - count_tw_query = Tweet.__table__.count() + count_tw_query = Tweet.__table__.count() # @UndefinedVariable count_tw = engine_src.scalar(count_tw_query) @@ -83,10 +90,10 @@ tweet_obj = anyjson.deserialize(tweet_source) if 'text' not in tweet_obj: - tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET']) + tweet_log = TweetLog(tweet_source_id=tweet.tweet_source.id, status=TweetLog.TWEET_STATUS['NOT_TWEET']) session_tgt.add(tweet_log) else: - tp = TwitterProcessor(None, tweet_source, None, session_tgt, access_token, options.token_filename, user_query_twitter=options.query_user) + tp = 
TwitterProcessorStatus(None, tweet_source, None, session_tgt, access_token, options.token_filename, user_query_twitter=options.query_user, logger=logger) tp.process() session_tgt.flush() @@ -98,9 +105,13 @@ print u"%d new tweet added" % (added) finally: - session_tgt.close() if session_tgt is not None else None - session_src.close() if session_src is not None else None - conn_tgt.close() if conn_tgt is not None else None - conn_src.close() if conn_src is not None else None + if session_tgt is not None: + session_tgt.close() + if session_src is not None: + session_src.close() + if conn_tgt is not None: + conn_tgt.close() + if conn_src is not None: + conn_src.close() \ No newline at end of file diff -r 503f9a7b7d6c -r 6fc6637d8403 script/utils/search_topsy.py --- a/script/utils/search_topsy.py Sun Apr 21 21:55:06 2013 +0200 +++ b/script/utils/search_topsy.py Tue May 07 18:57:54 2013 +0200 @@ -1,17 +1,15 @@ +from blessings import Terminal from iri_tweet import models, utils -from sqlalchemy.orm import sessionmaker -import anyjson -import sqlite3 -import twitter +from iri_tweet.processor import TwitterProcessorStatus +from optparse import OptionParser +import logging +import math import re import requests -from optparse import OptionParser -import simplejson import time -from blessings import Terminal -import sys -import math -from symbol import except_clause +import twitter + +logger = logging.getLogger(__name__) APPLICATION_NAME = "Tweet recorder user" CONSUMER_KEY = "Vdr5ZcsjI1G3esTPI8yDg" @@ -86,7 +84,7 @@ utils.set_logging(options); - acess_token_key, access_token_secret = utils.get_oauth_token(options.token_filename, application_name=APPLICATION_NAME, consumer_key=CONSUMER_KEY, consumer_secret=CONSUMER_SECRET) + acess_token_key, access_token_secret = utils.get_oauth_token(consumer_key=CONSUMER_KEY, consumer_secret=CONSUMER_SECRET, options.token_filename, application_name=APPLICATION_NAME) t = twitter.Twitter(domain="api.twitter.com", auth=twitter.OAuth(acess_token_key, access_token_secret, CONSUMER_KEY, CONSUMER_SECRET), secure=True) t.secure = True @@ -144,7 +142,7 @@ else: raise - processor = utils.TwitterProcessor(tweet, None, None, session, None, options.token_filename) + processor = TwitterProcessorStatus(tweet, None, None, session, None, options.token_filename, logger) processor.process() session.flush() session.commit() diff -r 503f9a7b7d6c -r 6fc6637d8403 script/utils/tweet_twitter_user.py --- a/script/utils/tweet_twitter_user.py Sun Apr 21 21:55:06 2013 +0200 +++ b/script/utils/tweet_twitter_user.py Tue May 07 18:57:54 2013 +0200 @@ -104,7 +104,7 @@ query_res = query.all() - acess_token_key, access_token_secret = get_oauth_token(options.token_filename, application_name=APPLICATION_NAME, consumer_key=CONSUMER_KEY, consumer_secret=CONSUMER_SECRET) + acess_token_key, access_token_secret = get_oauth_token(consumer_key=CONSUMER_KEY, consumer_secret=CONSUMER_SECRET, token_file_path=options.token_filename, application_name=APPLICATION_NAME) t = twitter.Twitter(auth=twitter.OAuth(acess_token_key, access_token_secret, CONSUMER_KEY, CONSUMER_SECRET)) for user in query_res: diff -r 503f9a7b7d6c -r 6fc6637d8403 script/virtualenv/res/lib/lib_create_env.py --- a/script/virtualenv/res/lib/lib_create_env.py Sun Apr 21 21:55:06 2013 +0200 +++ b/script/virtualenv/res/lib/lib_create_env.py Tue May 07 18:57:54 2013 +0200 @@ -25,12 +25,13 @@ 'DATEUTIL': {'setup': 'python-dateutil', 'url':'http://pypi.python.org/packages/source/p/python-dateutil/python-dateutil-2.1.tar.gz', 
'local':"python-dateutil-2.1.tar.gz", 'install': {'method': 'pip', 'option_str': None, 'dict_extra_env': None}}, 'PYTZ': {'setup': 'pytz', 'url':'http://pypi.python.org/packages/source/p/pytz/pytz-2013b.tar.bz2', 'local':"pytz-2013b.tar.bz2", 'install': {'method': 'pip', 'option_str': None, 'dict_extra_env': None}}, 'SIMPLEJSON': {'setup': 'simplejson', 'url':'http://pypi.python.org/packages/source/s/simplejson/simplejson-3.1.3.tar.gz', 'local':"simplejson-3.1.3.tar.gz", 'install': {'method': 'pip', 'option_str': None, 'dict_extra_env': None}}, - 'SQLALCHEMY': {'setup': 'sqlalchemy', 'url':'http://www.python.org/pypi/SQLAlchemy/0.8.0', 'local':"SQLAlchemy-0.8.0.tar.gz", 'install': {'method': 'pip', 'option_str': None, 'dict_extra_env': None}}, + 'SQLALCHEMY': {'setup': 'sqlalchemy', 'url':'http://www.python.org/pypi/SQLAlchemy/0.8.1', 'local':"SQLAlchemy-0.8.1.tar.gz", 'install': {'method': 'pip', 'option_str': None, 'dict_extra_env': None}}, 'TWITTER': {'setup': 'twitter', 'url':'http://pypi.python.org/packages/source/t/twitter/twitter-1.9.2.tar.gz', 'local':"twitter-1.9.2.tar.gz", 'install': {'method': 'pip', 'option_str': None, 'dict_extra_env': None}}, 'TWITTER-TEXT': {'setup': 'twitter-text', 'url':'https://github.com/dryan/twitter-text-py/archive/master.tar.gz', 'local':"twitter-text-1.0.4.tar.gz", 'install': {'method': 'pip', 'option_str': None, 'dict_extra_env': None}}, 'REQUESTS': {'setup': 'requests', 'url':'https://github.com/kennethreitz/requests/archive/v1.2.0.tar.gz', 'local':'requests-v1.2.0.tar.gz', 'install' : {'method':'pip', 'option_str': None, 'dict_extra_env': None}}, 'OAUTHLIB': {'setup': 'oauthlib', 'url':'https://github.com/idan/oauthlib/archive/0.4.0.tar.gz', 'local':'oauthlib-0.4.0.tar.gz', 'install' : {'method':'pip', 'option_str': None, 'dict_extra_env': None}}, 'REQUESTS-OAUTHLIB': {'setup': 'requests-oauthlib', 'url':'https://github.com/requests/requests-oauthlib/archive/master.tar.gz', 'local':'requests-oauthlib-0.3.0.tar.gz', 'install' : {'method':'pip', 'option_str': None, 'dict_extra_env': None}}, + 'BLESSINGS': {'setup': 'blessings', 'url':'https://github.com/erikrose/blessings/archive/1.5.tar.gz', 'local':'blessings-1.5.tar.gz', 'install' : {'method':'pip', 'option_str': None, 'dict_extra_env': None}} } if system_str == 'Windows': diff -r 503f9a7b7d6c -r 6fc6637d8403 script/virtualenv/res/src/SQLAlchemy-0.8.0.tar.gz Binary file script/virtualenv/res/src/SQLAlchemy-0.8.0.tar.gz has changed diff -r 503f9a7b7d6c -r 6fc6637d8403 script/virtualenv/res/src/SQLAlchemy-0.8.1.tar.gz Binary file script/virtualenv/res/src/SQLAlchemy-0.8.1.tar.gz has changed diff -r 503f9a7b7d6c -r 6fc6637d8403 script/virtualenv/res/src/blessings-1.5.tar.gz Binary file script/virtualenv/res/src/blessings-1.5.tar.gz has changed diff -r 503f9a7b7d6c -r 6fc6637d8403 script/virtualenv/script/res/res_create_env.py --- a/script/virtualenv/script/res/res_create_env.py Sun Apr 21 21:55:06 2013 +0200 +++ b/script/virtualenv/script/res/res_create_env.py Tue May 07 18:57:54 2013 +0200 @@ -21,7 +21,8 @@ 'TWITTER-TEXT', 'REQUESTS', 'OAUTHLIB', - 'REQUESTS-OAUTHLIB' + 'REQUESTS-OAUTHLIB', + 'BLESSINGS' ] if system_str == "Linux":