Update listener, add support for Twitter regulation messages (delete, scrub_geo, limit, withheld, disconnect, stall warning), and update virtualenv.
--- a/.pydevproject Sun Apr 21 21:55:06 2013 +0200
+++ b/.pydevproject Tue May 07 18:57:54 2013 +0200
@@ -3,6 +3,9 @@
<pydev_property name="org.python.pydev.PYTHON_PROJECT_INTERPRETER">python_tl</pydev_property>
<pydev_property name="org.python.pydev.PYTHON_PROJECT_VERSION">python 2.7</pydev_property>
<pydev_pathproperty name="org.python.pydev.PROJECT_SOURCE_PATH">
-<path>/tweet_live/script/lib</path>
+<path>/tweet_live/script/lib/iri_tweet</path>
+<path>/tweet_live/script/stream</path>
+<path>/tweet_live/script/rest</path>
+<path>/tweet_live/script/utils</path>
</pydev_pathproperty>
</pydev_project>
--- a/script/lib/iri_tweet/iri_tweet/models.py Sun Apr 21 21:55:06 2013 +0200
+++ b/script/lib/iri_tweet/iri_tweet/models.py Tue May 07 18:57:54 2013 +0200
@@ -1,5 +1,6 @@
from sqlalchemy import (Boolean, Column, Enum, BigInteger, Integer, String,
- ForeignKey, DateTime, create_engine)
+ ForeignKey, DateTime, create_engine, event)
+from sqlalchemy.engine import Engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship, sessionmaker
import anyjson
@@ -11,12 +12,10 @@
Base = declarative_base()
APPLICATION_NAME = "IRI_TWITTER"
-CONSUMER_KEY = "54ThDZhpEjokcMgHJOMnQA"
-CONSUMER_SECRET = "wUoL9UL2T87tfc97R0Dff2EaqRzpJ5XGdmaN2XK3udA"
+#CONSUMER_KEY = "54ThDZhpEjokcMgHJOMnQA"
+#CONSUMER_SECRET = "wUoL9UL2T87tfc97R0Dff2EaqRzpJ5XGdmaN2XK3udA"
ACCESS_TOKEN_KEY = None
ACCESS_TOKEN_SECRET = None
-#ACCESS_TOKEN_KEY= "47312923-LiNTtz0I18YXMVIrFeTuhmH7bOvYsK6p3Ln2Dc"
-#ACCESS_TOKEN_SECRET = "r3LoXVcjImNAElUpWqTu2SG2xCdWFHkva7xeQoncA"
def adapt_date(date_str):
ts = email.utils.parsedate_tz(date_str) #@UndefinedVariable
@@ -82,6 +81,14 @@
'OK' : 1,
'ERROR' : 2,
'NOT_TWEET': 3,
+ 'DELETE': 4,
+ 'SCRUB_GEO': 5,
+ 'LIMIT': 6,
+ 'STATUS_WITHHELD': 7,
+ 'USER_WITHHELD': 8,
+ 'DISCONNECT': 9,
+ 'STALL_WARNING': 10,
+ 'DELETE_PENDING': 4 # same code as DELETE; a pending delete is identified by its non-null status_id
}
__metaclass__ = TweetMeta
@@ -90,6 +97,7 @@
ts = Column(DateTime, default=datetime.datetime.utcnow, index=True)
tweet_source_id = Column(Integer, ForeignKey('tweet_tweet_source.id'))
tweet_source = relationship("TweetSource", backref="logs")
+ status_id = Column(BigInteger, index=True, nullable=True, default=None)
status = Column(Integer)
error = Column(String)
error_stack = Column(String)
@@ -121,7 +129,7 @@
user = relationship("User", backref="tweets")
tweet_source_id = Column(Integer, ForeignKey('tweet_tweet_source.id'))
tweet_source = relationship("TweetSource", backref="tweet")
- entity_list = relationship(Entity, backref='tweet')
+ entity_list = relationship(Entity, backref='tweet', cascade="all, delete-orphan")
received_at = Column(DateTime, default=datetime.datetime.utcnow, index=True)
@@ -269,6 +277,14 @@
kwargs_ce = dict((k, v) for k,v in kwargs.items() if (k not in session_argname and k != "create_all"))
engine = create_engine(*args, **kwargs_ce)
+
+ if engine.name == "sqlite":
+ @event.listens_for(engine, "connect") # listen on this engine instance only, not the global Engine class
+ def set_sqlite_pragma(dbapi_connection, connection_record):
+ cursor = dbapi_connection.cursor()
+ cursor.execute("PRAGMA foreign_keys=ON")
+ cursor.close()
+
metadata = Base.metadata
kwargs_sm = {'bind': engine}
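
The foreign-key hook in setup_database is easiest to read in isolation. A minimal sketch, assuming only SQLAlchemy and an in-memory SQLite database, of an instance-level "connect" listener turning PRAGMA foreign_keys on for every pooled connection:

# Minimal sketch (not part of the patch): SQLite leaves foreign-key
# enforcement off by default; the listener enables it per DBAPI connection.
from sqlalchemy import create_engine, event

engine = create_engine("sqlite:///:memory:")

@event.listens_for(engine, "connect")
def set_sqlite_pragma(dbapi_connection, connection_record):
    cursor = dbapi_connection.cursor()
    cursor.execute("PRAGMA foreign_keys=ON")
    cursor.close()

conn = engine.connect()
assert conn.execute("PRAGMA foreign_keys").fetchone()[0] == 1
conn.close()
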
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/script/lib/iri_tweet/iri_tweet/processor.py Tue May 07 18:57:54 2013 +0200
@@ -0,0 +1,516 @@
+# -*- coding: utf-8 -*-
+'''
+Created on Apr 29, 2013
+
+@author: ymh
+'''
+from iri_tweet.models import (User, EntityType, adapt_json, MediaType, Media,
+ EntityMedia, Hashtag, EntityHashtag, EntityUser, EntityUrl, Url, Entity, Tweet,
+ TweetSource, TweetLog)
+from iri_tweet.utils import (ObjectsBuffer, adapt_fields, fields_adapter,
+ ObjectBufferProxy, get_oauth_token, clean_keys)
+from sqlalchemy.orm import joinedload
+import anyjson
+import logging
+import twitter
+import twitter_text
+
+
+class TwitterProcessorException(Exception):
+ pass
+
+class TwitterProcessor(object):
+ def __init__(self, json_dict, json_txt, source_id, session, consumer_token, access_token=None, token_filename=None, user_query_twitter=False, logger=None):
+
+ if json_dict is None and json_txt is None:
+ raise TwitterProcessorException("No json")
+
+ if json_dict is None:
+ self.json_dict = anyjson.deserialize(json_txt)
+ else:
+ self.json_dict = json_dict
+
+ if not json_txt:
+ self.json_txt = anyjson.serialize(json_dict)
+ else:
+ self.json_txt = json_txt
+
+ if "id" not in self.json_dict:
+ raise TwitterProcessorException("No id in json")
+
+ self.source_id = source_id
+ self.session = session
+ self.consumer_key = consumer_token[0]
+ self.consumer_secret = consumer_token[1]
+ self.token_filename = token_filename
+ self.access_token = access_token
+ self.obj_buffer = ObjectsBuffer()
+ self.user_query_twitter = user_query_twitter
+ if not logger:
+ self.logger = logging.getLogger(__name__)
+ else:
+ self.logger = logger
+
+ def process(self):
+ if self.source_id is None:
+ tweet_source = self.obj_buffer.add_object(TweetSource, None, {'original_json':self.json_txt}, True)
+ self.source_id = tweet_source.id
+ self.process_source()
+ self.obj_buffer.persists(self.session)
+
+ def process_source(self):
+ raise NotImplementedError()
+
+ def log_info(self):
+ return "Process tweet %s" % repr(self.__class__)
+
+
+class TwitterProcessorStatus(TwitterProcessor):
+
+ def __get_user(self, user_dict, do_merge):
+ self.logger.debug("Get user : " + repr(user_dict)) #@UndefinedVariable
+
+ user_dict = adapt_fields(user_dict, fields_adapter["stream"]["user"])
+
+ user_id = user_dict.get("id",None)
+ user_name = user_dict.get("screen_name", user_dict.get("name", None))
+
+ if user_id is None and user_name is None:
+ return None
+
+ user = None
+ if user_id:
+ user = self.obj_buffer.get(User, id=user_id)
+ else:
+ user = self.obj_buffer.get(User, screen_name=user_name)
+
+ # TODO: update user if needed
+ if user is not None:
+ user_created_at = None
+ if user.args is not None:
+ user_created_at = user.args.get('created_at', None)
+ if user_created_at is None and user_dict.get('created_at', None) is not None and do_merge:
+ if user.args is None:
+ user.args = user_dict
+ else:
+ user.args.update(user_dict)
+ return user
+
+ # TODO: add methods to ObjectsBuffer to get a buffered user
+ user_obj = None
+ if user_id:
+ user_obj = self.session.query(User).filter(User.id == user_id).first()
+ else:
+ user_obj = self.session.query(User).filter(User.screen_name.ilike(user_name)).first()
+
+ #todo update user if needed
+ if user_obj is not None:
+ if user_obj.created_at is not None or user_dict.get('created_at', None) is None or not do_merge :
+ user = ObjectBufferProxy(User, None, None, False, user_obj)
+ else:
+ user = self.obj_buffer.add_object(User, None, user_dict, True, user_obj)
+ return user
+
+ user_created_at = user_dict.get("created_at", None)
+
+ if user_created_at is None and self.user_query_twitter:
+
+ if self.access_token is not None:
+ acess_token_key, access_token_secret = self.access_token
+ else:
+ acess_token_key, access_token_secret = get_oauth_token(consumer_key=self.consumer_key, consumer_secret=self.consumer_secret, token_file_path=self.token_filename)
+ #TODO pass it as argument
+ t = twitter.Twitter(auth=twitter.OAuth(acess_token_key, access_token_secret, self.consumer_key, self.consumer_secret))
+ try:
+ if user_id:
+ user_dict = t.users.show(user_id=user_id)
+ else:
+ user_dict = t.users.show(screen_name=user_name)
+ except Exception as e:
+ self.logger.info("get_user : TWITTER ERROR : " + repr(e)) #@UndefinedVariable
+ self.logger.info("get_user : TWITTER ERROR : " + str(e)) #@UndefinedVariable
+ return None
+
+ if "id" not in user_dict:
+ return None
+
+ #TODO filter get, wrap in proxy
+ user_obj = self.session.query(User).filter(User.id == user_dict["id"]).first()
+
+ if user_obj is not None and not do_merge:
+ return ObjectBufferProxy(User, None, None, False, user_obj)
+ else:
+ return self.obj_buffer.add_object(User, None, user_dict, True)
+
+ def __get_or_create_object(self, klass, filter_by_kwargs, filter_arg, creation_kwargs, must_flush, do_merge):
+
+ obj_proxy = self.obj_buffer.get(klass, **filter_by_kwargs)
+ if obj_proxy is None:
+ query = self.session.query(klass)
+ if filter_arg is not None:
+ query = query.filter(filter_arg)
+ else:
+ query = query.filter_by(**filter_by_kwargs)
+ obj_instance = query.first()
+ if obj_instance is not None:
+ if not do_merge:
+ obj_proxy = ObjectBufferProxy(klass, None, None, False, obj_instance)
+ else:
+ obj_proxy = self.obj_buffer.add_object(klass, None, creation_kwargs, must_flush, obj_instance)
+ if obj_proxy is None:
+ obj_proxy = self.obj_buffer.add_object(klass, None, creation_kwargs, must_flush)
+ return obj_proxy
+
+
+ def __process_entity(self, ind, ind_type):
+ self.logger.debug("Process_entity : " + repr(ind) + " : " + repr(ind_type)) #@UndefinedVariable
+
+ ind = clean_keys(ind)
+
+ entity_type = self.__get_or_create_object(EntityType, {'label':ind_type}, None, {'label':ind_type}, True, False)
+
+ entity_dict = {
+ "indice_start" : ind["indices"][0],
+ "indice_end" : ind["indices"][1],
+ "tweet_id" : self.tweet.id,
+ "entity_type_id" : entity_type.id,
+ "source" : adapt_json(ind)
+ }
+
+ def process_medias():
+
+ media_id = ind.get('id', None)
+ if media_id is None:
+ return None, None
+
+ type_str = ind.get("type", "photo")
+ media_type = self.__get_or_create_object(MediaType, {'label': type_str}, None, {'label':type_str}, True, False)
+ media_ind = adapt_fields(ind, fields_adapter["entities"]["medias"])
+ if "type" in media_ind:
+ del(media_ind["type"])
+ media_ind['type_id'] = media_type.id
+ media = self.__get_or_create_object(Media, {'id':media_id}, None, media_ind, True, False)
+
+ entity_dict['media_id'] = media.id
+ return EntityMedia, entity_dict
+
+ def process_hashtags():
+ text = ind.get("text", ind.get("hashtag", None))
+ if text is None:
+ return None, None
+ ind['text'] = text
+ hashtag = self.__get_or_create_object(Hashtag, {'text':text}, Hashtag.text.ilike(text), ind, True, False)
+ entity_dict['hashtag_id'] = hashtag.id
+ return EntityHashtag, entity_dict
+
+ def process_user_mentions():
+ user_mention = self.__get_user(ind, False)
+ if user_mention is None:
+ entity_dict['user_id'] = None
+ else:
+ entity_dict['user_id'] = user_mention.id
+ return EntityUser, entity_dict
+
+ def process_urls():
+ url = self.__get_or_create_object(Url, {'url':ind["url"]}, None, ind, True, False)
+ entity_dict['url_id'] = url.id
+ return EntityUrl, entity_dict
+
+ entity_klass, entity_dict = {
+ 'hashtags': process_hashtags,
+ 'user_mentions' : process_user_mentions,
+ 'urls' : process_urls,
+ 'media': process_medias,
+ }.get(ind_type, lambda: (Entity, entity_dict))()
+
+ self.logger.debug("Process_entity entity_dict: " + repr(entity_dict)) #@UndefinedVariable
+ if entity_klass:
+ self.obj_buffer.add_object(entity_klass, None, entity_dict, False)
+
+
+ def __process_twitter_stream(self):
+
+ tweet_nb = self.session.query(Tweet).filter(Tweet.id == self.json_dict["id"]).count()
+ if tweet_nb > 0:
+ return
+
+ ts_copy = adapt_fields(self.json_dict, fields_adapter["stream"]["tweet"])
+
+ # get or create user
+ user = self.__get_user(self.json_dict["user"], True)
+ if user is None:
+ self.logger.warning("USER not found " + repr(self.json_dict["user"])) #@UndefinedVariable
+ ts_copy["user_id"] = None
+ else:
+ ts_copy["user_id"] = user.id
+
+ del(ts_copy['user'])
+ ts_copy["tweet_source_id"] = self.source_id
+
+ self.tweet = self.obj_buffer.add_object(Tweet, None, ts_copy, True)
+
+ self.__process_entities()
+
+
+ def __process_entities(self):
+ if "entities" in self.json_dict:
+ for ind_type, entity_list in self.json_dict["entities"].items():
+ for ind in entity_list:
+ self.__process_entity(ind, ind_type)
+ else:
+
+ text = self.tweet.text
+ extractor = twitter_text.Extractor(text)
+ for ind in extractor.extract_hashtags_with_indices():
+ self.__process_entity(ind, "hashtags")
+
+ for ind in extractor.extract_urls_with_indices():
+ self.__process_entity(ind, "urls")
+
+ for ind in extractor.extract_mentioned_screen_names_with_indices():
+ self.__process_entity(ind, "user_mentions")
+
+ def __process_twitter_rest(self):
+ tweet_nb = self.session.query(Tweet).filter(Tweet.id == self.json_dict["id"]).count()
+ if tweet_nb > 0:
+ return
+
+
+ tweet_fields = {
+ 'created_at': self.json_dict["created_at"],
+ 'favorited': False,
+ 'id': self.json_dict["id"],
+ 'id_str': self.json_dict["id_str"],
+ #'in_reply_to_screen_name': ts["to_user"],
+ 'in_reply_to_user_id': self.json_dict.get("in_reply_to_user_id",None),
+ 'in_reply_to_user_id_str': self.json_dict.get("in_reply_to_user_id_str", None),
+ #'place': ts["place"],
+ 'source': self.json_dict["source"],
+ 'text': self.json_dict["text"],
+ 'truncated': False,
+ 'tweet_source_id' : self.source_id,
+ }
+
+ #user
+
+ user_fields = {
+ 'lang' : self.json_dict.get('iso_language_code',None),
+ 'profile_image_url' : self.json_dict["profile_image_url"],
+ 'screen_name' : self.json_dict["from_user"],
+ 'id' : self.json_dict["from_user_id"],
+ 'id_str' : self.json_dict["from_user_id_str"],
+ 'name' : self.json_dict['from_user_name'],
+ }
+
+ user = self.__get_user(user_fields, do_merge=False)
+ if user is None:
+ self.logger.warning("USER not found " + repr(user_fields)) #@UndefinedVariable
+ tweet_fields["user_id"] = None
+ else:
+ tweet_fields["user_id"] = user.id
+
+ tweet_fields = adapt_fields(tweet_fields, fields_adapter["rest"]["tweet"])
+ self.tweet = self.obj_buffer.add_object(Tweet, None, tweet_fields, True)
+
+ self.__process_entities()
+
+
+
+ def process_source(self):
+
+ status_id = self.json_dict["id"]
+ log = self.session.query(TweetLog).filter(TweetLog.status_id==status_id).first()
+ if log:
+ self.obj_buffer.add_object(TweetLog, log, {'status': TweetLog.TWEET_STATUS['DELETE'], 'status_id': None})
+ self.session.query(TweetSource).filter(TweetSource.id==self.source_id).delete()
+ else:
+ if "metadata" in self.json_dict:
+ self.__process_twitter_rest()
+ else:
+ self.__process_twitter_stream()
+
+ self.obj_buffer.add_object(TweetLog, None, {'tweet_source_id':self.source_id, 'status':TweetLog.TWEET_STATUS['OK']}, True)
+
+ def log_info(self):
+ screen_name = self.json_dict.get("user",{}).get("screen_name","")
+ return u"Process Tweet from %s : %s" % (screen_name, self.json_dict.get('text',u""))
+
+
+
+class TwitterProcessorDelete(TwitterProcessor):
+ """
+ {
+ "delete":{
+ "status":{
+ "id":1234,
+ "id_str":"1234",
+ "user_id":3,
+ "user_id_str":"3"
+ }
+ }
+ }
+ """
+
+ def process(self):
+
+ #find tweet
+ tweet_id = self.json_dict.get('delete',{}).get('status',{}).get('id',None)
+ if tweet_id:
+ t = self.session.query(Tweet).options(joinedload(Tweet.tweet_source)).filter(Tweet.id == tweet_id).first()
+ if t:
+ tsource = t.tweet_source
+ self.session.delete(t)
+ self.session.query(TweetLog).filter(TweetLog.tweet_source_id == tsource.id).delete()
+ self.session.delete(tsource)
+ self.obj_buffer.add_object(TweetLog, None, {'tweet_source_id':self.source_id, 'status':TweetLog.TWEET_STATUS['DELETE']}, True)
+ else:
+ self.obj_buffer.add_object(TweetLog, None, {'tweet_source_id':self.source_id, 'status_id': tweet_id,'status':TweetLog.TWEET_STATUS['DELETE_PENDING']}, True)
+
+ def log_info(self):
+ status_del = self.json_dict.get('delete', {}).get("status",{})
+ return u"Process delete for %s : %s" % (status_del.get('user_id_str',u""), status_del.get('id_str',u""))
+
+class TwitterProcessorScrubGeo(TwitterProcessor):
+ """
+ {
+ "scrub_geo":{
+ "user_id":14090452,
+ "user_id_str":"14090452",
+ "up_to_status_id":23260136625,
+ "up_to_status_id_str":"23260136625"
+ }
+ }
+ """
+
+ def process_source(self):
+ up_to_status_id = self.json_dict.get("scrub_geo", {}).get("up_to_status_id", None)
+ if not up_to_status_id:
+ return
+ tweets = self.session.query(Tweet).options(joinedload(Tweet.tweet_source)).filter(Tweet.id <= up_to_status_id)
+ for t in tweets:
+ self.obj_buffer.add_object(Tweet, t, {'geo': None})
+ tsource = t.tweet_source
+ tsource_dict = anyjson.deserialize(tsource.original_json)
+ if tsource_dict.get("geo", None):
+ tsource_dict["geo"] = None
+ self.obj_buffer.add_object(TweetSource, tsource, {'original_json': anyjson.serialize(tsource_dict)})
+ self.obj_buffer.add_object(TweetLog, None, {'tweet_source_id':self.source_id, 'status':TweetLog.TWEET_STATUS['SCRUB_GEO']}, True)
+
+ def log_info(self):
+ return u"Process scrub geo for %s : %s" % (self.json_dict["scrub_geo"].get('user_id_str',u""), self.json_dict["scrub_geo"].get('id_str',u""))
+
+
+class TwitterProcessorLimit(TwitterProcessor):
+ """
+ {
+ "limit":{
+ "track":1234
+ }
+ }
+ """
+ def process_source(self):
+ """
+ do nothing, just log the information
+ """
+ self.obj_buffer.add_object(TweetLog, None, {'tweet_source_id':self.source_id, 'status':TweetLog.TWEET_STATUS['LIMIT'], 'error':self.json_txt}, True)
+
+ def log_info(self):
+ return u"Process limit %d " % self.json_dict.get("limit", {}).get('track', 0)
+
+class TwitterProcessorStatusWithheld(TwitterProcessor):
+ """
+ {
+ "status_withheld":{
+ "id":1234567890,
+ "user_id":123456,
+ "withheld_in_countries":["DE", "AR"]
+ }
+ }
+ """
+ def process_source(self):
+ """
+ do nothing, just log the information
+ """
+ self.obj_buffer.add_object(TweetLog, None, {'tweet_source_id':self.source_id, 'status':TweetLog.TWEET_STATUS['STATUS_WITHHELD'], 'error':self.json_txt}, True)
+
+ def log_info(self):
+ status_withheld = self.json_dict.get("status_withheld",{})
+ return u"Process status withheld status id %d from user %d in countries %s" %(status_withheld.get("id",0), status_withheld.get("user_id",0), u",".join(status_withheld.get("withheld_in_countries",[])))
+
+class TwitterProcessorUserWithheld(TwitterProcessor):
+ """
+ {
+ "user_withheld":{
+ "id":123456,
+ "withheld_in_countries":["DE","AR"]
+ }
+ }
+ """
+ def process_source(self):
+ """
+ do nothing, just log the information
+ """
+ self.obj_buffer.add_object(TweetLog, None, {'tweet_source_id':self.source_id, 'status':TweetLog.TWEET_STATUS['USER_WITHHELD'], 'error':self.json_txt}, True)
+
+
+ def log_info(self):
+ user_withheld = self.json_dict.get("user_withheld", {})
+ return u"Process user withheld %d in countries %s" % (user_withheld.get("id",0), u"".join(user_withheld.get("withheld_in_countries",[])))
+
+class TwitterProcessorDisconnect(TwitterProcessor):
+ """
+ {
+ "disconnect":{
+ "code": 4,
+ "stream_name":"< A stream identifier >",
+ "reason":"< Human readable status message >"
+ }
+ }
+ """
+ def process_source(self):
+ """
+ do nothing, just log the information
+ """
+ self.obj_buffer.add_object(TweetLog, None, {'tweet_source_id':self.source_id, 'status':TweetLog.TWEET_STATUS['DISCONNECT'], 'error':self.json_txt}, True)
+
+ def log_info(self):
+ disconnect = self.json_dict.get("disconnect",{})
+ return u"Process disconnect stream %s code %d reason %s" % (disconnect.get("stream_name",""), disconnect.get("code",0), disconnect.get("reason",""))
+
+class TwitterProcessorStallWarning(TwitterProcessor):
+ """
+ {
+ "warning":{
+ "code":"FALLING_BEHIND",
+ "message":"Your connection is falling behind and messages are being queued for delivery to you. Your queue is now over 60% full. You will be disconnected when the queue is full.",
+ "percent_full": 60
+ }
+ }
+ """
+ def process_source(self):
+ """
+ do nothing, just log the information
+ """
+ self.obj_buffer.add_object(TweetLog, None, {'tweet_source_id':self.source_id, 'status':TweetLog.TWEET_STATUS['STALL_WARNING'], 'error':self.json_txt}, True)
+
+ def log_info(self):
+ warning = self.json_dict.get("warning",{})
+ return u"Process stall warning %d%% code %s, message %s" % (warning.get("percent_full",0),warning.get("code",u""), warning.get("message", u""))
+
+TWEET_PROCESSOR_MAP = {
+ 'text': TwitterProcessorStatus,
+ 'delete': TwitterProcessorDelete,
+ 'scrub_geo': TwitterProcessorScrubGeo,
+ 'limit': TwitterProcessorLimit,
+ 'status_withheld': TwitterProcessorStatusWithheld,
+ 'user_withheld': TwitterProcessorUserWithheld,
+ 'disconnect': TwitterProcessorDisconnect,
+ 'warning': TwitterProcessorStallWarning
+}
+
+def get_processor(tweet_dict):
+ for processor_key,processor_klass in TWEET_PROCESSOR_MAP.iteritems():
+ if processor_key in tweet_dict:
+ return processor_klass
+ return None
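
processor.py replaces the single TwitterProcessor with one class per stream message type, keyed on the message's top-level field. A small sketch of the dispatch, with hypothetical payloads:

# Sketch: routing raw stream messages through TWEET_PROCESSOR_MAP
# (the JSON payloads below are made-up examples).
import anyjson
from iri_tweet.processor import get_processor

for raw in ('{"text": "hello", "id": 1, "user": {"id": 2}}',
            '{"delete": {"status": {"id": 1234, "user_id": 3}}}',
            '{"limit": {"track": 42}}'):
    message = anyjson.deserialize(raw)
    klass = get_processor(message)
    # -> TwitterProcessorStatus, TwitterProcessorDelete, TwitterProcessorLimit
    print klass.__name__ if klass else "NOT_TWEET"
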
--- a/script/lib/iri_tweet/iri_tweet/stream.py Sun Apr 21 21:55:06 2013 +0200
+++ b/script/lib/iri_tweet/iri_tweet/stream.py Tue May 07 18:57:54 2013 +0200
@@ -121,12 +121,11 @@
"""
def __init__(self, auth,
- catchup=None, raw=False, timeout=-1, url=None, compressed=False, chunk_size=4096, logger=None):
+ raw=False, timeout=-1, url=None, compressed=False, chunk_size=4096, logger=None):
self._conn = None
self._rate_ts = None
self._rate_cnt = 0
self._auth = auth
- self._catchup_count = catchup
self.raw_mode = raw
self.timeout = timeout
self._compressed = compressed
@@ -165,8 +164,6 @@
postdata = self._get_post_data() or {}
postdata['stall_warnings'] = 'true'
- if self._catchup_count:
- postdata["count"] = self._catchup_count
if self._logger : self._logger.debug("BaseStream init connection url " + repr(self.url))
if self._logger : self._logger.debug("BaseStream init connection headers " + repr(headers))
@@ -204,37 +201,58 @@
self._rate_ts = time.time()
def _iter_object(self):
+
pending = None
has_stopped = False
-
-# for chunk in iter_content_non_blocking(self._resp,
-# max_chunk_size=self.chunk_size,
-# decode_unicode=False,
-# timeout=self.timeout):
+
+ if self._logger : self._logger.debug("BaseStream _iter_object")
+
for chunk in self._resp.iter_content(
chunk_size=self.chunk_size,
- decode_unicode=False):
-
+ decode_unicode=None):
+
+ if self._logger : self._logger.debug("BaseStream _iter_object loop")
if self.testmuststop():
has_stopped = True
break
-
+
if pending is not None:
chunk = pending + chunk
lines = chunk.split('\r')
-
- if chunk and lines[-1] and lines[-1][-1] == chunk[-1]:
+
+ if lines and lines[-1] and chunk and lines[-1][-1] == chunk[-1]:
pending = lines.pop()
else:
pending = None
-
+
for line in lines:
yield line.strip('\n')
-
+
if self.testmuststop():
has_stopped = True
break
-
+
if pending is not None:
yield pending
if has_stopped:
@@ -248,11 +266,14 @@
self._init_conn()
if self._logger : self._logger.debug("BaseStream __iter__ connected")
+
for line in self._iter_object():
+ if self._logger : self._logger.debug("BaseStream __iter__ line %s " % repr(line))
+
if not line:
continue
-
+
if (self.raw_mode):
tweet = line
else:
@@ -285,12 +306,12 @@
url = "https://stream.twitter.com/1.1/statuses/filter.json"
def __init__(self, auth, follow=None, locations=None,
- track=None, catchup=None, url=None, raw=False, timeout=None, compressed=False, chunk_size=4096, logger=None):
+ track=None, url=None, raw=False, timeout=None, compressed=False, chunk_size=requests.models.ITER_CHUNK_SIZE, logger=None):
self._follow = follow
self._locations = locations
self._track = track
# remove follow, locations, track
- BaseStream.__init__(self, auth, url=url, raw=raw, catchup=catchup, timeout=timeout, compressed=compressed, chunk_size=chunk_size, logger=logger)
+ BaseStream.__init__(self, auth, url=url, raw=raw, timeout=timeout, compressed=compressed, chunk_size=chunk_size, logger=logger)
def _get_post_data(self):
postdata = {}
@@ -358,16 +379,14 @@
if self._logger: self._logger.debug("tweet : " + repr(tweet))
self._reconnects = 0
self._retry_wait = 0
- if "warning" in tweet:
- if self._logger: self._logger.warning("Tweet warning received : %s" % repr(tweet))
- continue
if not tweet.strip():
if self._logger: self._logger.debug("Empty Tweet received : PING")
continue
yield tweet
except requests.exceptions.HTTPError as e:
if e.response.status_code == 401:
- raise AuthenticationError("Error connecting to %s : %s" % (e.response.url,e.message))
+ if self._logger: self._logger.debug("SafeStreamWrapper Connection Error http error on %s : %s" % (e.response.url,e.message))
+ raise AuthenticationError("Error connecting to %s : %s : %s - %s" % (e.response.url,e.message, repr(e.response.headers),repr(e.response.text)))
if e.response.status_code > 200:
self.__process_http_error(e)
else:
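
The reworked _iter_object keeps an incomplete trailing fragment pending across chunk boundaries. The same reassembly as a standalone sketch, assuming '\r'-delimited messages (empty messages are the stream's keep-alive pings):

# Sketch of the pending-chunk reassembly used in _iter_object.
def iter_messages(chunks, delim='\r'):
    pending = None
    for chunk in chunks:
        if pending is not None:
            chunk = pending + chunk
        lines = chunk.split(delim)
        # If the chunk did not end on a delimiter, its last piece is an
        # incomplete message: hold it back and prepend it to the next chunk.
        if lines and lines[-1] and chunk and lines[-1][-1] == chunk[-1]:
            pending = lines.pop()
        else:
            pending = None
        for line in lines:
            yield line.strip('\n')
    if pending is not None:
        yield pending

assert list(iter_messages(['a\rb', 'c\r', 'd'])) == ['a', 'bc', '', 'd']
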
--- a/script/lib/iri_tweet/iri_tweet/tests.py Sun Apr 21 21:55:06 2013 +0200
+++ b/script/lib/iri_tweet/iri_tweet/tests.py Tue May 07 18:57:54 2013 +0200
@@ -3,10 +3,14 @@
from sqlalchemy.orm import relationship, backref
import unittest #@UnresolvedImport
from sqlalchemy.orm import sessionmaker
-from iri_tweet.utils import ObjectsBuffer, TwitterProcessor
+from iri_tweet.utils import ObjectsBuffer
+from iri_tweet.processor import TwitterProcessorStatus
from iri_tweet import models
import tempfile #@UnresolvedImport
import os
+import logging
+
+logger = logging.getLogger(__name__)
Base = declarative_base()
@@ -129,12 +133,12 @@
def setUp(self):
self.engine, self.metadata, sessionMaker = models.setup_database('sqlite:///:memory:', echo=True)
self.session = sessionMaker()
- file, self.tmpfilepath = tempfile.mkstemp()
- os.close(file)
+ tmpfile, self.tmpfilepath = tempfile.mkstemp()
+ os.close(tmpfile)
def testTwitterProcessor(self):
- tp = TwitterProcessor(None, original_json, None, self.session, self.tmpfilepath)
+ tp = TwitterProcessorStatus(None, original_json, None, self.session, (None, None), token_filename=self.tmpfilepath, logger=logger)
tp.process()
self.session.commit()
@@ -154,7 +158,7 @@
def testTwitterProcessorMedia(self):
- tp = TwitterProcessor(None, original_json_media, None, self.session, self.tmpfilepath)
+ tp = TwitterProcessorStatus(None, original_json_media, None, self.session, (None, None), token_filename=self.tmpfilepath, logger=logger)
tp.process()
self.session.commit()
@@ -174,7 +178,7 @@
def testTwitterProcessorMediaOthers(self):
- tp = TwitterProcessor(None, original_json_media_others, None, self.session, self.tmpfilepath)
+ tp = TwitterProcessorStatus(None, original_json_media_others, None, self.session, (None, None), token_filename=self.tmpfilepath, logger=logger)
tp.process()
self.session.commit()
--- a/script/lib/iri_tweet/iri_tweet/utils.py Sun Apr 21 21:55:06 2013 +0200
+++ b/script/lib/iri_tweet/iri_tweet/utils.py Tue May 07 18:57:54 2013 +0200
@@ -1,25 +1,22 @@
-from models import (Tweet, User, Hashtag, EntityHashtag, EntityUser, Url, #@UnresolvedImport
- EntityUrl, CONSUMER_KEY, CONSUMER_SECRET, APPLICATION_NAME, ACCESS_TOKEN_KEY,#@UnresolvedImport
- ACCESS_TOKEN_SECRET, adapt_date, adapt_json, TweetSource, TweetLog, MediaType, #@UnresolvedImport
- Media, EntityMedia, Entity, EntityType) #@UnresolvedImport
-from sqlalchemy.sql import select, or_ #@UnresolvedImport
-import Queue #@UnresolvedImport
-import anyjson #@UnresolvedImport
+from models import (Tweet, User, Hashtag, EntityHashtag, APPLICATION_NAME, ACCESS_TOKEN_SECRET, adapt_date, adapt_json,
+ ACCESS_TOKEN_KEY)
+from sqlalchemy.sql import select, or_
+import Queue
import codecs
import datetime
import email.utils
import logging
import math
import os.path
+import socket
import sys
-import twitter.oauth #@UnresolvedImport
-import twitter.oauth_dance #@UnresolvedImport
-import twitter_text #@UnresolvedImport
+import twitter.oauth
+import twitter.oauth_dance
CACHE_ACCESS_TOKEN = {}
-def get_oauth_token(token_file_path=None, check_access_token=True, application_name=APPLICATION_NAME, consumer_key=CONSUMER_KEY, consumer_secret=CONSUMER_SECRET):
+def get_oauth_token(consumer_key, consumer_secret, token_file_path=None, check_access_token=True, application_name=APPLICATION_NAME):
global CACHE_ACCESS_TOKEN
@@ -34,25 +31,27 @@
if res is not None and check_access_token:
get_logger().debug("get_oauth_token : Check oauth tokens") #@UndefinedVariable
- t = twitter.Twitter(auth=twitter.OAuth(res[0], res[1], CONSUMER_KEY, CONSUMER_SECRET))
+ t = twitter.Twitter(auth=twitter.OAuth(res[0], res[1], consumer_key, consumer_secret))
status = None
try:
- status = t.account.rate_limit_status()
+ status = t.application.rate_limit_status(resources="account")
except Exception as e:
get_logger().debug("get_oauth_token : error getting rate limit status %s " % repr(e))
get_logger().debug("get_oauth_token : error getting rate limit status %s " % str(e))
status = None
get_logger().debug("get_oauth_token : Check oauth tokens : status %s" % repr(status)) #@UndefinedVariable
- if status is None or status['remaining_hits'] == 0:
+ if status is None or status.get("resources",{}).get("account",{}).get('/account/verify_credentials',{}).get('remaining',0) == 0:
get_logger().debug("get_oauth_token : Problem with status %s" % repr(status))
res = None
if res is None:
get_logger().debug("get_oauth_token : doing the oauth dance")
res = twitter.oauth_dance(application_name, consumer_key, consumer_secret, token_file_path)
+
CACHE_ACCESS_TOKEN[application_name] = res
+ get_logger().debug("get_oauth_token : done got %s" % repr(res))
return res
def parse_date(date_str):
@@ -169,301 +168,6 @@
return proxy
return None
-class TwitterProcessorException(Exception):
- pass
-
-class TwitterProcessor(object):
-
- def __init__(self, json_dict, json_txt, source_id, session, access_token=None, token_filename=None, user_query_twitter=False):
-
- if json_dict is None and json_txt is None:
- raise TwitterProcessorException("No json")
-
- if json_dict is None:
- self.json_dict = anyjson.deserialize(json_txt)
- else:
- self.json_dict = json_dict
-
- if not json_txt:
- self.json_txt = anyjson.serialize(json_dict)
- else:
- self.json_txt = json_txt
-
- if "id" not in self.json_dict:
- raise TwitterProcessorException("No id in json")
-
- self.source_id = source_id
- self.session = session
- self.token_filename = token_filename
- self.access_token = access_token
- self.obj_buffer = ObjectsBuffer()
- self.user_query_twitter = user_query_twitter
-
-
-
- def __get_user(self, user_dict, do_merge):
- get_logger().debug("Get user : " + repr(user_dict)) #@UndefinedVariable
-
- user_dict = adapt_fields(user_dict, fields_adapter["stream"]["user"])
-
- user_id = user_dict.get("id",None)
- user_name = user_dict.get("screen_name", user_dict.get("name", None))
-
- if user_id is None and user_name is None:
- return None
-
- user = None
- if user_id:
- user = self.obj_buffer.get(User, id=user_id)
- else:
- user = self.obj_buffer.get(User, screen_name=user_name)
-
- #to do update user id needed
- if user is not None:
- user_created_at = None
- if user.args is not None:
- user_created_at = user.args.get('created_at', None)
- if user_created_at is None and user_dict.get('created_at', None) is not None and do_merge:
- if user.args is None:
- user.args = user_dict
- else:
- user.args.update(user_dict)
- return user
-
- #todo : add methpds to objectbuffer to get buffer user
- user_obj = None
- if user_id:
- user_obj = self.session.query(User).filter(User.id == user_id).first()
- else:
- user_obj = self.session.query(User).filter(User.screen_name.ilike(user_name)).first()
-
- #todo update user if needed
- if user_obj is not None:
- if user_obj.created_at is not None or user_dict.get('created_at', None) is None or not do_merge :
- user = ObjectBufferProxy(User, None, None, False, user_obj)
- else:
- user = self.obj_buffer.add_object(User, None, user_dict, True, user_obj)
- return user
-
- user_created_at = user_dict.get("created_at", None)
-
- if user_created_at is None and self.user_query_twitter:
-
- if self.access_token is not None:
- acess_token_key, access_token_secret = self.access_token
- else:
- acess_token_key, access_token_secret = get_oauth_token(self.token_filename)
- t = twitter.Twitter(auth=twitter.OAuth(acess_token_key, access_token_secret, CONSUMER_KEY, CONSUMER_SECRET))
- try:
- if user_id:
- user_dict = t.users.show(user_id=user_id)
- else:
- user_dict = t.users.show(screen_name=user_name)
- except Exception as e:
- get_logger().info("get_user : TWITTER ERROR : " + repr(e)) #@UndefinedVariable
- get_logger().info("get_user : TWITTER ERROR : " + str(e)) #@UndefinedVariable
- return None
-
- if "id" not in user_dict:
- return None
-
- #TODO filter get, wrap in proxy
- user_obj = self.session.query(User).filter(User.id == user_dict["id"]).first()
-
- if user_obj is not None and not do_merge:
- return ObjectBufferProxy(User, None, None, False, user_obj)
- else:
- return self.obj_buffer.add_object(User, None, user_dict, True)
-
- def __get_or_create_object(self, klass, filter_by_kwargs, filter_arg, creation_kwargs, must_flush, do_merge):
-
- obj_proxy = self.obj_buffer.get(klass, **filter_by_kwargs)
- if obj_proxy is None:
- query = self.session.query(klass)
- if filter_arg is not None:
- query = query.filter(filter_arg)
- else:
- query = query.filter_by(**filter_by_kwargs)
- obj_instance = query.first()
- if obj_instance is not None:
- if not do_merge:
- obj_proxy = ObjectBufferProxy(klass, None, None, False, obj_instance)
- else:
- obj_proxy = self.obj_buffer.add_object(klass, None, creation_kwargs, must_flush, obj_instance)
- if obj_proxy is None:
- obj_proxy = self.obj_buffer.add_object(klass, None, creation_kwargs, must_flush)
- return obj_proxy
-
-
- def __process_entity(self, ind, ind_type):
- get_logger().debug("Process_entity : " + repr(ind) + " : " + repr(ind_type)) #@UndefinedVariable
-
- ind = clean_keys(ind)
-
- entity_type = self.__get_or_create_object(EntityType, {'label':ind_type}, None, {'label':ind_type}, True, False)
-
- entity_dict = {
- "indice_start" : ind["indices"][0],
- "indice_end" : ind["indices"][1],
- "tweet_id" : self.tweet.id,
- "entity_type_id" : entity_type.id,
- "source" : adapt_json(ind)
- }
-
- def process_medias():
-
- media_id = ind.get('id', None)
- if media_id is None:
- return None, None
-
- type_str = ind.get("type", "photo")
- media_type = self.__get_or_create_object(MediaType, {'label': type_str}, None, {'label':type_str}, True, False)
- media_ind = adapt_fields(ind, fields_adapter["entities"]["medias"])
- if "type" in media_ind:
- del(media_ind["type"])
- media_ind['type_id'] = media_type.id
- media = self.__get_or_create_object(Media, {'id':media_id}, None, media_ind, True, False)
-
- entity_dict['media_id'] = media.id
- return EntityMedia, entity_dict
-
- def process_hashtags():
- text = ind.get("text", ind.get("hashtag", None))
- if text is None:
- return None, None
- ind['text'] = text
- hashtag = self.__get_or_create_object(Hashtag, {'text':text}, Hashtag.text.ilike(text), ind, True, False)
- entity_dict['hashtag_id'] = hashtag.id
- return EntityHashtag, entity_dict
-
- def process_user_mentions():
- user_mention = self.__get_user(ind, False)
- if user_mention is None:
- entity_dict['user_id'] = None
- else:
- entity_dict['user_id'] = user_mention.id
- return EntityUser, entity_dict
-
- def process_urls():
- url = self.__get_or_create_object(Url, {'url':ind["url"]}, None, ind, True, False)
- entity_dict['url_id'] = url.id
- return EntityUrl, entity_dict
-
- #{'': lambda }
- entity_klass, entity_dict = {
- 'hashtags': process_hashtags,
- 'user_mentions' : process_user_mentions,
- 'urls' : process_urls,
- 'media': process_medias,
- }.get(ind_type, lambda: (Entity, entity_dict))()
-
- get_logger().debug("Process_entity entity_dict: " + repr(entity_dict)) #@UndefinedVariable
- if entity_klass:
- self.obj_buffer.add_object(entity_klass, None, entity_dict, False)
-
-
- def __process_twitter_stream(self):
-
- tweet_nb = self.session.query(Tweet).filter(Tweet.id == self.json_dict["id"]).count()
- if tweet_nb > 0:
- return
-
- ts_copy = adapt_fields(self.json_dict, fields_adapter["stream"]["tweet"])
-
- # get or create user
- user = self.__get_user(self.json_dict["user"], True)
- if user is None:
- get_logger().warning("USER not found " + repr(self.json_dict["user"])) #@UndefinedVariable
- ts_copy["user_id"] = None
- else:
- ts_copy["user_id"] = user.id
-
- del(ts_copy['user'])
- ts_copy["tweet_source_id"] = self.source_id
-
- self.tweet = self.obj_buffer.add_object(Tweet, None, ts_copy, True)
-
- self.__process_entities()
-
-
- def __process_entities(self):
- if "entities" in self.json_dict:
- for ind_type, entity_list in self.json_dict["entities"].items():
- for ind in entity_list:
- self.__process_entity(ind, ind_type)
- else:
-
- text = self.tweet.text
- extractor = twitter_text.Extractor(text)
- for ind in extractor.extract_hashtags_with_indices():
- self.__process_entity(ind, "hashtags")
-
- for ind in extractor.extract_urls_with_indices():
- self.__process_entity(ind, "urls")
-
- for ind in extractor.extract_mentioned_screen_names_with_indices():
- self.__process_entity(ind, "user_mentions")
-
- def __process_twitter_rest(self):
- tweet_nb = self.session.query(Tweet).filter(Tweet.id == self.json_dict["id"]).count()
- if tweet_nb > 0:
- return
-
-
- tweet_fields = {
- 'created_at': self.json_dict["created_at"],
- 'favorited': False,
- 'id': self.json_dict["id"],
- 'id_str': self.json_dict["id_str"],
- #'in_reply_to_screen_name': ts["to_user"],
- 'in_reply_to_user_id': self.json_dict.get("in_reply_to_user_id",None),
- 'in_reply_to_user_id_str': self.json_dict.get("in_reply_to_user_id_str", None),
- #'place': ts["place"],
- 'source': self.json_dict["source"],
- 'text': self.json_dict["text"],
- 'truncated': False,
- 'tweet_source_id' : self.source_id,
- }
-
- #user
-
- user_fields = {
- 'lang' : self.json_dict.get('iso_language_code',None),
- 'profile_image_url' : self.json_dict["profile_image_url"],
- 'screen_name' : self.json_dict["from_user"],
- 'id' : self.json_dict["from_user_id"],
- 'id_str' : self.json_dict["from_user_id_str"],
- 'name' : self.json_dict['from_user_name'],
- }
-
- user = self.__get_user(user_fields, do_merge=False)
- if user is None:
- get_logger().warning("USER not found " + repr(user_fields)) #@UndefinedVariable
- tweet_fields["user_id"] = None
- else:
- tweet_fields["user_id"] = user.id
-
- tweet_fields = adapt_fields(tweet_fields, fields_adapter["rest"]["tweet"])
- self.tweet = self.obj_buffer.add_object(Tweet, None, tweet_fields, True)
-
- self.__process_entities()
-
-
-
- def process(self):
-
- if self.source_id is None:
- tweet_source = self.obj_buffer.add_object(TweetSource, None, {'original_json':self.json_txt}, True)
- self.source_id = tweet_source.id
-
- if "metadata" in self.json_dict:
- self.__process_twitter_rest()
- else:
- self.__process_twitter_stream()
-
- self.obj_buffer.add_object(TweetLog, None, {'tweet_source_id':self.source_id, 'status':TweetLog.TWEET_STATUS['OK']}, True)
-
- self.obj_buffer.persists(self.session)
def set_logging(options, plogger=None, queue=None):
@@ -508,12 +212,12 @@
return logger
def set_logging_options(parser):
- parser.add_option("-l", "--log", dest="logfile",
+ parser.add_argument("-l", "--log", dest="logfile",
help="log to file", metavar="LOG", default="stderr")
- parser.add_option("-v", dest="verbose", action="count",
- help="verbose", metavar="VERBOSE", default=0)
- parser.add_option("-q", dest="quiet", action="count",
- help="quiet", metavar="QUIET", default=0)
+ parser.add_argument("-v", dest="verbose", action="count",
+ help="verbose", default=0)
+ parser.add_argument("-q", dest="quiet", action="count",
+ help="quiet", default=0)
def get_base_query(session, query, start_date, end_date, hashtags, tweet_exclude_table, user_whitelist):
@@ -591,8 +295,10 @@
if ei:
dummy = self.format(record) # just to get traceback text into record.exc_text
record.exc_info = None # not needed any more
- if not self.ignore_full or not self.queue.full():
+ if not self.ignore_full or (not self.queue.full()):
self.queue.put_nowait(record)
+ except AssertionError:
+ pass
except Queue.Full:
if self.ignore_full:
pass
@@ -626,3 +332,10 @@
return writer
+def get_unused_port():
+ s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ s.bind(('localhost', 0))
+ _, port = s.getsockname()
+ s.close()
+ return port
+
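
get_oauth_token now validates cached tokens against the API 1.1 application/rate_limit_status endpoint instead of the retired account/rate_limit_status (which exposed a top-level remaining_hits). A sketch of the nesting the new check walks, with made-up numbers:

# Hypothetical API 1.1 rate_limit_status payload, shaped like the
# response that get_oauth_token walks.
status = {
    "resources": {
        "account": {
            "/account/verify_credentials": {
                "limit": 15, "remaining": 14, "reset": 1367787600,
            }
        }
    }
}
remaining = status.get("resources", {}).get("account", {}) \
                  .get("/account/verify_credentials", {}).get("remaining", 0)
assert remaining == 14  # the cached token is discarded when this reaches 0
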
--- a/script/rest/export_twitter.py Sun Apr 21 21:55:06 2013 +0200
+++ b/script/rest/export_twitter.py Tue May 07 18:57:54 2013 +0200
@@ -1,16 +1,15 @@
#!/usr/bin/env python
# coding=utf-8
-from sqlite3 import *
+from sqlite3 import register_adapter, register_converter, connect, Row
import datetime, time
import email.utils
from optparse import OptionParser
import os.path
-import os
-import sys
from lxml import etree
import uuid
import re
+import simplejson
def parse_date(date_str):
ts = email.utils.parsedate_tz(date_str)
@@ -20,10 +19,10 @@
return time.mktime(ts.timetuple())
def adapt_geo(geo):
- return simplejson.dumps(geo)
-
+ return simplejson.dumps(geo)
+
def convert_geo(s):
- return simplejson.loads(s)
+ return simplejson.loads(s)
register_adapter(datetime.datetime, adapt_datetime)
@@ -73,7 +72,7 @@
ts = int(parse_date(options.start_date))
if options.end_date:
- te = int(parse_date(options.end_date))
+ te = int(parse_date(options.end_date))
else:
te = ts + options.duration
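
export_twitter.py relies on sqlite3 adapter/converter registration for the geo column, which is what the added simplejson import serves. A round-trip sketch with a hypothetical table; converters only fire when the connection is opened with detect_types:

import sqlite3
import simplejson

def adapt_geo(geo):
    return simplejson.dumps(geo)

def convert_geo(s):
    return simplejson.loads(s)

sqlite3.register_adapter(dict, adapt_geo)
sqlite3.register_converter("geo", convert_geo)

conn = sqlite3.connect(":memory:", detect_types=sqlite3.PARSE_DECLTYPES)
conn.execute("CREATE TABLE tweet (g geo)")  # hypothetical table
conn.execute("INSERT INTO tweet VALUES (?)", ({"coordinates": [2.35, 48.85]},))
print conn.execute("SELECT g FROM tweet").fetchone()[0]  # back to a dict
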
--- a/script/stream/recorder_stream.py Sun Apr 21 21:55:06 2013 +0200
+++ b/script/stream/recorder_stream.py Tue May 07 18:57:54 2013 +0200
@@ -1,14 +1,14 @@
-from getpass import getpass
+from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
from iri_tweet import models, utils
from iri_tweet.models import TweetSource, TweetLog, ProcessEvent
-from multiprocessing import (Queue as mQueue, JoinableQueue, Process, Event,
- get_logger)
-from optparse import OptionParser
+from iri_tweet.processor import get_processor
+from multiprocessing import Queue as mQueue, Process, Event
from sqlalchemy.exc import OperationalError
from sqlalchemy.orm import scoped_session
import Queue
import StringIO
import anyjson
+import argparse
import datetime
import inspect
import iri_tweet.stream
@@ -21,6 +21,7 @@
import socket
import sqlalchemy.schema
import sys
+import thread
import threading
import time
import traceback
@@ -35,7 +36,20 @@
columns_user = [u'follow_request_sent', u'profile_use_background_image', u'id', u'verified', u'profile_sidebar_fill_color', u'profile_text_color', u'followers_count', u'protected', u'location', u'profile_background_color', u'id_str', u'utc_offset', u'statuses_count', u'description', u'friends_count', u'profile_link_color', u'profile_image_url', u'notifications', u'show_all_inline_media', u'geo_enabled', u'profile_background_image_url', u'name', u'lang', u'following', u'profile_background_tile', u'favourites_count', u'screen_name', u'url', u'created_at', u'contributors_enabled', u'time_zone', u'profile_sidebar_border_color', u'is_translator', u'listed_count']
# just put it in a sqlite3 tqble
-DEFAULT_TIMEOUT = 5
+DEFAULT_TIMEOUT = 3
+
+class Requesthandler(BaseHTTPRequestHandler):
+
+ def __init__(self, request, client_address, server):
+ BaseHTTPRequestHandler.__init__(self, request, client_address, server)
+
+ def do_GET(self):
+ self.send_response(200)
+ self.end_headers()
+
+ def log_message(self, format, *args): # @ReservedAssignment
+ pass
+
def set_logging(options):
loggers = []
@@ -55,19 +69,16 @@
return qlogger
def get_auth(options, access_token):
- if options.username and options.password:
- auth = requests.auth.BasicAuthHandler(options.username, options.password)
- else:
- consumer_key = models.CONSUMER_KEY
- consumer_secret = models.CONSUMER_SECRET
- auth = requests_oauthlib.OAuth1(access_token[0], access_token[1], consumer_key, consumer_secret, signature_type='auth_header')
+ consumer_key = options.consumer_key
+ consumer_secret = options.consumer_secret
+ auth = requests_oauthlib.OAuth1(client_key=consumer_key, client_secret=consumer_secret, resource_owner_key=access_token[0], resource_owner_secret=access_token[1], signature_type='auth_header')
return auth
-def add_process_event(type, args, session_maker):
+def add_process_event(event_type, args, session_maker):
session = session_maker()
try:
- evt = ProcessEvent(args=None if args is None else anyjson.serialize(args), type=type)
+ evt = ProcessEvent(args=None if args is None else anyjson.serialize(args), type=event_type)
session.add(evt)
session.commit()
finally:
@@ -83,6 +94,7 @@
self.options = options
self.logger_queue = logger_queue
self.stop_event = stop_event
+ self.consumer_token = (options.consumer_key, options.consumer_secret)
self.access_token = access_token
super(BaseProcess, self).__init__()
@@ -122,16 +134,15 @@
def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
self.track = options.track
self.token_filename = options.token_filename
- self.catchup = options.catchup
self.timeout = options.timeout
self.stream = None
super(SourceProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid)
def __source_stream_iter(self):
-
- self.logger = set_logging_process(self.options, self.logger_queue)
+
self.logger.debug("SourceProcess : run ")
+ self.logger.debug("SourceProcess : get_auth auth with option %s and token %s " %(self.options, self.access_token))
self.auth = get_auth(self.options, self.access_token)
self.logger.debug("SourceProcess : auth set ")
@@ -140,8 +151,8 @@
track_list = [k.strip() for k in track_list.split(',')]
- self.logger.debug("SourceProcess : before connecting to stream " + repr(track_list))
- self.stream = iri_tweet.stream.FilterStream(self.auth, track=track_list, raw=True, url=self.options.url, catchup=self.catchup, timeout=self.timeout, chunk_size=1, logger=self.logger)
+ self.logger.debug("SourceProcess : before connecting to stream %s, url : %s, auth : %s" % (repr(track_list), self.options.url, repr(self.auth)))
+ self.stream = iri_tweet.stream.FilterStream(self.auth, track=track_list, raw=True, url=self.options.url, timeout=self.timeout, chunk_size=512, logger=self.logger)
self.logger.debug("SourceProcess : after connecting to stream")
self.stream.muststop = lambda: self.stop_event.is_set()
@@ -149,11 +160,14 @@
session = self.session_maker()
+
try:
for tweet in stream_wrapper:
if not self.parent_is_alive():
self.stop_event.set()
- stop_thread.join(5)
sys.exit()
self.logger.debug("SourceProcess : tweet " + repr(tweet))
source = TweetSource(original_json=tweet)
@@ -193,41 +207,52 @@
def do_run(self):
- # import pydevd
- # pydevd.settrace(suspend=False)
+ self.logger = set_logging_process(self.options, self.logger_queue)
source_stream_iter_thread = threading.Thread(target=self.__source_stream_iter , name="SourceStreamIterThread")
source_stream_iter_thread.start()
- while not self.stop_event.is_set():
- self.logger.debug("SourceProcess : In while after start")
- self.stop_event.wait(DEFAULT_TIMEOUT)
- if self.stop_event.is_set() and self.stream:
- self.stream.close()
- elif not self.stop_event.is_set() and not source_stream_iter_thread.is_alive:
- self.stop_event.set()
+ try:
+ while not self.stop_event.is_set():
+ self.logger.debug("SourceProcess : In while after start")
+ self.stop_event.wait(DEFAULT_TIMEOUT)
+ except KeyboardInterrupt:
+ self.stop_event.set()
+ if self.stop_event.is_set() and self.stream:
+ self.stream.close()
+ elif not self.stop_event.is_set() and not source_stream_iter_thread.is_alive():
+ self.stop_event.set()
+
self.logger.info("SourceProcess : join")
source_stream_iter_thread.join(30)
-def process_tweet(tweet, source_id, session, access_token, twitter_query_user, logger):
+def process_tweet(tweet, source_id, session, consumer_token, access_token, twitter_query_user, token_filename, logger):
try:
if not tweet.strip():
return
tweet_obj = anyjson.deserialize(tweet)
- if 'text' not in tweet_obj:
+ processor_klass = get_processor(tweet_obj)
+ if not processor_klass:
tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET'])
session.add(tweet_log)
return
- screen_name = ""
- if 'user' in tweet_obj and 'screen_name' in tweet_obj['user']:
- screen_name = tweet_obj['user']['screen_name']
- logger.info(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text']))
- logger.debug(u"Process_tweet :" + repr(tweet))
- processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, access_token, None, twitter_query_user)
+ processor = processor_klass(json_dict=tweet_obj,
+ json_txt=tweet,
+ source_id=source_id,
+ session=session,
+ consumer_token=consumer_token,
+ access_token=access_token,
+ token_filename=token_filename,
+ user_query_twitter=twitter_query_user,
+ logger=logger)
+ logger.info(processor.log_info())
+ logger.debug(u"Process_tweet :" + repr(tweet))
processor.process()
+
except ValueError as e:
message = u"Value Error %s processing tweet %s" % (repr(e), tweet)
output = StringIO.StringIO()
@@ -274,8 +299,10 @@
except Exception as e:
self.logger.debug('Process tweet exception in loop : ' + repr(e))
continue
- process_tweet(tweet_txt, source_id, session, self.access_token, self.twitter_query_user, self.logger)
+ process_tweet(tweet_txt, source_id, session, self.consumer_token, self.access_token, self.twitter_query_user, self.options.token_filename, self.logger)
session.commit()
+ except KeyboardInterrupt:
+ self.stop_event.set()
finally:
session.rollback()
session.close()
@@ -287,15 +314,20 @@
return Session, engine, metadata
-def process_leftovers(session, access_token, twitter_query_user, logger):
+def process_leftovers(session, consumer_token, access_token, twitter_query_user, token_filename, ask_process_leftovers, logger):
sources = session.query(TweetSource).outerjoin(TweetLog).filter(TweetLog.id == None)
+ sources_count = sources.count()
+ if sources_count > 10 and ask_process_leftovers:
+ resp = raw_input("Do you want to process leftovers (Y/n)? (%d tweets to process) " % sources_count)
+ if resp and resp.strip().lower() == "n":
+ return
+ logger.info("Process leftovers, %d tweets to process" % (sources_count))
for src in sources:
tweet_txt = src.original_json
- process_tweet(tweet_txt, src.id, session, access_token, twitter_query_user, logger)
+ process_tweet(tweet_txt, src.id, session, consumer_token, access_token, twitter_query_user, token_filename, logger)
session.commit()
-
# get tweet source that do not match any message
@@ -315,38 +347,36 @@
def get_options():
- usage = "usage: %prog [options]"
+ usage = "usage: %(prog)s [options]"
- parser = OptionParser(usage=usage)
+ parser = argparse.ArgumentParser(usage=usage)
- parser.add_option("-f", "--file", dest="conn_str",
- help="write tweet to DATABASE. This is a connection string", metavar="CONNECTION_STR", default="enmi2010_twitter.db")
- parser.add_option("-u", "--user", dest="username",
- help="Twitter user", metavar="USER", default=None)
- parser.add_option("-w", "--password", dest="password",
- help="Twitter password", metavar="PASSWORD", default=None)
- parser.add_option("-T", "--track", dest="track",
- help="Twitter track", metavar="TRACK")
- parser.add_option("-n", "--new", dest="new", action="store_true",
- help="new database", default=False)
- parser.add_option("-D", "--daemon", dest="daemon", action="store_true",
- help="launch daemon", default=False)
- parser.add_option("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token",
- help="Token file name")
- parser.add_option("-d", "--duration", dest="duration",
- help="Duration of recording in seconds", metavar="DURATION", default= -1, type='int')
- parser.add_option("-N", "--nb-process", dest="process_nb",
- help="number of process.\nIf 0, only the lefovers of the database are processed.\nIf 1, no postprocessing is done on the tweets.", metavar="PROCESS_NB", default=2, type='int')
- parser.add_option("--url", dest="url",
- help="The twitter url to connect to.", metavar="URL", default=iri_tweet.stream.FilterStream.url)
- parser.add_option("--query-user", dest="twitter_query_user", action="store_true",
- help="Query twitter for users", default=False, metavar="QUERY_USER")
- parser.add_option("--catchup", dest="catchup",
- help="catchup count for tweets", default=None, metavar="CATCHUP", type='int')
- parser.add_option("--timeout", dest="timeout",
- help="timeout for connecting in seconds", default=60, metavar="TIMEOUT", type='int')
-
-
+ parser.add_argument("-f", "--file", dest="conn_str",
+ help="write tweet to DATABASE. This is a connection string", metavar="CONNECTION_STR", default="enmi2010_twitter.db")
+ parser.add_argument("-T", "--track", dest="track",
+ help="Twitter track", metavar="TRACK")
+ parser.add_argument("-k", "--key", dest="consumer_key",
+ help="Twitter consumer key", metavar="CONSUMER_KEY", required=True)
+ parser.add_argument("-s", "--secret", dest="consumer_secret",
+ help="Twitter consumer secret", metavar="CONSUMER_SECRET", required=True)
+ parser.add_argument("-n", "--new", dest="new", action="store_true",
+ help="new database", default=False)
+ parser.add_argument("-D", "--daemon", dest="daemon", action="store_true",
+ help="launch daemon", default=False)
+ parser.add_argument("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token",
+ help="Token file name")
+ parser.add_argument("-d", "--duration", dest="duration",
+ help="Duration of recording in seconds", metavar="DURATION", default= -1, type=int)
+ parser.add_argument("-N", "--nb-process", dest="process_nb",
+ help="number of process.\nIf 0, only the lefovers of the database are processed.\nIf 1, no postprocessing is done on the tweets.", metavar="PROCESS_NB", default=2, type=int)
+ parser.add_argument("--url", dest="url",
+ help="The twitter url to connect to.", metavar="URL", default=iri_tweet.stream.FilterStream.url)
+ parser.add_argument("--query-user", dest="twitter_query_user", action="store_true",
+ help="Query twitter for users", default=False)
+ parser.add_argument("--timeout", dest="timeout",
+ help="timeout for connecting in seconds", default=60, metavar="TIMEOUT", type=int)
+ parser.add_argument("--ask-process-leftovers", dest="ask_process_leftovers", action="store_false",
+ help="ask process leftover", default=True)
utils.set_logging_options(parser)
@@ -357,14 +387,14 @@
def do_run(options, session_maker):
stop_args = {}
-
- access_token = None
- if not options.username or not options.password:
- access_token = utils.get_oauth_token(options.token_filename)
+
+ consumer_token = (options.consumer_key, options.consumer_secret)
+ access_token = utils.get_oauth_token(consumer_key=consumer_token[0], consumer_secret=consumer_token[1], token_file_path=options.token_filename)
+
session = session_maker()
try:
- process_leftovers(session, access_token, options.twitter_query_user, utils.get_logger())
+ process_leftovers(session, consumer_token, access_token, options.twitter_query_user, options.token_filename, options.ask_process_leftovers, utils.get_logger())
session.commit()
finally:
session.rollback()
@@ -378,7 +408,10 @@
stop_event = Event()
# workaround for bug on using urllib2 and multiprocessing
- req = urllib2.Request('http://localhost')
+ httpd = HTTPServer(('127.0.0.1',0), Requesthandler)
+ thread.start_new_thread(httpd.handle_request, ())
+
+ req = urllib2.Request('http://localhost:%d' % httpd.server_port)
conn = None
try:
conn = urllib2.urlopen(req)
@@ -392,7 +425,7 @@
process_engines = []
logger_queues = []
- SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str)
+ SessionProcess, engine_process, _ = get_sessionmaker(conn_str)
process_engines.append(engine_process)
lqueue = mQueue(50)
logger_queues.append(lqueue)
@@ -402,7 +435,7 @@
tweet_processes = []
for i in range(options.process_nb - 1):
- SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str)
+ SessionProcess, engine_process, _ = get_sessionmaker(conn_str)
process_engines.append(engine_process)
lqueue = mQueue(50)
logger_queues.append(lqueue)
@@ -462,21 +495,13 @@
cprocess.terminate()
- utils.get_logger().debug("Close queues")
- try:
- queue.close()
- for lqueue in logger_queues:
- lqueue.close()
- except exception as e:
- utils.get_logger().error("error when closing queues %s", repr(e))
- # do nothing
-
+ utils.get_logger().debug("Close queues")
if options.process_nb > 1:
utils.get_logger().debug("Processing leftovers")
session = session_maker()
try:
- process_leftovers(session, access_token, options.twitter_query_user, utils.get_logger())
+ process_leftovers(session, consumer_token, access_token, options.twitter_query_user, options.token_filename, options.ask_process_leftovers, utils.get_logger())
session.commit()
finally:
session.rollback()
@@ -484,11 +509,19 @@
for pengine in process_engines:
pengine.dispose()
+
+ try:
+ queue.close()
+ for lqueue in logger_queues:
+ lqueue.close()
+ except Exception as e:
+ utils.get_logger().error("error when closing queues %s", repr(e))
+ # do nothing
return stop_args
-def main(options, args):
+def main(options):
global conn_str
@@ -513,7 +546,8 @@
Session, engine, metadata = get_sessionmaker(conn_str)
if options.new:
- check_metadata = sqlalchemy.schema.MetaData(bind=engine, reflect=True)
+ check_metadata = sqlalchemy.schema.MetaData(bind=engine)
+ check_metadata.reflect()
if len(check_metadata.sorted_tables) > 0:
message = "Database %s not empty exiting" % conn_str
utils.get_logger().error(message)
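
# The two-step reflection above in isolation (SQLAlchemy 0.8.x, matching the
# virtualenv pin in this commit): bind the MetaData, reflect the existing
# schema, then check whether the database is empty.
import sqlalchemy

engine = sqlalchemy.create_engine("sqlite:///:memory:")
check_metadata = sqlalchemy.schema.MetaData(bind=engine)
check_metadata.reflect()                 # load every table visible on the engine
print len(check_metadata.sorted_tables)  # 0 for a fresh database
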
@@ -528,7 +562,7 @@
stop_args = {}
try:
- add_process_event(type="start", args={'options':options.__dict__, 'args': args, 'command_line': sys.argv}, session_maker=Session)
+ add_process_event(event_type="start", args={'options':options.__dict__, 'args': [], 'command_line': sys.argv}, session_maker=Session)
stop_args = do_run(options, Session)
except Exception as e:
utils.get_logger().exception("Error in main thread")
@@ -540,7 +574,7 @@
outfile.close()
raise
finally:
- add_process_event(type="shutdown", args=stop_args, session_maker=Session)
+ add_process_event(event_type="shutdown", args=stop_args, session_maker=Session)
utils.get_logger().debug("Done. Exiting. " + repr(stop_args))
@@ -548,15 +582,15 @@
if __name__ == '__main__':
- (options, args) = get_options()
+ options = get_options()
loggers = set_logging(options)
utils.get_logger().debug("OPTIONS : " + repr(options))
if options.daemon:
+ options.ask_process_leftovers = False
import daemon
- import lockfile
hdlr_preserve = []
for logger in loggers:
@@ -564,7 +598,7 @@
context = daemon.DaemonContext(working_directory=os.getcwd(), files_preserve=hdlr_preserve)
with context:
- main(options, args)
+ main(options)
else:
- main(options, args)
-
+ main(options)
+
--- a/script/utils/export_twitter_alchemy.py Sun Apr 21 21:55:06 2013 +0200
+++ b/script/utils/export_twitter_alchemy.py Tue May 07 18:57:54 2013 +0200
@@ -416,7 +416,7 @@
get_logger().debug("write http " + repr(project)) #@UndefinedVariable
r = requests.put(content_file_write, data=anyjson.dumps(project), headers={'content-type':'application/json'}, params=post_param);
get_logger().debug("write http " + repr(r) + " content " + r.text) #@UndefinedVariable
- if r.status_code != requests.codes.ok:
+ if r.status_code != requests.codes.ok: # @UndefinedVariable
r.raise_for_status()
else:
if content_file_write and os.path.exists(content_file_write):
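
# The status handling above as a standalone sketch (requests 1.2 as pinned
# by this commit's virtualenv; endpoint and payload are hypothetical).
import anyjson
import requests

r = requests.put("http://localhost/api/project",      # hypothetical endpoint
                 data=anyjson.dumps({"id": 1}),
                 headers={'content-type': 'application/json'})
if r.status_code != requests.codes.ok:
    r.raise_for_status()  # turn 4xx/5xx responses into HTTPError
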
--- a/script/utils/merge_tweets.py Sun Apr 21 21:55:06 2013 +0200
+++ b/script/utils/merge_tweets.py Tue May 07 18:57:54 2013 +0200
@@ -1,12 +1,15 @@
#from models import setup_database
from iri_tweet.models import setup_database, TweetSource, Tweet, TweetLog
-from iri_tweet.utils import TwitterProcessor, get_oauth_token, show_progress
+from iri_tweet.processor import TwitterProcessorStatus
+from iri_tweet.utils import get_oauth_token, show_progress
+import anyjson
import argparse
-import sys
+import codecs
+import logging
import re
-import anyjson
-import math
-import codecs
+import sys
+
+logger = logging.getLogger(__name__)
def get_option():
@@ -16,6 +19,10 @@
help="log to file", metavar="LOG", default="stderr")
parser.add_argument("-v", dest="verbose", action="count",
help="verbose", default=0)
+ parser.add_option("-k", "--key", dest="consumer_key",
+ help="Twitter consumer key", metavar="CONSUMER_KEY")
+ parser.add_option("-s", "--secret", dest="consumer_secret",
+ help="Twitter consumer secret", metavar="CONSUMER_SECRET")
parser.add_argument("-q", dest="quiet", action="count",
help="quiet", default=0)
parser.add_argument("--query-user", dest="query_user", action="store_true",
@@ -38,7 +45,7 @@
access_token = None
if options.query_user:
- access_token = get_oauth_token(options.token_filename)
+ access_token = get_oauth_token(options.consumer_key, options.consumer_secret, options.token_filename)
#open source
src_conn_str = options.source[0].strip()
@@ -60,7 +67,7 @@
session_src = Session_src()
session_tgt = Session_tgt()
- count_tw_query = Tweet.__table__.count()
+ count_tw_query = Tweet.__table__.count() # @UndefinedVariable
count_tw = engine_src.scalar(count_tw_query)
@@ -83,10 +90,10 @@
tweet_obj = anyjson.deserialize(tweet_source)
if 'text' not in tweet_obj:
- tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET'])
+ tweet_log = TweetLog(tweet_source_id=tweet.tweet_source.id, status=TweetLog.TWEET_STATUS['NOT_TWEET'])
session_tgt.add(tweet_log)
else:
- tp = TwitterProcessor(None, tweet_source, None, session_tgt, access_token, options.token_filename, user_query_twitter=options.query_user)
+ tp = TwitterProcessorStatus(None, tweet_source, None, session_tgt, access_token, options.token_filename, user_query_twitter=options.query_user, logger=logger)
tp.process()
session_tgt.flush()
@@ -98,9 +105,13 @@
print u"%d new tweet added" % (added)
finally:
- session_tgt.close() if session_tgt is not None else None
- session_src.close() if session_src is not None else None
- conn_tgt.close() if conn_tgt is not None else None
- conn_src.close() if conn_src is not None else None
+ if session_tgt is not None:
+ session_tgt.close()
+ if session_src is not None:
+ session_src.close()
+ if conn_tgt is not None:
+ conn_tgt.close()
+ if conn_src is not None:
+ conn_src.close()
\ No newline at end of file
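
# The rewrite above replaces `x.close() if x is not None else None`
# one-liners, which abuse conditional expressions for side effects, with
# plain `if` blocks. A small helper expresses the same intent (sketch):
def close_all(*resources):
    # close anything that was actually opened, skipping the Nones
    for res in resources:
        if res is not None:
            res.close()

# close_all(session_tgt, session_src, conn_tgt, conn_src)
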
--- a/script/utils/search_topsy.py Sun Apr 21 21:55:06 2013 +0200
+++ b/script/utils/search_topsy.py Tue May 07 18:57:54 2013 +0200
@@ -1,17 +1,15 @@
+from blessings import Terminal
from iri_tweet import models, utils
-from sqlalchemy.orm import sessionmaker
-import anyjson
-import sqlite3
-import twitter
+from iri_tweet.processor import TwitterProcessorStatus
+from optparse import OptionParser
+import logging
+import math
import re
import requests
-from optparse import OptionParser
-import simplejson
import time
-from blessings import Terminal
-import sys
-import math
-from symbol import except_clause
+import twitter
+
+logger = logging.getLogger(__name__)
APPLICATION_NAME = "Tweet recorder user"
CONSUMER_KEY = "Vdr5ZcsjI1G3esTPI8yDg"
@@ -86,7 +84,7 @@
utils.set_logging(options);
- acess_token_key, access_token_secret = utils.get_oauth_token(options.token_filename, application_name=APPLICATION_NAME, consumer_key=CONSUMER_KEY, consumer_secret=CONSUMER_SECRET)
+ acess_token_key, access_token_secret = utils.get_oauth_token(consumer_key=CONSUMER_KEY, consumer_secret=CONSUMER_SECRET, token_file_path=options.token_filename, application_name=APPLICATION_NAME)
t = twitter.Twitter(domain="api.twitter.com", auth=twitter.OAuth(acess_token_key, access_token_secret, CONSUMER_KEY, CONSUMER_SECRET), secure=True)
t.secure = True
@@ -144,7 +142,7 @@
else:
raise
- processor = utils.TwitterProcessor(tweet, None, None, session, None, options.token_filename)
+ processor = TwitterProcessorStatus(tweet, None, None, session, None, options.token_filename, logger=logger)
processor.process()
session.flush()
session.commit()
--- a/script/utils/tweet_twitter_user.py Sun Apr 21 21:55:06 2013 +0200
+++ b/script/utils/tweet_twitter_user.py Tue May 07 18:57:54 2013 +0200
@@ -104,7 +104,7 @@
query_res = query.all()
- acess_token_key, access_token_secret = get_oauth_token(options.token_filename, application_name=APPLICATION_NAME, consumer_key=CONSUMER_KEY, consumer_secret=CONSUMER_SECRET)
+ acess_token_key, access_token_secret = get_oauth_token(consumer_key=CONSUMER_KEY, consumer_secret=CONSUMER_SECRET, token_file_path=options.token_filename, application_name=APPLICATION_NAME)
t = twitter.Twitter(auth=twitter.OAuth(acess_token_key, access_token_secret, CONSUMER_KEY, CONSUMER_SECRET))
for user in query_res:
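
# The call sites in this commit suggest get_oauth_token now takes the
# consumer credentials explicitly instead of reading them from models.py.
# A hedged sketch of the convention; keyword names are inferred from the
# diff, values are hypothetical.
from iri_tweet.utils import get_oauth_token

access_token_key, access_token_secret = get_oauth_token(
    consumer_key="YOUR_CONSUMER_KEY",
    consumer_secret="YOUR_CONSUMER_SECRET",
    token_file_path="tokens.txt",            # hypothetical cache location
    application_name="Tweet recorder user")
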
--- a/script/virtualenv/res/lib/lib_create_env.py Sun Apr 21 21:55:06 2013 +0200
+++ b/script/virtualenv/res/lib/lib_create_env.py Tue May 07 18:57:54 2013 +0200
@@ -25,12 +25,13 @@
'DATEUTIL': {'setup': 'python-dateutil', 'url':'http://pypi.python.org/packages/source/p/python-dateutil/python-dateutil-2.1.tar.gz', 'local':"python-dateutil-2.1.tar.gz", 'install': {'method': 'pip', 'option_str': None, 'dict_extra_env': None}},
'PYTZ': {'setup': 'pytz', 'url':'http://pypi.python.org/packages/source/p/pytz/pytz-2013b.tar.bz2', 'local':"pytz-2013b.tar.bz2", 'install': {'method': 'pip', 'option_str': None, 'dict_extra_env': None}},
'SIMPLEJSON': {'setup': 'simplejson', 'url':'http://pypi.python.org/packages/source/s/simplejson/simplejson-3.1.3.tar.gz', 'local':"simplejson-3.1.3.tar.gz", 'install': {'method': 'pip', 'option_str': None, 'dict_extra_env': None}},
- 'SQLALCHEMY': {'setup': 'sqlalchemy', 'url':'http://www.python.org/pypi/SQLAlchemy/0.8.0', 'local':"SQLAlchemy-0.8.0.tar.gz", 'install': {'method': 'pip', 'option_str': None, 'dict_extra_env': None}},
+ 'SQLALCHEMY': {'setup': 'sqlalchemy', 'url':'http://www.python.org/pypi/SQLAlchemy/0.8.1', 'local':"SQLAlchemy-0.8.1.tar.gz", 'install': {'method': 'pip', 'option_str': None, 'dict_extra_env': None}},
'TWITTER': {'setup': 'twitter', 'url':'http://pypi.python.org/packages/source/t/twitter/twitter-1.9.2.tar.gz', 'local':"twitter-1.9.2.tar.gz", 'install': {'method': 'pip', 'option_str': None, 'dict_extra_env': None}},
'TWITTER-TEXT': {'setup': 'twitter-text', 'url':'https://github.com/dryan/twitter-text-py/archive/master.tar.gz', 'local':"twitter-text-1.0.4.tar.gz", 'install': {'method': 'pip', 'option_str': None, 'dict_extra_env': None}},
'REQUESTS': {'setup': 'requests', 'url':'https://github.com/kennethreitz/requests/archive/v1.2.0.tar.gz', 'local':'requests-v1.2.0.tar.gz', 'install' : {'method':'pip', 'option_str': None, 'dict_extra_env': None}},
'OAUTHLIB': {'setup': 'oauthlib', 'url':'https://github.com/idan/oauthlib/archive/0.4.0.tar.gz', 'local':'oauthlib-0.4.0.tar.gz', 'install' : {'method':'pip', 'option_str': None, 'dict_extra_env': None}},
'REQUESTS-OAUTHLIB': {'setup': 'requests-oauthlib', 'url':'https://github.com/requests/requests-oauthlib/archive/master.tar.gz', 'local':'requests-oauthlib-0.3.0.tar.gz', 'install' : {'method':'pip', 'option_str': None, 'dict_extra_env': None}},
+ 'BLESSINGS': {'setup': 'blessings', 'url':'https://github.com/erikrose/blessings/archive/1.5.tar.gz', 'local':'blessings-1.5.tar.gz', 'install' : {'method':'pip', 'option_str': None, 'dict_extra_env': None}}
}
if system_str == 'Windows':
Binary file script/virtualenv/res/src/SQLAlchemy-0.8.0.tar.gz has changed
Binary file script/virtualenv/res/src/SQLAlchemy-0.8.1.tar.gz has changed
Binary file script/virtualenv/res/src/blessings-1.5.tar.gz has changed
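
# For reference, the shape of one entry in the virtualenv package table
# touched above (values taken from the BLESSINGS line; the surrounding
# dict name is illustrative): each package maps to a setup name, a source
# URL, a cached local archive, and pip install options.
PACKAGES = {
    'BLESSINGS': {
        'setup': 'blessings',                               # pip/setup name
        'url': 'https://github.com/erikrose/blessings/archive/1.5.tar.gz',
        'local': 'blessings-1.5.tar.gz',                    # cached archive
        'install': {'method': 'pip', 'option_str': None, 'dict_extra_env': None},
    },
}
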
--- a/script/virtualenv/script/res/res_create_env.py Sun Apr 21 21:55:06 2013 +0200
+++ b/script/virtualenv/script/res/res_create_env.py Tue May 07 18:57:54 2013 +0200
@@ -21,7 +21,8 @@
'TWITTER-TEXT',
'REQUESTS',
'OAUTHLIB',
- 'REQUESTS-OAUTHLIB'
+ 'REQUESTS-OAUTHLIB',
+ 'BLESSINGS'
]
if system_str == "Linux":