# HG changeset patch # User Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com> # Date 1295371518 -3600 # Node ID 4daf47fcf79207d9ce7bb6dde9c363ab1f7c88d6 # Parent 54d7f1486ac4e6481082e0607c8d17a58e7f205c move lib and prepare tweetstream for modification diff -r 54d7f1486ac4 -r 4daf47fcf792 .settings/org.eclipse.core.resources.prefs --- a/.settings/org.eclipse.core.resources.prefs Tue Jan 18 10:08:03 2011 +0100 +++ b/.settings/org.eclipse.core.resources.prefs Tue Jan 18 18:25:18 2011 +0100 @@ -1,4 +1,4 @@ -#Fri Jan 07 10:05:33 CET 2011 +#Tue Jan 18 10:08:53 CET 2011 eclipse.preferences.version=1 -encoding//script/iri_tweet/export_twitter_alchemy.py=utf-8 +encoding//script/lib/iri_tweet/export_twitter_alchemy.py=utf-8 encoding//script/rest/export_twitter.py=utf-8 diff -r 54d7f1486ac4 -r 4daf47fcf792 script/iri_tweet/__init__.py diff -r 54d7f1486ac4 -r 4daf47fcf792 script/iri_tweet/create_twitter_export_conf.py --- a/script/iri_tweet/create_twitter_export_conf.py Tue Jan 18 10:08:03 2011 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,43 +0,0 @@ -from lxml import etree -from optparse import OptionParser - -def get_options(): - - parser = OptionParser() - - parser.add_option("-f", "--file", dest="outputfile", - help="destination filename", metavar="FILE", default="twitter_export_conf.xml") - parser.add_option("-i", "--input", dest="inputfile", - help="inputfile", metavar="INPUT", default=None) - - return parser.parse_args() - -if __name__ == "__main__": - (options, args) = get_options() - - dest_filename = options.outputfile - - path_list = [] - if options.inputfile is None: - path_list = args - else: - with open(options.inputfile, 'r') as fi: - path_list = fi - - - root = etree.Element("twitter_export") - - - for path in path_list: - - iri_doc = etree.parse(path) - media_nodes = iri_doc.xpath("/iri/body/medias/media[@id='video']/video") - duration = int(media_nodes[0].get("dur"))/1000 - - file_elem = etree.SubElement(root, "file") - 
etree.SubElement(file_elem, "path").text = path - etree.SubElement(file_elem, "start_date") - etree.SubElement(file_elem, "duration").text = unicode(duration) - - tree = etree.ElementTree(root) - tree.write(dest_filename, encoding="utf-8", pretty_print=True, xml_declaration=True) \ No newline at end of file diff -r 54d7f1486ac4 -r 4daf47fcf792 script/iri_tweet/export_tweet_db.py --- a/script/iri_tweet/export_tweet_db.py Tue Jan 18 10:08:03 2011 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,47 +0,0 @@ -from models import * -from utils import * -from optparse import OptionParser -from sqlalchemy.orm import sessionmaker -import logging -import sqlite3 -import sys - - -# 'entities': "tweet_entity", -# 'user': "tweet_user" - -def get_option(): - - parser = OptionParser() - - set_logging_options(parser) - - return parser.parse_args() - -if __name__ == "__main__": - - (options, args) = get_option() - - set_logging(options) - - with sqlite3.connect(args[0]) as conn_in: - engine, metadata = setup_database('sqlite:///'+args[1], echo=((options.verbose-options.quiet)>0)) - Session = sessionmaker(bind=engine) - session = Session() - try: - curs_in = conn_in.cursor() - fields_mapping = {} - for i,res in enumerate(curs_in.execute("select json from tweet_tweet;")): - logging.debug("main loop %d : %s" % (i, res[0])) - processor = TwitterProcessor(eval(res[0]), res[0], session) - processor.process() - session.commit() - logging.debug("main : %d tweet processed" % (i+1)) - except Exception, e: - session.rollback() - raise e - finally: - session.close() - - - \ No newline at end of file diff -r 54d7f1486ac4 -r 4daf47fcf792 script/iri_tweet/export_twitter_alchemy.py --- a/script/iri_tweet/export_twitter_alchemy.py Tue Jan 18 10:08:03 2011 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,216 +0,0 @@ -#!/usr/bin/env python -# coding=utf-8 - -from lxml import etree -from models import * -from optparse import OptionParser -from sqlalchemy import Table, Column, Integer, 
BigInteger, String, MetaData, \ - ForeignKey -from sqlalchemy.orm import sessionmaker, mapper -from sqlalchemy.sql import select -import datetime -import email.utils -import logging -import os -import os.path -import re -import sys -import time -import uuid - -#class TweetExclude(object): -# def __init__(self, id): -# self.id = id -# -# def __repr__(self): -# return "" % (self.id) - -def parse_date(date_str): - ts = email.utils.parsedate_tz(date_str) - return datetime.datetime(*ts[0:7]) - -def get_options(): - parser = OptionParser() - parser.add_option("-f", "--file", dest="filename", - help="write export to file", metavar="FILE", default="project_enmi.ldt") - parser.add_option("-d", "--database", dest="database", - help="Input database", metavar="DATABASE") - parser.add_option("-s", "--start-date", dest="start_date", - help="start date", metavar="START_DATE") - parser.add_option("-e", "--end-date", dest="end_date", - help="end date", metavar="END_DATE") - parser.add_option("-I", "--content-file", dest="content_file", - help="Content file", metavar="CONTENT_FILE") - parser.add_option("-c", "--content", dest="content", - help="Content url", metavar="CONTENT") - parser.add_option("-V", "--video-url", dest="video", - help="video url", metavar="VIDEO") - parser.add_option("-i", "--content-id", dest="content_id", - help="Content id", metavar="CONTENT_ID") - parser.add_option("-x", "--exclude", dest="exclude", - help="file containing the id to exclude", metavar="EXCLUDE") - parser.add_option("-C", "--color", dest="color", - help="Color code", metavar="COLOR", default="16763904") - parser.add_option("-H", "--hashtag", dest="hashtag", - help="Hashtag", metavar="HASHTAG", default="enmi") - parser.add_option("-D", "--duration", dest="duration", type="int", - help="Duration", metavar="DURATION", default=None) - parser.add_option("-n", "--name", dest="name", - help="Cutting name", metavar="NAME", default=u"Tweets") - parser.add_option("-R", "--replace", dest="replace", 
action="store_true", - help="Replace tweet ensemble", metavar="REPLACE", default=False) - parser.add_option("-l", "--log", dest="logfile", - help="log to file", metavar="LOG", default="stderr") - - set_logging_options(parser) - - - return parser.parse_args() - - -if __name__ == "__main__" : - - (options, args) = get_options() - - set_logging(options) - - logging.debug("OPTIONS : " + repr(options)) - - engine, metadata = setup_database('sqlite:///'+options.database, echo=((options.verbose-options.quiet)>0), create_all = False) - - Session = sessionmaker() - conn = engine.connect() - try : - session = Session(bind=conn) - try : - - metadata = MetaData(bind=conn) - tweet_exclude_table = Table("tweet_exclude", metadata, Column('id', BigInteger, primary_key=True), prefixes=['TEMPORARY']) - #mapper(TweetExclude, tweet_exclude_table) - metadata.create_all() - - if options.exclude and os.path.exists(options.exclude): - with open(options.exclude, 'r+') as f: - tei = tweet_exclude_table.insert() - for line in f: - conn.execute(tei.values(id=long(line.strip()))) - - if options.listconf: - - parameters = [] - confdoc = etree.parse(options.listconf) - for node in confdoc.xpath("/twitter_export/file"): - params = {} - for snode in node: - if snode.tag == "path": - params['content_file'] = snode.text - elif snode.tag == "start_date": - params['start_date'] = snode.text - elif snode.tag == "end_date": - params['end_date'] = snode.text - elif snode.tag == "duration": - params['duration'] = int(snode.text) - parameters.append(params) - else: - parameters = [{ - 'start_date': options.start_date, - 'end_date' : options.end_date, - 'duration' : options.duration, - 'content_file' : otions.content_file - - }] - - for params in parameters: - - logging.debug("PARAMETERS " + repr(params)) - - start_date_str = params.get("start_date",None) - end_date_str = params.get("end_date", None) - duration = params.get("duration", None) - content_file = params.get("content_file", None) - - - start_date 
= parse_date(start_date_str) - ts = time.mktime(start_date.timetuple()) - - if end_date_str: - end_date = parse_date(end_date_str) - te = time.mktime(end_date.timetuple()) - else: - te = ts + duration - end_date = start_date + datetime.timedelta(seconds=duration) - - - query_res = session.query(Tweet).join(EntityHashtag).join(Hashtag).filter(~Tweet.id.in_(select([tweet_exclude_table.c.id]))).filter(Hashtag.text.contains(options.hashtag)).filter(Tweet.created_at >= start_date).filter(Tweet.created_at <= end_date).all() - - #hashtag = u"%#"+unicode(options.hashtag)+u"%" - - #cursor.execute("select tt.id, tt.text, tt.created_at_ts, tu.name, tu.screen_name from tweet_tweet as tt join tweet_user as tu on tt.user = tu.rowid where text like ? and tt.created_at_ts >= ? and tt.created_at_ts <= ? and tt.id not in (select id from tweet_exclude) order by tt.created_at_ts asc;", (hashtag,ts,te)); - - root = None - ensemble_parent = None - - if content_file and os.path.exists(content_file): - - doc = etree.parse(content_file) - root = doc.getroot() - - ensemble_parent = root.xpath("//ensembles")[0] - - else: - root = etree.Element(u"iri") - - project = etree.SubElement(root, u"project", {u"abstract":u"Twitter comments on ENMI",u"title":u"Twitter comments on ENMI 2009", u"user":u"IRI Web", u"id":unicode(uuid.uuid4())}) - - medias = etree.SubElement(root, u"medias") - media = etree.SubElement(medias, u"media", {u"pict":u"", u"src":unicode(options.content), u"video":unicode(options.video), u"id":unicode(options.content_id), u"extra":u""}) - - annotations = etree.SubElement(root, u"annotations") - content = etree.SubElement(annotations, u"content", {u"id":unicode(options.content_id)}) - ensemble_parent = content - - if options.replace: - for ens in ensemble_parent.iterchildren(tag=u"ensemble"): - if ens.get("id","").startswith("tweet_"): - ensemble_parent.remove(ens) - - ensemble = etree.SubElement(ensemble_parent, u"ensemble", {u"id":u"tweet_" + unicode(uuid.uuid4()), 
u"title":u"Ensemble Twitter", u"author":u"IRI Web", u"abstract":u"Ensemble Twitter pour ENMI 2009"}) - decoupage = etree.SubElement(ensemble, u"decoupage", {u"id": unicode(uuid.uuid4()), u"author": u"IRI Web"}) - - etree.SubElement(decoupage, u"title").text = unicode(options.name) - etree.SubElement(decoupage, u"abstract").text = unicode(options.name) - - elements = etree.SubElement(decoupage, u"elements") - - for tw in query_res: - tweet_ts_dt = tw.created_at - tweet_ts = int(time.mktime(tweet_ts_dt.timetuple())) - tweet_ts_rel = (tweet_ts-ts) * 1000 - username = None - if tw.user is not None: - username = tw.user.name - if not username: - username = "anon." - element = etree.SubElement(elements, u"element" , {u"id":unicode(uuid.uuid4())+u"-"+unicode(tw.id), u"color":unicode(options.color), u"author":unicode(username), u"date":unicode(tweet_ts_dt.strftime("%Y/%m/%d")), u"begin": unicode(tweet_ts_rel), u"dur":u"0", u"src":u""}) - etree.SubElement(element, u"title").text = unicode(username) + u": " + unicode(tw.text) - etree.SubElement(element, u"abstract").text = unicode(tw.text) - - tags_node = etree.SubElement(element, u"tags") - - for entity in tw.entity_list: - if entity.type == u'entity_hashtag': - etree.SubElement(tags_node,u"tag").text = entity.hashtag.text - - if content_file and os.path.exists(content_file): - output = open(content_file, "w") - else: - output = open(options.filename, "w") - - output.write(etree.tostring(root, encoding="utf-8", method="xml", pretty_print=True, xml_declaration=True)) - output.flush() - output.close() - - finally: - session.close() - finally: - conn.close() diff -r 54d7f1486ac4 -r 4daf47fcf792 script/iri_tweet/models.py --- a/script/iri_tweet/models.py Tue Jan 18 10:08:03 2011 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,314 +0,0 @@ -from sqlalchemy import Boolean, Table, Column, BigInteger, Integer, String, \ - MetaData, ForeignKey, DateTime, create_engine -from sqlalchemy.ext.declarative import declarative_base 
-from sqlalchemy.orm import relationship, backref -import datetime -import email.utils -import simplejson - - -Base = declarative_base() - -APPLICATION_NAME = "IRI_TWITTER" -CONSUMER_KEY = "54ThDZhpEjokcMgHJOMnQA" -CONSUMER_SECRET = "wUoL9UL2T87tfc97R0Dff2EaqRzpJ5XGdmaN2XK3udA" -#ACCESS_TOKEN_KEY= "47312923-LiNTtz0I18YXMVIrFeTuhmH7bOvYsK6p3Ln2Dc" -#ACCESS_TOKEN_SECRET = "r3LoXVcjImNAElUpWqTu2SG2xCdWFHkva7xeQoncA" - -def adapt_date(date_str): - ts = email.utils.parsedate_tz(date_str) - return datetime.datetime(*ts[0:7]) - -def adapt_json(obj): - if obj is None: - return None - else: - return simplejson.dumps(obj) - -class Entity(Base): - __tablename__ = "tweet_entity" - id = Column(Integer, primary_key = True) - tweet_id = Column(BigInteger, ForeignKey('tweet_tweet.id')) - #tweet = relationship(Tweet, primaryjoin = tweet_id == Tweet.id) - type = Column(String) - indice_start = Column(Integer) - indice_end = Column(Integer) - __mapper_args__ = {'polymorphic_on': type} - - def __init__(self, **kwargs): - for key, value in kwargs.items(): - if hasattr(self,key): - setattr(self,key,value) - - -class Tweet(Base): - __tablename__ = 'tweet_tweet' - - id = Column(BigInteger, primary_key=True, autoincrement=False) - id_str = Column(String) - contributors = Column(String) - coordinates = Column(String) - created_at = Column(DateTime) - favorited = Column(Boolean) - geo = Column(String) - in_reply_to_screen_name = Column(String) - in_reply_to_status_id = Column(BigInteger) - in_reply_to_status_id_str = Column(String) - in_reply_to_user_id = Column(Integer) - in_reply_to_user_id_str = Column(String) - place = Column(String) - retweet_count = Column(Integer) - retweeted = Column(Boolean) - source = Column(String) - text = Column(String) - truncated = Column(Boolean) - user_id = Column(Integer, ForeignKey('tweet_user.id')) - original_json = Column(String) - entity_list = relationship(Entity, backref='tweet') - - #user = relationship(User, primaryjoin=user_id == User.id) - - def 
__init__(self, **kwargs): - for key, value in kwargs.items(): - if hasattr(self,key): - setattr(self,key,value) - - -class User(Base): - __tablename__ = "tweet_user" - - id = Column(Integer, primary_key = True, autoincrement=False) - id_str= Column(String) - contributors_enabled= Column(Boolean) - created_at= Column(DateTime) - description= Column(String) - favourites_count = Column(Integer) - follow_request_sent = Column(Boolean) - followers_count = Column(Integer) - following = Column(String) - friends_count = Column(Integer) - geo_enabled= Column(Boolean) - is_translator= Column(Boolean) - lang = Column(String) - listed_count = Column(Integer) - location= Column(String) - name = Column(String) - notifications = Column(String) - profile_background_color= Column(String) - profile_background_image_url= Column(String) - profile_background_tile= Column(Boolean) - profile_image_url= Column(String) - profile_link_color= Column(String) - profile_sidebar_border_color= Column(String) - profile_sidebar_fill_color= Column(String) - profile_text_color= Column(String) - profile_use_background_image= Column(Boolean) - protected= Column(Boolean) - screen_name= Column(String) - show_all_inline_media= Column(Boolean) - statuses_count = Column(Integer) - time_zone= Column(String) - url= Column(String) - utc_offset = Column(Integer) - verified= Column(Boolean) - tweets = relationship(Tweet, backref='user') - - def __init__(self, **kwargs): - for key, value in kwargs.items(): - if hasattr(self,key): - setattr(self,key,value) - - - -class Hashtag(Base): - __tablename__ = "tweet_hashtag" - id = Column(Integer, primary_key=True) - text = Column(String, unique = True) - def __init__(self, **kwargs): - for key, value in kwargs.items(): - if hasattr(self,key): - setattr(self,key,value) - - - -class Url(Base): - __tablename__ = "tweet_url" - id = Column(Integer, primary_key=True) - url = Column(String, unique=True) - expanded_url = Column(String) - def __init__(self, **kwargs): - for key, 
value in kwargs.items(): - if hasattr(self,key): - setattr(self,key,value) - - - -class EntityHashtag(Entity): - __tablename__ = "tweet_entity_hashtag" - __mapper_args__ = {'polymorphic_identity': 'entity_hashtag'} - id = Column(Integer, ForeignKey('tweet_entity.id'), primary_key=True) - hashtag_id = Column(Integer, ForeignKey("tweet_hashtag.id")) - hashtag = relationship(Hashtag, primaryjoin=hashtag_id == Hashtag.id) - def __init__(self, **kwargs): - super(EntityHashtag, self).__init__(**kwargs) - for key, value in kwargs.items(): - if hasattr(self,key): - setattr(self,key,value) - - -class EntityUrl(Entity): - __tablename__ = "tweet_entity_url" - __mapper_args__ = {'polymorphic_identity': 'entity_url'} - id = Column(Integer, ForeignKey('tweet_entity.id'), primary_key=True) - url_id = Column(Integer, ForeignKey("tweet_url.id")) - url = relationship(Url, primaryjoin=url_id == Url.id) - def __init__(self, **kwargs): - super(EntityUrl, self).__init__(**kwargs) - for key, value in kwargs.items(): - if hasattr(self,key): - setattr(self,key,value) - -class EntityUser(Entity): - __tablename__ = "tweet_entity_user" - __mapper_args__ = {'polymorphic_identity': 'entity_user'} - id = Column(Integer, ForeignKey('tweet_entity.id'), primary_key=True) - user_id = Column(Integer, ForeignKey('tweet_user.id')) - user = relationship(User, primaryjoin=user_id == User.id) - - def __init__(self, **kwargs): - super(EntityUser, self).__init__(**kwargs) - for key, value in kwargs.items(): - if hasattr(self,key): - setattr(self,key,value) - - -def setup_database(*args, **kwargs): - - create_all = True - if "create_all" in kwargs: - create_all = kwargs["create_all"] - del(kwargs["create_all"]) - - engine = create_engine(*args, **kwargs) - metadata = Base.metadata - - if create_all: - metadata.create_all(engine) - - return (engine, metadata) - -rest_tweet_tweet = { - u'iso_language_code': 'unicode', - u'text': 'unicode', - u'from_user_id_str': 'unicode', - u'profile_image_url': 'unicode', - 
u'to_user_id_str': 'NoneType', - u'created_at': 'unicode', - u'source': 'unicode', - u'to_user': 'unicode', - u'id_str': 'unicode', - u'from_user': 'unicode', - u'place': {u'type': 'unicode', u'id': 'unicode', u'full_name': 'unicode'}, - u'from_user_id': 'int', - u'to_user_id': 'NoneType', - u'geo': 'NoneType', - u'id': 'int', - u'metadata': {u'result_type': 'unicode'} -} - -tweet_tweet = { - 'contributors': None, - 'coordinates': None, - 'created_at': 'date', - 'entities': "tweet_entity", - 'favorited': "bool", - 'geo': None, - 'id': "long", - 'id_str': "string", - 'in_reply_to_screen_name': "string", - 'in_reply_to_status_id': "long", - 'in_reply_to_status_id_str': "string", - 'in_reply_to_user_id': "int", - 'in_reply_to_user_id_str': "string", - 'place': "string", - 'retweet_count': "int", - 'retweeted': "bool", - 'source': "string", - 'text': "string", - 'truncated': "bool", - 'user': "tweet_user" -} -tweet_user = { - 'contributors_enabled': 'bool', - 'created_at': 'str', - 'description': 'str', - 'favourites_count': 'int', - 'follow_request_sent': None, - 'followers_count': 'int', - 'following': None, - 'friends_count': 'int', - 'geo_enabled': 'bool', - 'id': 'int', - 'id_str': 'str', - 'is_translator': 'bool', - 'lang': 'str', - 'listed_count': 'int', - 'location': 'str', - 'name': 'str', - 'notifications': 'NoneType', - 'profile_background_color': 'str', - 'profile_background_image_url': 'str', - 'profile_background_tile': 'bool', - 'profile_image_url': 'str', - 'profile_link_color': 'str', - 'profile_sidebar_border_color': 'str', - 'profile_sidebar_fill_color': 'str', - 'profile_text_color': 'str', - 'profile_use_background_image': 'bool', - 'protected': 'bool', - 'screen_name': 'str', - 'show_all_inline_media': 'bool', - 'statuses_count': 'int', - 'time_zone': 'str', - 'url': 'str', - 'utc_offset': 'int', - 'verified': 'bool', -} - - -tweet_entity_hashtag = { - 'hashtag' : 'tweet_hashtag', - 'indice_start' : 'int', - 'indice_end' : 'int', - 
'tweet':'tweet_tweet' -} - -tweet_entity_url = { - 'url' : 'tweet_url', - 'indice_start' : 'int', - 'indice_end' : 'int', - 'tweet':'tweet_tweet' -} - -tweet_entity_user = { - 'user' : 'tweet_user', - 'indice_start' : 'int', - 'indice_end' : 'int', - 'tweet':'tweet_tweet' -} - -#id int -#id_str str -#indices list -#name str -#screen_name str - -tweet_hashtag = { - "text": "string" -} - -tweet_url = { - "url": "string", - "expanded_url" : "string", -} - diff -r 54d7f1486ac4 -r 4daf47fcf792 script/iri_tweet/utils.py --- a/script/iri_tweet/utils.py Tue Jan 18 10:08:03 2011 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,310 +0,0 @@ -from models import * -import datetime -import email.utils -import json -import logging -import sys -import twitter -import twitter_text -import os.path -import twitter.oauth - - -def get_oauth_token(token_file_path=None): - - if token_file_path and os.path.file_exists(token_file_path): - logging.debug("reading token from file %s" % token_file_path) - return twitter.oauth.read_token_file(token_file_path) - #read access token info from path - - if 'ACCESS_TOKEN_KEY' in dict() and 'ACCESS_TOKEN_SECRET' in dict() and ACCESS_TOKEN_KEY and ACCESS_TOKEN_SECRET: - return ACCESS_TOKEN_KEY,ACCESS_TOKEN_SECRET - - return twitter.oauth_dance.oauth_dance(APPLICATION_NAME, CONSUMER_KEY, CONSUMER_SECRET, token_filename) - -def parse_date(date_str): - ts = email.utils.parsedate_tz(date_str) - return datetime.datetime(*ts[0:7]) - - -fields_adapter = { - 'stream': { - "tweet": { - "created_at" : adapt_date, - "coordinates" : adapt_json, - "place" : adapt_json, - "geo" : adapt_json, -# "original_json" : adapt_json, - }, - "user": { - "created_at" : adapt_date, - }, - }, - 'rest': { - "tweet" : { - "place" : adapt_json, - "geo" : adapt_json, - "created_at" : adapt_date, -# "original_json" : adapt_json, - }, - }, -} - -# -# adapt fields, return a copy of the field_dict with adapted fields -# -def adapt_fields(fields_dict, adapter_mapping): - def 
adapt_one_field(field, value): - if field in adapter_mapping and adapter_mapping[field] is not None: - return adapter_mapping[field](value) - else: - return value - return dict([(str(k),adapt_one_field(k,v)) for k,v in fields_dict.items()]) - - - -class TwitterProcessorException(Exception): - pass - -class TwitterProcessor(object): - - def __init__(self, json_dict, json_txt, session): - - if json_dict is None and json_txt is None: - raise TwitterProcessorException("No json") - - if json_dict is None: - self.json_dict = json.loads(json_txt) - else: - self.json_dict = json_dict - - if not json_txt: - self.json_txt = json.dumps(json_dict) - else: - self.json_txt = json_txt - - if "id" not in self.json_dict: - raise TwitterProcessorException("No id in json") - - self.session = session - - def __get_user(self, user_dict): - logging.debug("Get user : " + repr(user_dict)) - - user_id = user_dict.get("id",None) - user_name = user_dict.get("screen_name", user_dict.get("name", None)) - - if user_id is None and user_name is None: - return None - - if user_id: - user = self.session.query(User).filter(User.id == user_id).first() - else: - user = self.session.query(User).filter(User.screen_name == user_name).first() - - if user is not None: - return user - - user_created_at = user_dict.get("created_at", None) - - if user_created_at is None: - acess_token_key, access_token_secret = get_oauth_token() - t = twitter.Twitter(auth=twitter.OAuth(token_key, token_secret, CONSUMER_KEY, CONSUMER_SECRET)) - try: - if user_id: - user_dict = t.users.show(user_id=user_id) - else: - user_dict = t.users.show(screen_name=user_name) - except Exception as e: - logging.info("get_user : TWITTER ERROR : " + repr(e)) - logging.info("get_user : TWITTER ERROR : " + str(e)) - - user_dict = adapt_fields(user_dict, fields_adapter["stream"]["user"]) - if "id" not in user_dict: - return None - - user = User(**user_dict) - - self.session.add(user) - self.session.flush() - - return user - - def 
__process_entity(self, ind, ind_type): - logging.debug("Process_entity : " + repr(ind) + " : " + repr(ind_type)) - - entity_dict = { - "indice_start": ind["indices"][0], - "indice_end" : ind["indices"][1], - "tweet_id" : self.tweet.id, - "tweet" : self.tweet - } - - def process_hashtags(): - text = ind.get("text", ind.get("hashtag", None)) - if text is None: - return None - hashtag = self.session.query(Hashtag).filter(Hashtag.text == text).first() - if not hashtag: - ind["text"] = text - hashtag = Hashtag(**ind) - self.session.add(hashtag) - self.session.flush() - entity_dict['hashtag'] = hashtag - entity_dict['hashtag_id'] = hashtag.id - entity = EntityHashtag(**entity_dict) - return entity - - def process_user_mentions(): - user_mention = self.__get_user(ind) - if user_mention is None: - entity_dict['user'] = None - entity_dict['user_id'] = None - else: - entity_dict['user'] = user_mention - entity_dict['user_id'] = user_mention.id - entity = EntityUser(**entity_dict) - return entity - - def process_urls(): - url = self.session.query(Url).filter(Url.url == ind["url"]).first() - if url is None: - url = Url(**ind) - self.session.add(url) - self.session.flush() - entity_dict['url'] = url - entity_dict['url_id'] = url.id - entity = EntityUrl(**entity_dict) - return entity - - #{'': lambda } - entity = { - 'hashtags': process_hashtags, - 'user_mentions' : process_user_mentions, - 'urls' : process_urls - }[ind_type]() - - logging.debug("Process_entity entity_dict: " + repr(entity_dict)) - if entity: - self.session.add(entity) - self.session.flush() - - - def __process_twitter_stream(self): - - tweet_nb = self.session.query(Tweet).filter(Tweet.id == self.json_dict["id"]).count() - if tweet_nb > 0: - return - - ts_copy = adapt_fields(self.json_dict, fields_adapter["stream"]["tweet"]) - - # get or create user - user = self.__get_user(self.json_dict["user"]) - if user is None: - log.warning("USER not found " + repr(ts["user"])) - ts_copy["user"] = None - ts_copy["user_id"] 
= None - else: - ts_copy["user"] = user - ts_copy["user_id"] = ts_copy["user"].id - ts_copy["original_json"] = self.json_txt - - self.tweet = Tweet(**ts_copy) - self.session.add(self.tweet) - self.session.flush() - - # get entities - for ind_type, entity_list in self.json_dict["entities"].items(): - for ind in entity_list: - self.__process_entity(ind, ind_type) - - - def __process_twitter_rest(self): - tweet_nb = self.session.query(Tweet).filter(Tweet.id == self.json_dict["id"]).count() - if tweet_nb > 0: - return - - tweet_fields = { - 'created_at': self.json_dict["created_at"], - 'favorited': False, - 'id': self.json_dict["id"], - 'id_str': self.json_dict["id_str"], - #'in_reply_to_screen_name': ts["to_user"], - 'in_reply_to_user_id': self.json_dict["to_user_id"], - 'in_reply_to_user_id_str': self.json_dict["to_user_id_str"], - #'place': ts["place"], - 'source': self.json_dict["source"], - 'text': self.json_dict["text"], - 'truncated': False, - 'original_json' : self.json_txt, - } - - #user - - user_fields = { - 'id' : self.json_dict['from_user_id'], - 'id_str' : self.json_dict['from_user_id_str'], - 'lang' : self.json_dict['iso_language_code'], - 'profile_image_url' : self.json_dict["profile_image_url"], - 'screen_name' : self.json_dict["from_user"], - } - - user = self.__get_user(user_fields) - if user is None: - log.warning("USER not found " + repr(user_fields)) - tweet_fields["user"] = None - tweet_fields["user_id"] = None - else: - tweet_fields["user"] = user - tweet_fields["user_id"] = user.id - - tweet_fields = adapt_fields(tweet_fields, fields_adapter["rest"]["tweet"]) - self.tweet = Tweet(**tweet_fields) - session.add(self.tweet) - - text = self.tweet.text - - extractor = twitter_text.Extractor(text) - - for ind in extractor.extract_hashtags_with_indices(): - self.__process_entity(ind, "hashtags") - - for ind in extractor.extract_mentioned_screen_names_with_indices(): - self.__process_entity(ind, "user_mentions") - - for ind in 
extractor.extract_urls_with_indices(): - self.__process_entity(ind, "urls") - - self.session.flush() - - - def process(self): - if "metadata" in self.json_dict: - self.__process_twitter_rest() - else: - self.__process_twitter_stream() - - -def set_logging(options): - - logging_config = {} - - if options.logfile == "stdout": - logging_config["stream"] = sys.stdout - elif options.logfile == "stderr": - logging_config["stream"] = sys.stderr - else: - logging_config["filename"] = options.logfile - - logging_config["level"] = max(logging.NOTSET, min(logging.CRITICAL, logging.WARNING - 10 * options.verbose + 10 * options.quiet)) - logging.basicConfig(**logging_config) - - options.debug = (options.verbose-options.quiet > 0) - -def set_logging_options(parser): - parser.add_option("-l", "--log", dest="logfile", - help="log to file", metavar="LOG", default="stderr") - parser.add_option("-v", dest="verbose", action="count", - help="verbose", metavar="VERBOSE", default=0) - parser.add_option("-q", dest="quiet", action="count", - help="quiet", metavar="QUIET", default=0) diff -r 54d7f1486ac4 -r 4daf47fcf792 script/lib/iri_tweet/__init__.py diff -r 54d7f1486ac4 -r 4daf47fcf792 script/lib/iri_tweet/create_twitter_export_conf.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/script/lib/iri_tweet/create_twitter_export_conf.py Tue Jan 18 18:25:18 2011 +0100 @@ -0,0 +1,43 @@ +from lxml import etree +from optparse import OptionParser + +def get_options(): + + parser = OptionParser() + + parser.add_option("-f", "--file", dest="outputfile", + help="destination filename", metavar="FILE", default="twitter_export_conf.xml") + parser.add_option("-i", "--input", dest="inputfile", + help="inputfile", metavar="INPUT", default=None) + + return parser.parse_args() + +if __name__ == "__main__": + (options, args) = get_options() + + dest_filename = options.outputfile + + path_list = [] + if options.inputfile is None: + path_list = args + else: + with open(options.inputfile, 'r') as fi: + 
path_list = fi + + + root = etree.Element("twitter_export") + + + for path in path_list: + + iri_doc = etree.parse(path) + media_nodes = iri_doc.xpath("/iri/body/medias/media[@id='video']/video") + duration = int(media_nodes[0].get("dur"))/1000 + + file_elem = etree.SubElement(root, "file") + etree.SubElement(file_elem, "path").text = path + etree.SubElement(file_elem, "start_date") + etree.SubElement(file_elem, "duration").text = unicode(duration) + + tree = etree.ElementTree(root) + tree.write(dest_filename, encoding="utf-8", pretty_print=True, xml_declaration=True) \ No newline at end of file diff -r 54d7f1486ac4 -r 4daf47fcf792 script/lib/iri_tweet/export_tweet_db.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/script/lib/iri_tweet/export_tweet_db.py Tue Jan 18 18:25:18 2011 +0100 @@ -0,0 +1,47 @@ +from models import * +from utils import * +from optparse import OptionParser +from sqlalchemy.orm import sessionmaker +import logging +import sqlite3 +import sys + + +# 'entities': "tweet_entity", +# 'user': "tweet_user" + +def get_option(): + + parser = OptionParser() + + set_logging_options(parser) + + return parser.parse_args() + +if __name__ == "__main__": + + (options, args) = get_option() + + set_logging(options) + + with sqlite3.connect(args[0]) as conn_in: + engine, metadata = setup_database('sqlite:///'+args[1], echo=((options.verbose-options.quiet)>0)) + Session = sessionmaker(bind=engine) + session = Session() + try: + curs_in = conn_in.cursor() + fields_mapping = {} + for i,res in enumerate(curs_in.execute("select json from tweet_tweet;")): + logging.debug("main loop %d : %s" % (i, res[0])) + processor = TwitterProcessor(eval(res[0]), res[0], session) + processor.process() + session.commit() + logging.debug("main : %d tweet processed" % (i+1)) + except Exception, e: + session.rollback() + raise e + finally: + session.close() + + + \ No newline at end of file diff -r 54d7f1486ac4 -r 4daf47fcf792 script/lib/iri_tweet/export_twitter_alchemy.py --- 
/dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/script/lib/iri_tweet/export_twitter_alchemy.py Tue Jan 18 18:25:18 2011 +0100 @@ -0,0 +1,216 @@ +#!/usr/bin/env python +# coding=utf-8 + +from lxml import etree +from models import * +from optparse import OptionParser +from sqlalchemy import Table, Column, Integer, BigInteger, String, MetaData, \ + ForeignKey +from sqlalchemy.orm import sessionmaker, mapper +from sqlalchemy.sql import select +import datetime +import email.utils +import logging +import os +import os.path +import re +import sys +import time +import uuid + +#class TweetExclude(object): +# def __init__(self, id): +# self.id = id +# +# def __repr__(self): +# return "" % (self.id) + +def parse_date(date_str): + ts = email.utils.parsedate_tz(date_str) + return datetime.datetime(*ts[0:7]) + +def get_options(): + parser = OptionParser() + parser.add_option("-f", "--file", dest="filename", + help="write export to file", metavar="FILE", default="project_enmi.ldt") + parser.add_option("-d", "--database", dest="database", + help="Input database", metavar="DATABASE") + parser.add_option("-s", "--start-date", dest="start_date", + help="start date", metavar="START_DATE") + parser.add_option("-e", "--end-date", dest="end_date", + help="end date", metavar="END_DATE") + parser.add_option("-I", "--content-file", dest="content_file", + help="Content file", metavar="CONTENT_FILE") + parser.add_option("-c", "--content", dest="content", + help="Content url", metavar="CONTENT") + parser.add_option("-V", "--video-url", dest="video", + help="video url", metavar="VIDEO") + parser.add_option("-i", "--content-id", dest="content_id", + help="Content id", metavar="CONTENT_ID") + parser.add_option("-x", "--exclude", dest="exclude", + help="file containing the id to exclude", metavar="EXCLUDE") + parser.add_option("-C", "--color", dest="color", + help="Color code", metavar="COLOR", default="16763904") + parser.add_option("-H", "--hashtag", dest="hashtag", + help="Hashtag", 
metavar="HASHTAG", default="enmi") + parser.add_option("-D", "--duration", dest="duration", type="int", + help="Duration", metavar="DURATION", default=None) + parser.add_option("-n", "--name", dest="name", + help="Cutting name", metavar="NAME", default=u"Tweets") + parser.add_option("-R", "--replace", dest="replace", action="store_true", + help="Replace tweet ensemble", metavar="REPLACE", default=False) + parser.add_option("-l", "--log", dest="logfile", + help="log to file", metavar="LOG", default="stderr") + + set_logging_options(parser) + + + return parser.parse_args() + + +if __name__ == "__main__" : + + (options, args) = get_options() + + set_logging(options) + + logging.debug("OPTIONS : " + repr(options)) + + engine, metadata = setup_database('sqlite:///'+options.database, echo=((options.verbose-options.quiet)>0), create_all = False) + + Session = sessionmaker() + conn = engine.connect() + try : + session = Session(bind=conn) + try : + + metadata = MetaData(bind=conn) + tweet_exclude_table = Table("tweet_exclude", metadata, Column('id', BigInteger, primary_key=True), prefixes=['TEMPORARY']) + #mapper(TweetExclude, tweet_exclude_table) + metadata.create_all() + + if options.exclude and os.path.exists(options.exclude): + with open(options.exclude, 'r+') as f: + tei = tweet_exclude_table.insert() + for line in f: + conn.execute(tei.values(id=long(line.strip()))) + + if options.listconf: + + parameters = [] + confdoc = etree.parse(options.listconf) + for node in confdoc.xpath("/twitter_export/file"): + params = {} + for snode in node: + if snode.tag == "path": + params['content_file'] = snode.text + elif snode.tag == "start_date": + params['start_date'] = snode.text + elif snode.tag == "end_date": + params['end_date'] = snode.text + elif snode.tag == "duration": + params['duration'] = int(snode.text) + parameters.append(params) + else: + parameters = [{ + 'start_date': options.start_date, + 'end_date' : options.end_date, + 'duration' : options.duration, + 
'content_file' : options.content_file +  +                 }] +  +             for params in parameters: +  +                 logging.debug("PARAMETERS " + repr(params)) +  +                 start_date_str = params.get("start_date",None) +                 end_date_str = params.get("end_date", None) +                 duration = params.get("duration", None) +                 content_file = params.get("content_file", None) +  +  +                 start_date = parse_date(start_date_str) +                 ts = time.mktime(start_date.timetuple()) +  +                 if end_date_str: +                     end_date = parse_date(end_date_str) +                     te = time.mktime(end_date.timetuple()) +                 else: +                     te = ts + duration +                     end_date = start_date + datetime.timedelta(seconds=duration) +  +  +                 query_res = session.query(Tweet).join(EntityHashtag).join(Hashtag).filter(~Tweet.id.in_(select([tweet_exclude_table.c.id]))).filter(Hashtag.text.contains(options.hashtag)).filter(Tweet.created_at >= start_date).filter(Tweet.created_at <= end_date).all() +  +                 #hashtag = u"%#"+unicode(options.hashtag)+u"%" +  +                 #cursor.execute("select tt.id, tt.text, tt.created_at_ts, tu.name, tu.screen_name from tweet_tweet as tt join tweet_user as tu on tt.user = tu.rowid where text like ? and tt.created_at_ts >= ? and tt.created_at_ts <= ?
and tt.id not in (select id from tweet_exclude) order by tt.created_at_ts asc;", (hashtag,ts,te)); + + root = None + ensemble_parent = None + + if content_file and os.path.exists(content_file): + + doc = etree.parse(content_file) + root = doc.getroot() + + ensemble_parent = root.xpath("//ensembles")[0] + + else: + root = etree.Element(u"iri") + + project = etree.SubElement(root, u"project", {u"abstract":u"Twitter comments on ENMI",u"title":u"Twitter comments on ENMI 2009", u"user":u"IRI Web", u"id":unicode(uuid.uuid4())}) + + medias = etree.SubElement(root, u"medias") + media = etree.SubElement(medias, u"media", {u"pict":u"", u"src":unicode(options.content), u"video":unicode(options.video), u"id":unicode(options.content_id), u"extra":u""}) + + annotations = etree.SubElement(root, u"annotations") + content = etree.SubElement(annotations, u"content", {u"id":unicode(options.content_id)}) + ensemble_parent = content + + if options.replace: + for ens in ensemble_parent.iterchildren(tag=u"ensemble"): + if ens.get("id","").startswith("tweet_"): + ensemble_parent.remove(ens) + + ensemble = etree.SubElement(ensemble_parent, u"ensemble", {u"id":u"tweet_" + unicode(uuid.uuid4()), u"title":u"Ensemble Twitter", u"author":u"IRI Web", u"abstract":u"Ensemble Twitter pour ENMI 2009"}) + decoupage = etree.SubElement(ensemble, u"decoupage", {u"id": unicode(uuid.uuid4()), u"author": u"IRI Web"}) + + etree.SubElement(decoupage, u"title").text = unicode(options.name) + etree.SubElement(decoupage, u"abstract").text = unicode(options.name) + + elements = etree.SubElement(decoupage, u"elements") + + for tw in query_res: + tweet_ts_dt = tw.created_at + tweet_ts = int(time.mktime(tweet_ts_dt.timetuple())) + tweet_ts_rel = (tweet_ts-ts) * 1000 + username = None + if tw.user is not None: + username = tw.user.name + if not username: + username = "anon." 
+ element = etree.SubElement(elements, u"element" , {u"id":unicode(uuid.uuid4())+u"-"+unicode(tw.id), u"color":unicode(options.color), u"author":unicode(username), u"date":unicode(tweet_ts_dt.strftime("%Y/%m/%d")), u"begin": unicode(tweet_ts_rel), u"dur":u"0", u"src":u""}) + etree.SubElement(element, u"title").text = unicode(username) + u": " + unicode(tw.text) + etree.SubElement(element, u"abstract").text = unicode(tw.text) + + tags_node = etree.SubElement(element, u"tags") + + for entity in tw.entity_list: + if entity.type == u'entity_hashtag': + etree.SubElement(tags_node,u"tag").text = entity.hashtag.text + + if content_file and os.path.exists(content_file): + output = open(content_file, "w") + else: + output = open(options.filename, "w") + + output.write(etree.tostring(root, encoding="utf-8", method="xml", pretty_print=True, xml_declaration=True)) + output.flush() + output.close() + + finally: + session.close() + finally: + conn.close() diff -r 54d7f1486ac4 -r 4daf47fcf792 script/lib/iri_tweet/models.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/script/lib/iri_tweet/models.py Tue Jan 18 18:25:18 2011 +0100 @@ -0,0 +1,314 @@ +from sqlalchemy import Boolean, Table, Column, BigInteger, Integer, String, \ + MetaData, ForeignKey, DateTime, create_engine +from sqlalchemy.ext.declarative import declarative_base +from sqlalchemy.orm import relationship, backref +import datetime +import email.utils +import simplejson + + +Base = declarative_base() + +APPLICATION_NAME = "IRI_TWITTER" +CONSUMER_KEY = "54ThDZhpEjokcMgHJOMnQA" +CONSUMER_SECRET = "wUoL9UL2T87tfc97R0Dff2EaqRzpJ5XGdmaN2XK3udA" +#ACCESS_TOKEN_KEY= "47312923-LiNTtz0I18YXMVIrFeTuhmH7bOvYsK6p3Ln2Dc" +#ACCESS_TOKEN_SECRET = "r3LoXVcjImNAElUpWqTu2SG2xCdWFHkva7xeQoncA" + +def adapt_date(date_str): + ts = email.utils.parsedate_tz(date_str) + return datetime.datetime(*ts[0:7]) + +def adapt_json(obj): + if obj is None: + return None + else: + return simplejson.dumps(obj) + +class Entity(Base): + __tablename__ = 
"tweet_entity" + id = Column(Integer, primary_key = True) + tweet_id = Column(BigInteger, ForeignKey('tweet_tweet.id')) + #tweet = relationship(Tweet, primaryjoin = tweet_id == Tweet.id) + type = Column(String) + indice_start = Column(Integer) + indice_end = Column(Integer) + __mapper_args__ = {'polymorphic_on': type} + + def __init__(self, **kwargs): + for key, value in kwargs.items(): + if hasattr(self,key): + setattr(self,key,value) + + +class Tweet(Base): + __tablename__ = 'tweet_tweet' + + id = Column(BigInteger, primary_key=True, autoincrement=False) + id_str = Column(String) + contributors = Column(String) + coordinates = Column(String) + created_at = Column(DateTime) + favorited = Column(Boolean) + geo = Column(String) + in_reply_to_screen_name = Column(String) + in_reply_to_status_id = Column(BigInteger) + in_reply_to_status_id_str = Column(String) + in_reply_to_user_id = Column(Integer) + in_reply_to_user_id_str = Column(String) + place = Column(String) + retweet_count = Column(Integer) + retweeted = Column(Boolean) + source = Column(String) + text = Column(String) + truncated = Column(Boolean) + user_id = Column(Integer, ForeignKey('tweet_user.id')) + original_json = Column(String) + entity_list = relationship(Entity, backref='tweet') + + #user = relationship(User, primaryjoin=user_id == User.id) + + def __init__(self, **kwargs): + for key, value in kwargs.items(): + if hasattr(self,key): + setattr(self,key,value) + + +class User(Base): + __tablename__ = "tweet_user" + + id = Column(Integer, primary_key = True, autoincrement=False) + id_str= Column(String) + contributors_enabled= Column(Boolean) + created_at= Column(DateTime) + description= Column(String) + favourites_count = Column(Integer) + follow_request_sent = Column(Boolean) + followers_count = Column(Integer) + following = Column(String) + friends_count = Column(Integer) + geo_enabled= Column(Boolean) + is_translator= Column(Boolean) + lang = Column(String) + listed_count = Column(Integer) + 
location= Column(String) + name = Column(String) + notifications = Column(String) + profile_background_color= Column(String) + profile_background_image_url= Column(String) + profile_background_tile= Column(Boolean) + profile_image_url= Column(String) + profile_link_color= Column(String) + profile_sidebar_border_color= Column(String) + profile_sidebar_fill_color= Column(String) + profile_text_color= Column(String) + profile_use_background_image= Column(Boolean) + protected= Column(Boolean) + screen_name= Column(String) + show_all_inline_media= Column(Boolean) + statuses_count = Column(Integer) + time_zone= Column(String) + url= Column(String) + utc_offset = Column(Integer) + verified= Column(Boolean) + tweets = relationship(Tweet, backref='user') + + def __init__(self, **kwargs): + for key, value in kwargs.items(): + if hasattr(self,key): + setattr(self,key,value) + + + +class Hashtag(Base): + __tablename__ = "tweet_hashtag" + id = Column(Integer, primary_key=True) + text = Column(String, unique = True) + def __init__(self, **kwargs): + for key, value in kwargs.items(): + if hasattr(self,key): + setattr(self,key,value) + + + +class Url(Base): + __tablename__ = "tweet_url" + id = Column(Integer, primary_key=True) + url = Column(String, unique=True) + expanded_url = Column(String) + def __init__(self, **kwargs): + for key, value in kwargs.items(): + if hasattr(self,key): + setattr(self,key,value) + + + +class EntityHashtag(Entity): + __tablename__ = "tweet_entity_hashtag" + __mapper_args__ = {'polymorphic_identity': 'entity_hashtag'} + id = Column(Integer, ForeignKey('tweet_entity.id'), primary_key=True) + hashtag_id = Column(Integer, ForeignKey("tweet_hashtag.id")) + hashtag = relationship(Hashtag, primaryjoin=hashtag_id == Hashtag.id) + def __init__(self, **kwargs): + super(EntityHashtag, self).__init__(**kwargs) + for key, value in kwargs.items(): + if hasattr(self,key): + setattr(self,key,value) + + +class EntityUrl(Entity): + __tablename__ = "tweet_entity_url" + 
__mapper_args__ = {'polymorphic_identity': 'entity_url'} + id = Column(Integer, ForeignKey('tweet_entity.id'), primary_key=True) + url_id = Column(Integer, ForeignKey("tweet_url.id")) + url = relationship(Url, primaryjoin=url_id == Url.id) + def __init__(self, **kwargs): + super(EntityUrl, self).__init__(**kwargs) + for key, value in kwargs.items(): + if hasattr(self,key): + setattr(self,key,value) + +class EntityUser(Entity): + __tablename__ = "tweet_entity_user" + __mapper_args__ = {'polymorphic_identity': 'entity_user'} + id = Column(Integer, ForeignKey('tweet_entity.id'), primary_key=True) + user_id = Column(Integer, ForeignKey('tweet_user.id')) + user = relationship(User, primaryjoin=user_id == User.id) + + def __init__(self, **kwargs): + super(EntityUser, self).__init__(**kwargs) + for key, value in kwargs.items(): + if hasattr(self,key): + setattr(self,key,value) + + +def setup_database(*args, **kwargs): + + create_all = True + if "create_all" in kwargs: + create_all = kwargs["create_all"] + del(kwargs["create_all"]) + + engine = create_engine(*args, **kwargs) + metadata = Base.metadata + + if create_all: + metadata.create_all(engine) + + return (engine, metadata) + +rest_tweet_tweet = { + u'iso_language_code': 'unicode', + u'text': 'unicode', + u'from_user_id_str': 'unicode', + u'profile_image_url': 'unicode', + u'to_user_id_str': 'NoneType', + u'created_at': 'unicode', + u'source': 'unicode', + u'to_user': 'unicode', + u'id_str': 'unicode', + u'from_user': 'unicode', + u'place': {u'type': 'unicode', u'id': 'unicode', u'full_name': 'unicode'}, + u'from_user_id': 'int', + u'to_user_id': 'NoneType', + u'geo': 'NoneType', + u'id': 'int', + u'metadata': {u'result_type': 'unicode'} +} + +tweet_tweet = { + 'contributors': None, + 'coordinates': None, + 'created_at': 'date', + 'entities': "tweet_entity", + 'favorited': "bool", + 'geo': None, + 'id': "long", + 'id_str': "string", + 'in_reply_to_screen_name': "string", + 'in_reply_to_status_id': "long", + 
'in_reply_to_status_id_str': "string", + 'in_reply_to_user_id': "int", + 'in_reply_to_user_id_str': "string", + 'place': "string", + 'retweet_count': "int", + 'retweeted': "bool", + 'source': "string", + 'text': "string", + 'truncated': "bool", + 'user': "tweet_user" +} +tweet_user = { + 'contributors_enabled': 'bool', + 'created_at': 'str', + 'description': 'str', + 'favourites_count': 'int', + 'follow_request_sent': None, + 'followers_count': 'int', + 'following': None, + 'friends_count': 'int', + 'geo_enabled': 'bool', + 'id': 'int', + 'id_str': 'str', + 'is_translator': 'bool', + 'lang': 'str', + 'listed_count': 'int', + 'location': 'str', + 'name': 'str', + 'notifications': 'NoneType', + 'profile_background_color': 'str', + 'profile_background_image_url': 'str', + 'profile_background_tile': 'bool', + 'profile_image_url': 'str', + 'profile_link_color': 'str', + 'profile_sidebar_border_color': 'str', + 'profile_sidebar_fill_color': 'str', + 'profile_text_color': 'str', + 'profile_use_background_image': 'bool', + 'protected': 'bool', + 'screen_name': 'str', + 'show_all_inline_media': 'bool', + 'statuses_count': 'int', + 'time_zone': 'str', + 'url': 'str', + 'utc_offset': 'int', + 'verified': 'bool', +} + + +tweet_entity_hashtag = { + 'hashtag' : 'tweet_hashtag', + 'indice_start' : 'int', + 'indice_end' : 'int', + 'tweet':'tweet_tweet' +} + +tweet_entity_url = { + 'url' : 'tweet_url', + 'indice_start' : 'int', + 'indice_end' : 'int', + 'tweet':'tweet_tweet' +} + +tweet_entity_user = { + 'user' : 'tweet_user', + 'indice_start' : 'int', + 'indice_end' : 'int', + 'tweet':'tweet_tweet' +} + +#id int +#id_str str +#indices list +#name str +#screen_name str + +tweet_hashtag = { + "text": "string" +} + +tweet_url = { + "url": "string", + "expanded_url" : "string", +} + diff -r 54d7f1486ac4 -r 4daf47fcf792 script/lib/iri_tweet/utils.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/script/lib/iri_tweet/utils.py Tue Jan 18 18:25:18 2011 +0100 @@ -0,0 +1,310 @@ +from 
models import * +import datetime +import email.utils +import json +import logging +import sys +import twitter +import twitter_text +import os.path +import twitter.oauth + + +def get_oauth_token(token_file_path=None): +     +    if token_file_path and os.path.exists(token_file_path): +        logging.debug("reading token from file %s" % token_file_path) +        return twitter.oauth.read_token_file(token_file_path) +    #read access token info from path +     +    if 'ACCESS_TOKEN_KEY' in dict() and 'ACCESS_TOKEN_SECRET' in dict() and ACCESS_TOKEN_KEY and ACCESS_TOKEN_SECRET: +        return ACCESS_TOKEN_KEY,ACCESS_TOKEN_SECRET +     +    return twitter.oauth_dance.oauth_dance(APPLICATION_NAME, CONSUMER_KEY, CONSUMER_SECRET, token_file_path) + +def parse_date(date_str): +    ts = email.utils.parsedate_tz(date_str) +    return datetime.datetime(*ts[0:7]) + + +fields_adapter = { +    'stream': { +        "tweet": { +            "created_at" : adapt_date, +            "coordinates" : adapt_json, +            "place" : adapt_json, +            "geo" : adapt_json, +#            "original_json" : adapt_json, +        }, +        "user": { +            "created_at" : adapt_date, +        }, +    }, +    'rest': { +        "tweet" : { +            "place" : adapt_json, +            "geo" : adapt_json, +            "created_at" : adapt_date, +#            "original_json" : adapt_json, +        }, +    }, +} + +# +# adapt fields, return a copy of the field_dict with adapted fields +# +def adapt_fields(fields_dict, adapter_mapping): +    def adapt_one_field(field, value): +        if field in adapter_mapping and adapter_mapping[field] is not None: +            return adapter_mapping[field](value) +        else: +            return value +    return dict([(str(k),adapt_one_field(k,v)) for k,v in fields_dict.items()]) + + + +class TwitterProcessorException(Exception): +    pass + +class TwitterProcessor(object): +     +    def __init__(self, json_dict, json_txt, session): +         +        if json_dict is None and json_txt is None: +            raise TwitterProcessorException("No json") +         +        if json_dict is None: +            self.json_dict = json.loads(json_txt) +        else: +            self.json_dict = json_dict +         +        if not json_txt: +            self.json_txt = json.dumps(json_dict) +        else: +            
self.json_txt = json_txt +         +        if "id" not in self.json_dict: +            raise TwitterProcessorException("No id in json") +         +        self.session = session + +    def __get_user(self, user_dict): +        logging.debug("Get user : " + repr(user_dict)) +         +        user_id = user_dict.get("id",None) +        user_name = user_dict.get("screen_name", user_dict.get("name", None)) +         +        if user_id is None and user_name is None: +            return None + +        if user_id: +            user = self.session.query(User).filter(User.id == user_id).first() +        else: +            user = self.session.query(User).filter(User.screen_name == user_name).first() +         +        if user is not None: +            return user +         +        user_created_at = user_dict.get("created_at", None) +         +        if user_created_at is None: +            acess_token_key, access_token_secret = get_oauth_token() +            t = twitter.Twitter(auth=twitter.OAuth(acess_token_key, access_token_secret, CONSUMER_KEY, CONSUMER_SECRET)) +            try: +                if user_id: +                    user_dict = t.users.show(user_id=user_id) +                else: +                    user_dict = t.users.show(screen_name=user_name) +            except Exception as e: +                logging.info("get_user : TWITTER ERROR : " + repr(e)) +                logging.info("get_user : TWITTER ERROR : " + str(e)) +                     +        user_dict = adapt_fields(user_dict, fields_adapter["stream"]["user"]) +        if "id" not in user_dict: +            return None +         +        user = User(**user_dict) +         +        self.session.add(user) +        self.session.flush() +         +        return user + +    def __process_entity(self, ind, ind_type): +        logging.debug("Process_entity : " + repr(ind) + " : " + repr(ind_type)) +         +        entity_dict = { +            "indice_start": ind["indices"][0], +            "indice_end"  : ind["indices"][1], +            "tweet_id"    : self.tweet.id, +            "tweet"       : self.tweet +        } +         +        def process_hashtags(): +            text = ind.get("text", ind.get("hashtag", None)) +            if text is None: +                return None +            hashtag = self.session.query(Hashtag).filter(Hashtag.text == text).first() +            if not hashtag: +                ind["text"] = text +                hashtag = Hashtag(**ind) +                self.session.add(hashtag) +                self.session.flush() +            entity_dict['hashtag'] = hashtag +            entity_dict['hashtag_id'] = hashtag.id +            entity = 
EntityHashtag(**entity_dict) +            return entity +         +        def process_user_mentions(): +            user_mention = self.__get_user(ind) +            if user_mention is None: +                entity_dict['user'] = None +                entity_dict['user_id'] = None +            else: +                entity_dict['user'] = user_mention +                entity_dict['user_id'] = user_mention.id +            entity = EntityUser(**entity_dict) +            return entity +         +        def process_urls(): +            url = self.session.query(Url).filter(Url.url == ind["url"]).first() +            if url is None: +                url = Url(**ind) +                self.session.add(url) +                self.session.flush() +            entity_dict['url'] = url +            entity_dict['url_id'] = url.id +            entity = EntityUrl(**entity_dict) +            return entity +                 +        #{'': lambda } +        entity = { +            'hashtags': process_hashtags, +            'user_mentions' : process_user_mentions, +            'urls' : process_urls +        }[ind_type]() +         +        logging.debug("Process_entity entity_dict: " + repr(entity_dict)) +        if entity: +            self.session.add(entity) +            self.session.flush() + + +    def __process_twitter_stream(self): +         +        tweet_nb = self.session.query(Tweet).filter(Tweet.id == self.json_dict["id"]).count() +        if tweet_nb > 0: +            return +         +        ts_copy = adapt_fields(self.json_dict, fields_adapter["stream"]["tweet"]) +         +        # get or create user +        user = self.__get_user(self.json_dict["user"]) +        if user is None: +            logging.warning("USER not found " + repr(self.json_dict["user"])) +            ts_copy["user"] = None +            ts_copy["user_id"] = None +        else: +            ts_copy["user"] = user +            ts_copy["user_id"] = ts_copy["user"].id +        ts_copy["original_json"] = self.json_txt +         +        self.tweet = Tweet(**ts_copy) +        self.session.add(self.tweet) +        self.session.flush() +         +        # get entities +        for ind_type, entity_list in self.json_dict["entities"].items(): +            for ind in entity_list: +                self.__process_entity(ind, ind_type) + + +    def __process_twitter_rest(self): +        tweet_nb = self.session.query(Tweet).filter(Tweet.id == self.json_dict["id"]).count() +        if tweet_nb > 0: +            return +         +        tweet_fields = { +            'created_at': self.json_dict["created_at"], +            'favorited': False, +            'id': self.json_dict["id"], +            'id_str': 
self.json_dict["id_str"], +            #'in_reply_to_screen_name': ts["to_user"], +            'in_reply_to_user_id': self.json_dict["to_user_id"], +            'in_reply_to_user_id_str': self.json_dict["to_user_id_str"], +            #'place': ts["place"], +            'source': self.json_dict["source"], +            'text': self.json_dict["text"], +            'truncated': False, +            'original_json' : self.json_txt, +        } +         +        #user + +        user_fields = { +            'id' : self.json_dict['from_user_id'], +            'id_str' : self.json_dict['from_user_id_str'], +            'lang' : self.json_dict['iso_language_code'], +            'profile_image_url' : self.json_dict["profile_image_url"], +            'screen_name' : self.json_dict["from_user"], +        } +         +        user = self.__get_user(user_fields) +        if user is None: +            logging.warning("USER not found " + repr(user_fields)) +            tweet_fields["user"] = None +            tweet_fields["user_id"] = None +        else: +            tweet_fields["user"] = user +            tweet_fields["user_id"] = user.id +         +        tweet_fields = adapt_fields(tweet_fields, fields_adapter["rest"]["tweet"]) +        self.tweet = Tweet(**tweet_fields) +        self.session.add(self.tweet) +         +        text = self.tweet.text +         +        extractor = twitter_text.Extractor(text) +         +        for ind in extractor.extract_hashtags_with_indices(): +            self.__process_entity(ind, "hashtags") +         +        for ind in extractor.extract_mentioned_screen_names_with_indices(): +            self.__process_entity(ind, "user_mentions") +         +        for ind in extractor.extract_urls_with_indices(): +            self.__process_entity(ind, "urls") +         +        self.session.flush() + + +    def process(self): +        if "metadata" in self.json_dict: +            self.__process_twitter_rest() +        else: +            self.__process_twitter_stream() + + +def set_logging(options): + +    logging_config = {} +     +    if options.logfile == "stdout": +        logging_config["stream"] = sys.stdout +    elif options.logfile == "stderr": +        logging_config["stream"] = sys.stderr +    else: +        logging_config["filename"] = options.logfile + +    logging_config["level"] = max(logging.NOTSET, min(logging.CRITICAL, logging.WARNING - 10 * options.verbose + 10 * options.quiet)) +    logging.basicConfig(**logging_config) +     + 
options.debug = (options.verbose-options.quiet > 0) + +def set_logging_options(parser): + parser.add_option("-l", "--log", dest="logfile", + help="log to file", metavar="LOG", default="stderr") + parser.add_option("-v", dest="verbose", action="count", + help="verbose", metavar="VERBOSE", default=0) + parser.add_option("-q", dest="quiet", action="count", + help="quiet", metavar="QUIET", default=0) diff -r 54d7f1486ac4 -r 4daf47fcf792 script/lib/tweetstream/CHANGELOG --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/script/lib/tweetstream/CHANGELOG Tue Jan 18 18:25:18 2011 +0100 @@ -0,0 +1,44 @@ +0.1 + + - Initial version + +0.2 + + - Improved error handling + - Added AuthenticationError and ConnectionError exceptions + - Added ReconnectingTweetStream class that supports automatically + reconnecting if the connection is dropped + +0.3 + + - Fixed bugs in authtentication + - Added TrackStream and FollowStream classes + - Added list of endpoint names, and made them legal values for the url arg + +0.3.1 + + - Added lots of tests + - Added proper handling of keepalive newlines + - Improved handling of closing streams + - Added missing anyjson dependency to setup + - Fixed bug where newlines and malformed content were counted as a tweet + +0.3.2 + + - This release was skipped over, due to maintainer brainfart. + +0.3.3 + + - Fixed setup.py so it wont attempt to load modules that aren't installed + yet. Fixes installation issue. + +0.3.4 + + - Updated to latest twitter streaming urls + - Fixed a bug where we tried to call a method on None + +0.3.5 + + - Removed a spurious print statement left over from debugging + - Introduced common base class for all tweetstream exceptions + - Make sure we raise a sensible error on 404. 
Include url in desc of that error \ No newline at end of file diff -r 54d7f1486ac4 -r 4daf47fcf792 script/lib/tweetstream/LICENSE --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/script/lib/tweetstream/LICENSE Tue Jan 18 18:25:18 2011 +0100 @@ -0,0 +1,27 @@ +Copyright (c) 2009, Rune Halvorsen +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + + * Redistributions of source code must retain the above copyright notice, + this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in the + documentation and/or other materials provided with the distribution. + +Neither the name of Rune Halvorsen nor the names of its contributors may be +used to endorse or promote products derived from this software without +specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, +THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS +BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +POSSIBILITY OF SUCH DAMAGE. diff -r 54d7f1486ac4 -r 4daf47fcf792 script/lib/tweetstream/README --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/script/lib/tweetstream/README Tue Jan 18 18:25:18 2011 +0100 @@ -0,0 +1,105 @@ +.. 
-*- restructuredtext -*- + +########################################## +tweetstream - Simple twitter streaming API +########################################## + +Introduction +------------ + +tweetstream provides a class, TweetStream, that can be used to get +tweets from Twitter's streaming API. An instance of the class can be used as +an iterator. In addition to fetching tweets, the object keeps track of +the number of tweets collected and the rate at which tweets are received. + +Subclasses are available for accessing the "track" and "follow" streams +as well. + +There's also a ReconnectingTweetStream class that handles automatic +reconnecting. + +Twitter's documentation about the streaming API can be found here: +http://apiwiki.twitter.com/Streaming-API-Documentation . + +**Note** that the API is blocking. If for some reason data is not immediatly +available, calls will block until enough data is available to yield a tweet. + +Examples +-------- + +Printing all incomming tweets: + +>>> stream = tweetstream.TweetStream("username", "password") +>>> for tweet in stream: +... print tweet + + +The stream object can also be used as a context, as in this example that +prints the author for each tweet as well as the tweet count and rate: + +>>> with tweetstream.TweetStream("username", "password") as stream +... for tweet in stream: +... print "Got tweet from %-16s\t( tweet %d, rate %.1f tweets/sec)" % ( +... tweet["user"]["screen_name"], stream.count, stream.rate ) + + +Stream objects can raise ConnectionError or AuthenticationError exceptions: + +>>> try: +... with tweetstream.TweetStream("username", "password") as stream +... for tweet in stream: +... print "Got tweet from %-16s\t( tweet %d, rate %.1f tweets/sec)" % ( +... tweet["user"]["screen_name"], stream.count, stream.rate ) +... except tweetstream.ConnectionError, e: +... print "Disconnected from twitter. 
Reason:", e.reason + +To get tweets that relate to specific terms, use the TrackStream: + +>>> words = ["opera", "firefox", "safari"] +>>> with tweetstream.TrackStream("username", "password", words) as stream +... for tweet in stream: +... print "Got interesting tweet:", tweet + +To get only tweets from a set of users, use the FollowStream. The following +would get tweets for user 1, 42 and 8675309 + +>>> users = [1, 42, 8675309] +>>> with tweetstream.FollowStream("username", "password", users) as stream +... for tweet in stream: +... print "Got tweet from:", tweet["user"]["screen_name"] + + +Simple tweet fetcher that sends tweets to an AMQP message server using carrot: + +>>> from carrot.messaging import Publisher +>>> from carrot.connection import AMQPConnection +>>> from tweetstream import TweetStream +>>> amqpconn = AMQPConnection(hostname="localhost", port=5672, +... userid="test", password="test", +... vhost="test") +>>> publisher = Publisher(connection=amqpconn, +... exchange="tweets", routing_key="stream") +>>> with TweetStream("username", "password") as stream: +... for tweet in stream: +... publisher.send(tweet) +>>> publisher.close() + + +Changelog +--------- + +See the CHANGELOG file + +Contact +------- + +The author is Rune Halvorsen . The project resides at +http://bitbucket.org/runeh/tweetstream . If you find bugs, or have feature +requests, please report them in the project site issue tracker. Patches are +also very welcome. + +License +------- + +This software is licensed under the ``New BSD License``. See the ``LICENCE`` +file in the top distribution directory for the full license text. 
diff -r 54d7f1486ac4 -r 4daf47fcf792 script/lib/tweetstream/servercontext.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/script/lib/tweetstream/servercontext.py Tue Jan 18 18:25:18 2011 +0100 @@ -0,0 +1,221 @@ +import threading +import contextlib +import time +import os +import socket +import random +from functools import partial +from inspect import isclass +from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler +from SimpleHTTPServer import SimpleHTTPRequestHandler +from SocketServer import BaseRequestHandler + + +class ServerError(Exception): + pass + + +class ServerContext(object): + """Context object with information about a running test server.""" + + def __init__(self, address, port): + self.address = address or "localhost" + self.port = port + + @property + def baseurl(self): + return "http://%s:%s" % (self.address, self.port) + + def __str__(self): + return "" % self.baseurl + + __repr__ = __str__ + + +class _SilentSimpleHTTPRequestHandler(SimpleHTTPRequestHandler): + + def __init__(self, *args, **kwargs): + self.logging = kwargs.get("logging", False) + SimpleHTTPRequestHandler.__init__(self, *args, **kwargs) + + def log_message(self, *args, **kwargs): + if self.logging: + SimpleHTTPRequestHandler.log_message(self, *args, **kwargs) + + +class _TestHandler(BaseHTTPRequestHandler): + """RequestHandler class that handles requests that use a custom handler + callable.""" + + def __init__(self, handler, methods, *args, **kwargs): + self._handler = handler + self._methods = methods + self._response_sent = False + self._headers_sent = False + self.logging = kwargs.get("logging", False) + BaseHTTPRequestHandler.__init__(self, *args, **kwargs) + + def log_message(self, *args, **kwargs): + if self.logging: + BaseHTTPRequestHandler.log_message(self, *args, **kwargs) + + def send_response(self, *args, **kwargs): + self._response_sent = True + BaseHTTPRequestHandler.send_response(self, *args, **kwargs) + + def end_headers(self, *args, **kwargs): + 
self._headers_sent = True + BaseHTTPRequestHandler.end_headers(self, *args, **kwargs) + + def _do_whatever(self): + """Called in place of do_METHOD""" + data = self._handler(self) + + if hasattr(data, "next"): + # assume it's something supporting generator protocol + self._handle_with_iterator(data) + else: + # Nothing more to do then. + pass + + + def __getattr__(self, name): + if name.startswith("do_") and name[3:].lower() in self._methods: + return self._do_whatever + else: + # fixme instance or class? + raise AttributeError(name) + + def _handle_with_iterator(self, iterator): + self.connection.settimeout(0.1) + for data in iterator: + if not self.server.server_thread.running: + return + + if not self._response_sent: + self.send_response(200) + if not self._headers_sent: + self.end_headers() + + self.wfile.write(data) + # flush immediatly. We may want to do trickling writes + # or something else tha trequires bypassing normal caching + self.wfile.flush() + +class _TestServerThread(threading.Thread): + """Thread class for a running test server""" + + def __init__(self, handler, methods, cwd, port, address): + threading.Thread.__init__(self) + self.startup_finished = threading.Event() + self._methods = methods + self._cwd = cwd + self._orig_cwd = None + self._handler = self._wrap_handler(handler, methods) + self._setup() + self.running = True + self.serverloc = (address, port) + self.error = None + + def _wrap_handler(self, handler, methods): + if isclass(handler) and issubclass(handler, BaseRequestHandler): + return handler # It's OK. 
user passed in a proper handler + elif callable(handler): + return partial(_TestHandler, handler, methods) + # it's a callable, so wrap in a req handler + else: + raise ServerError("handler must be callable or RequestHandler") + + def _setup(self): + if self._cwd != "./": + self._orig_cwd = os.getcwd() + os.chdir(self._cwd) + + def _init_server(self): + """Hooks up the server socket""" + try: + if self.serverloc[1] == "random": + retries = 10 # try getting an available port max this many times + while True: + try: + self.serverloc = (self.serverloc[0], + random.randint(1025, 49151)) + self._server = HTTPServer(self.serverloc, self._handler) + except socket.error: + retries -= 1 + if not retries: # not able to get a port. + raise + else: + break + else: # use specific port. this might throw, that's expected + self._server = HTTPServer(self.serverloc, self._handler) + except socket.error, e: + self.running = False + self.error = e + # set this here, since we'll never enter the serve loop where + # it is usually set: + self.startup_finished.set() + return + + self._server.allow_reuse_address = True # lots of tests, same port + self._server.timeout = 0.1 + self._server.server_thread = self + + + def run(self): + self._init_server() + + while self.running: + self._server.handle_request() # blocks for self.timeout secs + # First time this falls through, signal the parent thread that + # the server is ready for incomming connections + if not self.startup_finished.is_set(): + self.startup_finished.set() + + self._cleanup() + + def stop(self): + """Stop the server and attempt to make the thread terminate. + This happens async but the calling code can check periodically + the isRunning flag on the thread object. 
+ """ + # actual stopping happens in the run method + self.running = False + + def _cleanup(self): + """Do some rudimentary cleanup.""" + if self._orig_cwd: + os.chdir(self._orig_cwd) + + +@contextlib.contextmanager +def test_server(handler=_SilentSimpleHTTPRequestHandler, port=8514, + address="", methods=("get", "head"), cwd="./"): + """Context that makes available a web server in a separate thread""" + thread = _TestServerThread(handler=handler, methods=methods, cwd=cwd, + port=port, address=address) + thread.start() + + # fixme: should this be daemonized? If it isn't it will block the entire + # app, but that should never happen anyway.. + thread.startup_finished.wait() + + if thread.error: # startup failed! Bail, throw whatever the server did + raise thread.error + + exc = None + try: + yield ServerContext(*thread.serverloc) + except Exception, exc: + pass + thread.stop() + thread.join(5) # giving it a lot of leeway. should never happen + + if exc: + raise exc + + # fixme: this takes second priorty after the internal exception but would + # still be nice to signal back to calling code. 
+ + if thread.isAlive(): + raise Warning("Test server could not be stopped") diff -r 54d7f1486ac4 -r 4daf47fcf792 script/lib/tweetstream/setup.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/script/lib/tweetstream/setup.py Tue Jan 18 18:25:18 2011 +0100 @@ -0,0 +1,27 @@ +from setuptools import setup, find_packages +import sys, os + +author = "Rune Halvorsen" +email = "runefh@gmail.com" +version = "0.3.5" +homepage = "http://bitbucket.org/runeh/tweetstream/" + +setup(name='tweetstream', + version=version, + description="Simple Twitter streaming API access", + long_description=open("README").read(), + classifiers=[ + 'License :: OSI Approved :: BSD License', + 'Intended Audience :: Developers', + ], + keywords='twitter', + author=author, + author_email=email, + url=homepage, + license='BSD', + packages=find_packages(exclude=['ez_setup', 'examples', 'tests']), + include_package_data=True, + zip_safe=False, + platforms=["any"], + install_requires = ['anyjson'], +) diff -r 54d7f1486ac4 -r 4daf47fcf792 script/lib/tweetstream/tests/test_tweetstream.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/script/lib/tweetstream/tests/test_tweetstream.py Tue Jan 18 18:25:18 2011 +0100 @@ -0,0 +1,193 @@ +import contextlib +import threading +import time +from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer + +from nose.tools import assert_raises +from tweetstream import TweetStream, FollowStream, TrackStream +from tweetstream import ConnectionError, AuthenticationError + +from servercontext import test_server + +single_tweet = r"""{"in_reply_to_status_id":null,"in_reply_to_user_id":null,"favorited":false,"created_at":"Tue Jun 16 10:40:14 +0000 2009","in_reply_to_screen_name":null,"text":"record industry just keeps on amazing me: http:\/\/is.gd\/13lFo - $150k per song you've SHARED, not that somebody has actually 
DOWNLOADED.","user":{"notifications":null,"profile_background_tile":false,"followers_count":206,"time_zone":"Copenhagen","utc_offset":3600,"friends_count":191,"profile_background_color":"ffffff","profile_image_url":"http:\/\/s3.amazonaws.com\/twitter_production\/profile_images\/250715794\/profile_normal.png","description":"Digital product developer, currently at Opera Software. My tweets are my opinions, not those of my employer.","verified_profile":false,"protected":false,"favourites_count":0,"profile_text_color":"3C3940","screen_name":"eiriksnilsen","name":"Eirik Stridsklev N.","following":null,"created_at":"Tue May 06 12:24:12 +0000 2008","profile_background_image_url":"http:\/\/s3.amazonaws.com\/twitter_production\/profile_background_images\/10531192\/160x600opera15.gif","profile_link_color":"0099B9","profile_sidebar_fill_color":"95E8EC","url":"http:\/\/www.stridsklev-nilsen.no\/eirik","id":14672543,"statuses_count":506,"profile_sidebar_border_color":"5ED4DC","location":"Oslo, Norway"},"id":2190767504,"truncated":false,"source":"Twitter Opera widget<\/a>"}""" + + +def test_bad_auth(): + """Test that the proper exception is raised when the user could not be + authenticated""" + def auth_denied(request): + request.send_error(401) + + with test_server(handler=auth_denied, methods=("post", "get"), + port="random") as server: + stream = TweetStream("foo", "bar", url=server.baseurl) + assert_raises(AuthenticationError, stream.next) + + stream = FollowStream("foo", "bar", [1, 2, 3], url=server.baseurl) + assert_raises(AuthenticationError, stream.next) + + stream = TrackStream("foo", "bar", ["opera"], url=server.baseurl) + assert_raises(AuthenticationError, stream.next) + + +def test_404_url(): + """Test that the proper exception is raised when the stream URL can't be + found""" + def not_found(request): + request.send_error(404) + + with test_server(handler=not_found, methods=("post", "get"), + port="random") as server: + stream = TweetStream("foo", "bar", 
url=server.baseurl) + assert_raises(ConnectionError, stream.next) + + stream = FollowStream("foo", "bar", [1, 2, 3], url=server.baseurl) + assert_raises(ConnectionError, stream.next) + + stream = TrackStream("foo", "bar", ["opera"], url=server.baseurl) + assert_raises(ConnectionError, stream.next) + + +def test_bad_content(): + """Test error handling if we are given invalid data""" + def bad_content(request): + for n in xrange(10): + # what json we pass doesn't matter. It's not verifying the + # strcuture, only checking that it's parsable + yield "[1,2,3]" + yield "[1,2, I need no stinking close brace" + yield "[1,2,3]" + + def do_test(klass, *args): + with test_server(handler=bad_content, methods=("post", "get"), + port="random") as server: + stream = klass("foo", "bar", *args, url=server.baseurl) + for tweet in stream: + pass + + assert_raises(ConnectionError, do_test, TweetStream) + assert_raises(ConnectionError, do_test, FollowStream, [1, 2, 3]) + assert_raises(ConnectionError, do_test, TrackStream, ["opera"]) + + +def test_closed_connection(): + """Test error handling if server unexpectedly closes connection""" + cnt = 1000 + def bad_content(request): + for n in xrange(cnt): + # what json we pass doesn't matter. 
It's not verifying the + # strcuture, only checking that it's parsable + yield "[1,2,3]" + + def do_test(klass, *args): + with test_server(handler=bad_content, methods=("post", "get"), + port="random") as server: + stream = klass("foo", "bar", *args, url=server.baseurl) + for tweet in stream: + pass + + assert_raises(ConnectionError, do_test, TweetStream) + assert_raises(ConnectionError, do_test, FollowStream, [1, 2, 3]) + assert_raises(ConnectionError, do_test, TrackStream, ["opera"]) + + +def test_bad_host(): + """Test behaviour if we can't connect to the host""" + stream = TweetStream("foo", "bar", url="http://bad.egewdvsdswefdsf.com/") + assert_raises(ConnectionError, stream.next) + + stream = FollowStream("foo", "bar", [1, 2, 3], url="http://zegwefdsf.com/") + assert_raises(ConnectionError, stream.next) + + stream = TrackStream("foo", "bar", ["foo"], url="http://aswefdsews.com/") + assert_raises(ConnectionError, stream.next) + + +def smoke_test_receive_tweets(): + """Receive 100k tweets and disconnect (slow)""" + total = 100000 + + def tweetsource(request): + while True: + yield single_tweet + "\n" + + def do_test(klass, *args): + with test_server(handler=tweetsource, + methods=("post", "get"), port="random") as server: + stream = klass("foo", "bar", *args, url=server.baseurl) + for tweet in stream: + if stream.count == total: + break + + do_test(TweetStream) + do_test(FollowStream, [1, 2, 3]) + do_test(TrackStream, ["foo", "bar"]) + + +def test_keepalive(): + """Make sure we behave sanely when there are keepalive newlines in the + data recevived from twitter""" + def tweetsource(request): + yield single_tweet+"\n" + yield "\n" + yield "\n" + yield single_tweet+"\n" + yield "\n" + yield "\n" + yield "\n" + yield "\n" + yield "\n" + yield "\n" + yield "\n" + yield single_tweet+"\n" + yield "\n" + + def do_test(klass, *args): + with test_server(handler=tweetsource, methods=("post", "get"), + port="random") as server: + stream = klass("foo", "bar", *args, 
url=server.baseurl) + try: + for tweet in stream: + pass + except ConnectionError: + assert stream.count == 3, "Got %s, wanted 3" % stream.count + else: + assert False, "Didn't handle keepalive" + + + do_test(TweetStream) + do_test(FollowStream, [1, 2, 3]) + do_test(TrackStream, ["foo", "bar"]) + + +def test_buffering(): + """Test if buffering stops data from being returned immediately. + If there is some buffering in play that might mean data is only returned + from the generator when the buffer is full. If buffer is bigger than a + tweet, this will happen. Default buffer size in the part of socket lib + that enables readline is 8k. Max tweet length is around 3k.""" + + def tweetsource(request): + yield single_tweet+"\n" + time.sleep(2) + # need to yield a bunch here so we're sure we'll return from the + # blocking call in case the buffering bug is present. + for n in xrange(100): + yield single_tweet+"\n" + + def do_test(klass, *args): + with test_server(handler=tweetsource, methods=("post", "get"), + port="random") as server: + stream = klass("foo", "bar", *args, url=server.baseurl) + + start = time.time() + stream.next() + first = time.time() + diff = first - start + assert diff < 1, "Getting first tweet took more than a second!" + + do_test(TweetStream) + do_test(FollowStream, [1, 2, 3]) + do_test(TrackStream, ["foo", "bar"]) + + diff -r 54d7f1486ac4 -r 4daf47fcf792 script/lib/tweetstream/tweetstream/__init__.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/script/lib/tweetstream/tweetstream/__init__.py Tue Jan 18 18:25:18 2011 +0100 @@ -0,0 +1,297 @@ +""" +Simple Twitter streaming API access +""" +__version__ = "0.3.5" +__author__ = "Rune Halvorsen " +__homepage__ = "http://bitbucket.org/runeh/tweetstream/" +__docformat__ = "restructuredtext" + +import urllib +import urllib2 +import socket +import time +import anyjson + + +""" + .. data:: URLS + + Mapping between twitter endpoint names and URLs. + + .. 
data:: USER_AGENT + + The default user agent string for stream objects + +""" + +URLS = {"firehose": "http://stream.twitter.com/1/statuses/firehose.json", + "sample": "http://stream.twitter.com/1/statuses/sample.json", + "follow": "http://stream.twitter.com/1/statuses/filter.json", + "track": "http://stream.twitter.com/1/statuses/filter.json"} + +USER_AGENT = "TweetStream %s" % __version__ + + +class TweetStreamError(Exception): + """Base class for all tweetstream errors""" + pass + +class AuthenticationError(TweetStreamError): + """Exception raised if the username/password is not accepted + """ + pass + + +class ConnectionError(TweetStreamError): + """Raised when there are network problems. This means when there are + dns errors, network errors, twitter issues""" + + def __init__(self, reason, details=None): + self.reason = reason + self.details = details + + def __str__(self): + return '' % self.reason + + +class TweetStream(object): + """A network connection to Twitters streaming API + + :param username: Twitter username for the account accessing the API. + :param password: Twitter password for the account accessing the API. + + :keyword url: URL to connect to. This can be either an endopoint name, + such as "sample", or a full URL. By default, the public "sample" url + is used. All known endpoints are defined in the :URLS: attribute + + .. attribute:: connected + + True if the object is currently connected to the stream. + + .. attribute:: url + + The URL to which the object is connected + + .. attribute:: starttime + + The timestamp, in seconds since the epoch, the object connected to the + streaming api. + + .. attribute:: count + + The number of tweets that have been returned by the object. + + .. attribute:: rate + + The rate at which tweets have been returned from the object as a + float. see also :attr: `rate_period`. + + .. attribute:: rate_period + + The ammount of time to sample tweets to calculate tweet rate. By + default 10 seconds. 
Changes to this attribute will not be reflected + until the next time the rate is calculated. The rate of tweets vary + with time of day etc. so it's usefull to set this to something + sensible. + + .. attribute:: user_agent + + User agent string that will be included in the request. NOTE: This can + not be changed after the connection has been made. This property must + thus be set before accessing the iterator. The default is set in + :attr: `USER_AGENT`. +""" + + def __init__(self, username, password, url="sample"): + self._conn = None + self._rate_ts = None + self._rate_cnt = 0 + self._username = username + self._password = password + + self.rate_period = 10 # in seconds + self.connected = False + self.starttime = None + self.count = 0 + self.rate = 0 + self.user_agent = USER_AGENT + self.url = URLS.get(url, url) + + def __iter__(self): + return self + + def __enter__(self): + return self + + def __exit__(self, *params): + self.close() + return False + + def _init_conn(self): + """Open the connection to the twitter server""" + headers = {'User-Agent': self.user_agent} + req = urllib2.Request(self.url, self._get_post_data(), headers) + + password_mgr = urllib2.HTTPPasswordMgrWithDefaultRealm() + password_mgr.add_password(None, self.url, self._username, + self._password) + handler = urllib2.HTTPBasicAuthHandler(password_mgr) + opener = urllib2.build_opener(handler) + + try: + self._conn = opener.open(req) + except urllib2.HTTPError, exception: + if exception.code == 401: + raise AuthenticationError("Access denied") + elif exception.code == 404: + raise ConnectionError("URL not found: %s" % self.url) + else: # re raise. 
No idea what would cause this, so want to know + raise + except urllib2.URLError, exception: + raise ConnectionError(exception.reason) + + self.connected = True + if not self.starttime: + self.starttime = time.time() + if not self._rate_ts: + self._rate_ts = time.time() + + def _get_post_data(self): + """Subclasses that need to add post data to the request can override + this method and return post data. The data should be in the format + returned by urllib.urlencode.""" + return None + + def next(self): + """Return the next available tweet. This call is blocking!""" + while True: + try: + if not self.connected: + self._init_conn() + + rate_time = time.time() - self._rate_ts + if not self._rate_ts or rate_time > self.rate_period: + self.rate = self._rate_cnt / rate_time + self._rate_cnt = 0 + self._rate_ts = time.time() + + data = self._conn.readline() + if data == "": # something is wrong + self.close() + raise ConnectionError("Got entry of length 0. Disconnected") + elif data.isspace(): + continue + + data = anyjson.deserialize(data) + self.count += 1 + self._rate_cnt += 1 + return data + + except ValueError, e: + self.close() + raise ConnectionError("Got invalid data from twitter", details=data) + + except socket.error, e: + self.close() + raise ConnectionError("Server disconnected") + + + def close(self): + """ + Close the connection to the streaming server. + """ + self.connected = False + if self._conn: + self._conn.close() + + +class ReconnectingTweetStream(TweetStream): + """TweetStream class that automatically tries to reconnect if the + connecting goes down. Reconnecting, and waiting for reconnecting, is + blocking. + + :param username: See :TweetStream: + + :param password: See :TweetStream: + + :keyword url: See :TweetStream: + + :keyword reconnects: Number of reconnects before a ConnectionError is + raised. Default is 3 + + :error_cb: Optional callable that will be called just before trying to + reconnect. 
The callback will be called with a single argument, the + exception that caused the reconnect attempt. Default is None + + :retry_wait: Time to wait before reconnecting in seconds. Default is 5 + + """ + + def __init__(self, username, password, url="sample", + reconnects=3, error_cb=None, retry_wait=5): + self.max_reconnects = reconnects + self.retry_wait = retry_wait + self._reconnects = 0 + self._error_cb = error_cb + TweetStream.__init__(self, username, password, url=url) + + def next(self): + while True: + try: + return TweetStream.next(self) + except ConnectionError, e: + self._reconnects += 1 + if self._reconnects > self.max_reconnects: + raise ConnectionError("Too many retries") + + # Note: error_cb is not called on the last error since we + # raise a ConnectionError instead + if callable(self._error_cb): + self._error_cb(e) + + time.sleep(self.retry_wait) + # Don't listen to auth error, since we can't reasonably reconnect + # when we get one. + +class FollowStream(TweetStream): + """Stream class for getting tweets from followers. + + :param user: See TweetStream + + :param password: See TweetStream + + :param followees: Iterable containing user IDs to follow. + ***Note:*** the user id in question is the numeric ID twitter uses, + not the normal username. + + :keyword url: Like the url argument to TweetStream, except default + value is the "follow" endpoint. + """ + + def __init__(self, user, password, followees, url="follow", **kwargs): + self.followees = followees + TweetStream.__init__(self, user, password, url=url, **kwargs) + + def _get_post_data(self): + return urllib.urlencode({"follow": ",".join(map(str, self.followees))}) + + +class TrackStream(TweetStream): + """Stream class for getting tweets relevant to keywords. + + :param user: See TweetStream + + :param password: See TweetStream + + :param keywords: Iterable containing keywords to look for + + :keyword url: Like the url argument to TweetStream, except default + value is the "track" endpoint. 
+ """ + + def __init__(self, user, password, keywords, url="track", **kwargs): + self.keywords = keywords + TweetStream.__init__(self, user, password, url=url, **kwargs) + + def _get_post_data(self): + return urllib.urlencode({"track": ",".join(self.keywords)}) diff -r 54d7f1486ac4 -r 4daf47fcf792 script/virtualenv/res/anyjson.tar.gz Binary file script/virtualenv/res/anyjson.tar.gz has changed diff -r 54d7f1486ac4 -r 4daf47fcf792 script/virtualenv/res/tweetstream.tar.gz Binary file script/virtualenv/res/tweetstream.tar.gz has changed