--- a/.settings/org.eclipse.core.resources.prefs Tue Jan 18 10:08:03 2011 +0100
+++ b/.settings/org.eclipse.core.resources.prefs Tue Jan 18 18:25:18 2011 +0100
@@ -1,4 +1,4 @@
-#Fri Jan 07 10:05:33 CET 2011
+#Tue Jan 18 10:08:53 CET 2011
eclipse.preferences.version=1
-encoding//script/iri_tweet/export_twitter_alchemy.py=utf-8
+encoding//script/lib/iri_tweet/export_twitter_alchemy.py=utf-8
encoding//script/rest/export_twitter.py=utf-8
--- a/script/iri_tweet/create_twitter_export_conf.py Tue Jan 18 10:08:03 2011 +0100
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,47 +0,0 @@
-from lxml import etree
-from optparse import OptionParser
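-
-# Builds the twitter_export_conf.xml file list consumed by export_twitter_alchemy.py.
-# Each input path should point to an .iri content file; the video duration is
-# read from its /iri/body/medias/media[@id='video']/video node.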
-
-def get_options():
-
- parser = OptionParser()
-
- parser.add_option("-f", "--file", dest="outputfile",
- help="destination filename", metavar="FILE", default="twitter_export_conf.xml")
- parser.add_option("-i", "--input", dest="inputfile",
- help="inputfile", metavar="INPUT", default=None)
-
- return parser.parse_args()
-
-if __name__ == "__main__":
- (options, args) = get_options()
-
- dest_filename = options.outputfile
-
- path_list = []
- if options.inputfile is None:
- path_list = args
- else:
-        with open(options.inputfile, 'r') as fi:
-            path_list = [line.strip() for line in fi]
-
-
- root = etree.Element("twitter_export")
-
-
- for path in path_list:
-
- iri_doc = etree.parse(path)
- media_nodes = iri_doc.xpath("/iri/body/medias/media[@id='video']/video")
- duration = int(media_nodes[0].get("dur"))/1000
-
- file_elem = etree.SubElement(root, "file")
- etree.SubElement(file_elem, "path").text = path
- etree.SubElement(file_elem, "start_date")
- etree.SubElement(file_elem, "duration").text = unicode(duration)
-
- tree = etree.ElementTree(root)
- tree.write(dest_filename, encoding="utf-8", pretty_print=True, xml_declaration=True)
\ No newline at end of file
--- a/script/iri_tweet/export_tweet_db.py Tue Jan 18 10:08:03 2011 +0100
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,52 +0,0 @@
-from models import *
-from utils import *
-from optparse import OptionParser
-from sqlalchemy.orm import sessionmaker
-import logging
-import sqlite3
-import sys
-
-
-# 'entities': "tweet_entity",
-# 'user': "tweet_user"
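-#
-# Usage sketch (arguments inferred from the code below):
-#   python export_tweet_db.py <source.db> <dest.db>
-# Reads the raw tweet json from the source database's tweet_tweet table and
-# replays it through TwitterProcessor into the destination database.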
-
-def get_option():
-
- parser = OptionParser()
-
- set_logging_options(parser)
-
- return parser.parse_args()
-
-if __name__ == "__main__":
-
- (options, args) = get_option()
-
- set_logging(options)
-
- with sqlite3.connect(args[0]) as conn_in:
- engine, metadata = setup_database('sqlite:///'+args[1], echo=((options.verbose-options.quiet)>0))
- Session = sessionmaker(bind=engine)
- session = Session()
- try:
- curs_in = conn_in.cursor()
- fields_mapping = {}
- for i,res in enumerate(curs_in.execute("select json from tweet_tweet;")):
- logging.debug("main loop %d : %s" % (i, res[0]))
- processor = TwitterProcessor(eval(res[0]), res[0], session)
- processor.process()
- session.commit()
- logging.debug("main : %d tweet processed" % (i+1))
-        except Exception:
-            session.rollback()
-            raise
- finally:
- session.close()
-
-
-
\ No newline at end of file
--- a/script/iri_tweet/export_twitter_alchemy.py Tue Jan 18 10:08:03 2011 +0100
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,220 +0,0 @@
-#!/usr/bin/env python
-# coding=utf-8
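-#
-# Exports tweets matching a hashtag into an iri/.ldt xml "cutting".
-# Illustrative invocation (option values are examples only):
-#   python export_twitter_alchemy.py -d tweets.db -H enmi -L twitter_export_conf.xml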
-
-from lxml import etree
-from models import *
-from optparse import OptionParser
-from sqlalchemy import Table, Column, Integer, BigInteger, String, MetaData, \
- ForeignKey
-from sqlalchemy.orm import sessionmaker, mapper
-from sqlalchemy.sql import select
-import datetime
-import email.utils
-import logging
-import os
-import os.path
-import re
-import sys
-import time
-import uuid
-
-#class TweetExclude(object):
-# def __init__(self, id):
-# self.id = id
-#
-# def __repr__(self):
-# return "<TweetExclude(id=%d)>" % (self.id)
-
-def parse_date(date_str):
- ts = email.utils.parsedate_tz(date_str)
- return datetime.datetime(*ts[0:7])
-
-def get_options():
- parser = OptionParser()
- parser.add_option("-f", "--file", dest="filename",
- help="write export to file", metavar="FILE", default="project_enmi.ldt")
- parser.add_option("-d", "--database", dest="database",
- help="Input database", metavar="DATABASE")
- parser.add_option("-s", "--start-date", dest="start_date",
- help="start date", metavar="START_DATE")
- parser.add_option("-e", "--end-date", dest="end_date",
- help="end date", metavar="END_DATE")
- parser.add_option("-I", "--content-file", dest="content_file",
- help="Content file", metavar="CONTENT_FILE")
- parser.add_option("-c", "--content", dest="content",
- help="Content url", metavar="CONTENT")
- parser.add_option("-V", "--video-url", dest="video",
- help="video url", metavar="VIDEO")
- parser.add_option("-i", "--content-id", dest="content_id",
- help="Content id", metavar="CONTENT_ID")
- parser.add_option("-x", "--exclude", dest="exclude",
- help="file containing the id to exclude", metavar="EXCLUDE")
- parser.add_option("-C", "--color", dest="color",
- help="Color code", metavar="COLOR", default="16763904")
- parser.add_option("-H", "--hashtag", dest="hashtag",
- help="Hashtag", metavar="HASHTAG", default="enmi")
- parser.add_option("-D", "--duration", dest="duration", type="int",
- help="Duration", metavar="DURATION", default=None)
- parser.add_option("-n", "--name", dest="name",
- help="Cutting name", metavar="NAME", default=u"Tweets")
- parser.add_option("-R", "--replace", dest="replace", action="store_true",
- help="Replace tweet ensemble", metavar="REPLACE", default=False)
- parser.add_option("-l", "--log", dest="logfile",
- help="log to file", metavar="LOG", default="stderr")
-
- set_logging_options(parser)
-
-
- return parser.parse_args()
-
-
-if __name__ == "__main__" :
-
- (options, args) = get_options()
-
- set_logging(options)
-
- logging.debug("OPTIONS : " + repr(options))
-
- engine, metadata = setup_database('sqlite:///'+options.database, echo=((options.verbose-options.quiet)>0), create_all = False)
-
- Session = sessionmaker()
- conn = engine.connect()
- try :
- session = Session(bind=conn)
- try :
-
- metadata = MetaData(bind=conn)
- tweet_exclude_table = Table("tweet_exclude", metadata, Column('id', BigInteger, primary_key=True), prefixes=['TEMPORARY'])
- #mapper(TweetExclude, tweet_exclude_table)
- metadata.create_all()
-
- if options.exclude and os.path.exists(options.exclude):
- with open(options.exclude, 'r+') as f:
- tei = tweet_exclude_table.insert()
- for line in f:
- conn.execute(tei.values(id=long(line.strip())))
-
- if options.listconf:
-
- parameters = []
- confdoc = etree.parse(options.listconf)
- for node in confdoc.xpath("/twitter_export/file"):
- params = {}
- for snode in node:
- if snode.tag == "path":
- params['content_file'] = snode.text
- elif snode.tag == "start_date":
- params['start_date'] = snode.text
- elif snode.tag == "end_date":
- params['end_date'] = snode.text
- elif snode.tag == "duration":
- params['duration'] = int(snode.text)
- parameters.append(params)
- else:
- parameters = [{
- 'start_date': options.start_date,
- 'end_date' : options.end_date,
- 'duration' : options.duration,
-                    'content_file' : options.content_file
-
- }]
-
- for params in parameters:
-
- logging.debug("PARAMETERS " + repr(params))
-
- start_date_str = params.get("start_date",None)
- end_date_str = params.get("end_date", None)
- duration = params.get("duration", None)
- content_file = params.get("content_file", None)
-
-
- start_date = parse_date(start_date_str)
- ts = time.mktime(start_date.timetuple())
-
- if end_date_str:
- end_date = parse_date(end_date_str)
- te = time.mktime(end_date.timetuple())
- else:
- te = ts + duration
- end_date = start_date + datetime.timedelta(seconds=duration)
-
-
- query_res = session.query(Tweet).join(EntityHashtag).join(Hashtag).filter(~Tweet.id.in_(select([tweet_exclude_table.c.id]))).filter(Hashtag.text.contains(options.hashtag)).filter(Tweet.created_at >= start_date).filter(Tweet.created_at <= end_date).all()
-
- #hashtag = u"%#"+unicode(options.hashtag)+u"%"
-
- #cursor.execute("select tt.id, tt.text, tt.created_at_ts, tu.name, tu.screen_name from tweet_tweet as tt join tweet_user as tu on tt.user = tu.rowid where text like ? and tt.created_at_ts >= ? and tt.created_at_ts <= ? and tt.id not in (select id from tweet_exclude) order by tt.created_at_ts asc;", (hashtag,ts,te));
-
- root = None
- ensemble_parent = None
-
- if content_file and os.path.exists(content_file):
-
- doc = etree.parse(content_file)
- root = doc.getroot()
-
- ensemble_parent = root.xpath("//ensembles")[0]
-
- else:
- root = etree.Element(u"iri")
-
- project = etree.SubElement(root, u"project", {u"abstract":u"Twitter comments on ENMI",u"title":u"Twitter comments on ENMI 2009", u"user":u"IRI Web", u"id":unicode(uuid.uuid4())})
-
- medias = etree.SubElement(root, u"medias")
- media = etree.SubElement(medias, u"media", {u"pict":u"", u"src":unicode(options.content), u"video":unicode(options.video), u"id":unicode(options.content_id), u"extra":u""})
-
- annotations = etree.SubElement(root, u"annotations")
- content = etree.SubElement(annotations, u"content", {u"id":unicode(options.content_id)})
- ensemble_parent = content
-
- if options.replace:
- for ens in ensemble_parent.iterchildren(tag=u"ensemble"):
- if ens.get("id","").startswith("tweet_"):
- ensemble_parent.remove(ens)
-
- ensemble = etree.SubElement(ensemble_parent, u"ensemble", {u"id":u"tweet_" + unicode(uuid.uuid4()), u"title":u"Ensemble Twitter", u"author":u"IRI Web", u"abstract":u"Ensemble Twitter pour ENMI 2009"})
- decoupage = etree.SubElement(ensemble, u"decoupage", {u"id": unicode(uuid.uuid4()), u"author": u"IRI Web"})
-
- etree.SubElement(decoupage, u"title").text = unicode(options.name)
- etree.SubElement(decoupage, u"abstract").text = unicode(options.name)
-
- elements = etree.SubElement(decoupage, u"elements")
-
- for tw in query_res:
- tweet_ts_dt = tw.created_at
- tweet_ts = int(time.mktime(tweet_ts_dt.timetuple()))
- tweet_ts_rel = (tweet_ts-ts) * 1000
- username = None
- if tw.user is not None:
- username = tw.user.name
- if not username:
- username = "anon."
- element = etree.SubElement(elements, u"element" , {u"id":unicode(uuid.uuid4())+u"-"+unicode(tw.id), u"color":unicode(options.color), u"author":unicode(username), u"date":unicode(tweet_ts_dt.strftime("%Y/%m/%d")), u"begin": unicode(tweet_ts_rel), u"dur":u"0", u"src":u""})
- etree.SubElement(element, u"title").text = unicode(username) + u": " + unicode(tw.text)
- etree.SubElement(element, u"abstract").text = unicode(tw.text)
-
- tags_node = etree.SubElement(element, u"tags")
-
- for entity in tw.entity_list:
- if entity.type == u'entity_hashtag':
- etree.SubElement(tags_node,u"tag").text = entity.hashtag.text
-
- if content_file and os.path.exists(content_file):
- output = open(content_file, "w")
- else:
- output = open(options.filename, "w")
-
- output.write(etree.tostring(root, encoding="utf-8", method="xml", pretty_print=True, xml_declaration=True))
- output.flush()
- output.close()
-
- finally:
- session.close()
- finally:
- conn.close()
--- a/script/iri_tweet/models.py Tue Jan 18 10:08:03 2011 +0100
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,317 +0,0 @@
-from sqlalchemy import Boolean, Table, Column, BigInteger, Integer, String, \
- MetaData, ForeignKey, DateTime, create_engine
-from sqlalchemy.ext.declarative import declarative_base
-from sqlalchemy.orm import relationship, backref
-import datetime
-import email.utils
-import simplejson
-
-
-Base = declarative_base()
-
-APPLICATION_NAME = "IRI_TWITTER"
-CONSUMER_KEY = "54ThDZhpEjokcMgHJOMnQA"
-CONSUMER_SECRET = "wUoL9UL2T87tfc97R0Dff2EaqRzpJ5XGdmaN2XK3udA"
-#ACCESS_TOKEN_KEY= "47312923-LiNTtz0I18YXMVIrFeTuhmH7bOvYsK6p3Ln2Dc"
-#ACCESS_TOKEN_SECRET = "r3LoXVcjImNAElUpWqTu2SG2xCdWFHkva7xeQoncA"
-
-def adapt_date(date_str):
- ts = email.utils.parsedate_tz(date_str)
- return datetime.datetime(*ts[0:7])
-
-def adapt_json(obj):
- if obj is None:
- return None
- else:
- return simplejson.dumps(obj)
-
-class Entity(Base):
- __tablename__ = "tweet_entity"
- id = Column(Integer, primary_key = True)
- tweet_id = Column(BigInteger, ForeignKey('tweet_tweet.id'))
- #tweet = relationship(Tweet, primaryjoin = tweet_id == Tweet.id)
- type = Column(String)
- indice_start = Column(Integer)
- indice_end = Column(Integer)
- __mapper_args__ = {'polymorphic_on': type}
-
- def __init__(self, **kwargs):
- for key, value in kwargs.items():
- if hasattr(self,key):
- setattr(self,key,value)
-
-
-class Tweet(Base):
- __tablename__ = 'tweet_tweet'
-
- id = Column(BigInteger, primary_key=True, autoincrement=False)
- id_str = Column(String)
- contributors = Column(String)
- coordinates = Column(String)
- created_at = Column(DateTime)
- favorited = Column(Boolean)
- geo = Column(String)
- in_reply_to_screen_name = Column(String)
- in_reply_to_status_id = Column(BigInteger)
- in_reply_to_status_id_str = Column(String)
- in_reply_to_user_id = Column(Integer)
- in_reply_to_user_id_str = Column(String)
- place = Column(String)
- retweet_count = Column(Integer)
- retweeted = Column(Boolean)
- source = Column(String)
- text = Column(String)
- truncated = Column(Boolean)
- user_id = Column(Integer, ForeignKey('tweet_user.id'))
- original_json = Column(String)
- entity_list = relationship(Entity, backref='tweet')
-
- #user = relationship(User, primaryjoin=user_id == User.id)
-
- def __init__(self, **kwargs):
- for key, value in kwargs.items():
- if hasattr(self,key):
- setattr(self,key,value)
-
-
-class User(Base):
- __tablename__ = "tweet_user"
-
-    id = Column(Integer, primary_key=True, autoincrement=False)
-    id_str = Column(String)
-    contributors_enabled = Column(Boolean)
-    created_at = Column(DateTime)
-    description = Column(String)
-    favourites_count = Column(Integer)
-    follow_request_sent = Column(Boolean)
-    followers_count = Column(Integer)
-    following = Column(String)
-    friends_count = Column(Integer)
-    geo_enabled = Column(Boolean)
-    is_translator = Column(Boolean)
-    lang = Column(String)
-    listed_count = Column(Integer)
-    location = Column(String)
-    name = Column(String)
-    notifications = Column(String)
-    profile_background_color = Column(String)
-    profile_background_image_url = Column(String)
-    profile_background_tile = Column(Boolean)
-    profile_image_url = Column(String)
-    profile_link_color = Column(String)
-    profile_sidebar_border_color = Column(String)
-    profile_sidebar_fill_color = Column(String)
-    profile_text_color = Column(String)
-    profile_use_background_image = Column(Boolean)
-    protected = Column(Boolean)
-    screen_name = Column(String)
-    show_all_inline_media = Column(Boolean)
-    statuses_count = Column(Integer)
-    time_zone = Column(String)
-    url = Column(String)
-    utc_offset = Column(Integer)
-    verified = Column(Boolean)
-    tweets = relationship(Tweet, backref='user')
-
- def __init__(self, **kwargs):
- for key, value in kwargs.items():
- if hasattr(self,key):
- setattr(self,key,value)
-
-
-
-class Hashtag(Base):
- __tablename__ = "tweet_hashtag"
- id = Column(Integer, primary_key=True)
- text = Column(String, unique = True)
- def __init__(self, **kwargs):
- for key, value in kwargs.items():
- if hasattr(self,key):
- setattr(self,key,value)
-
-
-
-class Url(Base):
- __tablename__ = "tweet_url"
- id = Column(Integer, primary_key=True)
- url = Column(String, unique=True)
- expanded_url = Column(String)
- def __init__(self, **kwargs):
- for key, value in kwargs.items():
- if hasattr(self,key):
- setattr(self,key,value)
-
-
-
-class EntityHashtag(Entity):
- __tablename__ = "tweet_entity_hashtag"
- __mapper_args__ = {'polymorphic_identity': 'entity_hashtag'}
- id = Column(Integer, ForeignKey('tweet_entity.id'), primary_key=True)
- hashtag_id = Column(Integer, ForeignKey("tweet_hashtag.id"))
- hashtag = relationship(Hashtag, primaryjoin=hashtag_id == Hashtag.id)
- def __init__(self, **kwargs):
- super(EntityHashtag, self).__init__(**kwargs)
- for key, value in kwargs.items():
- if hasattr(self,key):
- setattr(self,key,value)
-
-
-class EntityUrl(Entity):
- __tablename__ = "tweet_entity_url"
- __mapper_args__ = {'polymorphic_identity': 'entity_url'}
- id = Column(Integer, ForeignKey('tweet_entity.id'), primary_key=True)
- url_id = Column(Integer, ForeignKey("tweet_url.id"))
- url = relationship(Url, primaryjoin=url_id == Url.id)
- def __init__(self, **kwargs):
- super(EntityUrl, self).__init__(**kwargs)
- for key, value in kwargs.items():
- if hasattr(self,key):
- setattr(self,key,value)
-
-class EntityUser(Entity):
- __tablename__ = "tweet_entity_user"
- __mapper_args__ = {'polymorphic_identity': 'entity_user'}
- id = Column(Integer, ForeignKey('tweet_entity.id'), primary_key=True)
- user_id = Column(Integer, ForeignKey('tweet_user.id'))
- user = relationship(User, primaryjoin=user_id == User.id)
-
- def __init__(self, **kwargs):
- super(EntityUser, self).__init__(**kwargs)
- for key, value in kwargs.items():
- if hasattr(self,key):
- setattr(self,key,value)
-
-
-def setup_database(*args, **kwargs):
-
- create_all = True
- if "create_all" in kwargs:
- create_all = kwargs["create_all"]
- del(kwargs["create_all"])
-
- engine = create_engine(*args, **kwargs)
- metadata = Base.metadata
-
- if create_all:
- metadata.create_all(engine)
-
- return (engine, metadata)
-
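-# Typical use, as in the export scripts:
-#   engine, metadata = setup_database('sqlite:///tweets.db', echo=False)
-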
-rest_tweet_tweet = {
- u'iso_language_code': 'unicode',
- u'text': 'unicode',
- u'from_user_id_str': 'unicode',
- u'profile_image_url': 'unicode',
- u'to_user_id_str': 'NoneType',
- u'created_at': 'unicode',
- u'source': 'unicode',
- u'to_user': 'unicode',
- u'id_str': 'unicode',
- u'from_user': 'unicode',
- u'place': {u'type': 'unicode', u'id': 'unicode', u'full_name': 'unicode'},
- u'from_user_id': 'int',
- u'to_user_id': 'NoneType',
- u'geo': 'NoneType',
- u'id': 'int',
- u'metadata': {u'result_type': 'unicode'}
-}
-
-tweet_tweet = {
- 'contributors': None,
- 'coordinates': None,
- 'created_at': 'date',
- 'entities': "tweet_entity",
- 'favorited': "bool",
- 'geo': None,
- 'id': "long",
- 'id_str': "string",
- 'in_reply_to_screen_name': "string",
- 'in_reply_to_status_id': "long",
- 'in_reply_to_status_id_str': "string",
- 'in_reply_to_user_id': "int",
- 'in_reply_to_user_id_str': "string",
- 'place': "string",
- 'retweet_count': "int",
- 'retweeted': "bool",
- 'source': "string",
- 'text': "string",
- 'truncated': "bool",
- 'user': "tweet_user"
-}
-tweet_user = {
- 'contributors_enabled': 'bool',
- 'created_at': 'str',
- 'description': 'str',
- 'favourites_count': 'int',
- 'follow_request_sent': None,
- 'followers_count': 'int',
- 'following': None,
- 'friends_count': 'int',
- 'geo_enabled': 'bool',
- 'id': 'int',
- 'id_str': 'str',
- 'is_translator': 'bool',
- 'lang': 'str',
- 'listed_count': 'int',
- 'location': 'str',
- 'name': 'str',
- 'notifications': 'NoneType',
- 'profile_background_color': 'str',
- 'profile_background_image_url': 'str',
- 'profile_background_tile': 'bool',
- 'profile_image_url': 'str',
- 'profile_link_color': 'str',
- 'profile_sidebar_border_color': 'str',
- 'profile_sidebar_fill_color': 'str',
- 'profile_text_color': 'str',
- 'profile_use_background_image': 'bool',
- 'protected': 'bool',
- 'screen_name': 'str',
- 'show_all_inline_media': 'bool',
- 'statuses_count': 'int',
- 'time_zone': 'str',
- 'url': 'str',
- 'utc_offset': 'int',
- 'verified': 'bool',
-}
-
-
-tweet_entity_hashtag = {
- 'hashtag' : 'tweet_hashtag',
- 'indice_start' : 'int',
- 'indice_end' : 'int',
- 'tweet':'tweet_tweet'
-}
-
-tweet_entity_url = {
- 'url' : 'tweet_url',
- 'indice_start' : 'int',
- 'indice_end' : 'int',
- 'tweet':'tweet_tweet'
-}
-
-tweet_entity_user = {
- 'user' : 'tweet_user',
- 'indice_start' : 'int',
- 'indice_end' : 'int',
- 'tweet':'tweet_tweet'
-}
-
-#id int
-#id_str str
-#indices list
-#name str
-#screen_name str
-
-tweet_hashtag = {
- "text": "string"
-}
-
-tweet_url = {
- "url": "string",
- "expanded_url" : "string",
-}
-
--- a/script/iri_tweet/utils.py Tue Jan 18 10:08:03 2011 +0100
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,316 +0,0 @@
-from models import *
-import datetime
-import email.utils
-import json
-import logging
-import sys
-import twitter
-import twitter_text
-import os.path
-import twitter.oauth
-import twitter.oauth_dance
-
-
-def get_oauth_token(token_file_path=None):
-
-    if token_file_path and os.path.exists(token_file_path):
- logging.debug("reading token from file %s" % token_file_path)
- return twitter.oauth.read_token_file(token_file_path)
- #read access token info from path
-
-    if 'ACCESS_TOKEN_KEY' in globals() and 'ACCESS_TOKEN_SECRET' in globals() and ACCESS_TOKEN_KEY and ACCESS_TOKEN_SECRET:
- return ACCESS_TOKEN_KEY,ACCESS_TOKEN_SECRET
-
-    return twitter.oauth_dance.oauth_dance(APPLICATION_NAME, CONSUMER_KEY, CONSUMER_SECRET, token_file_path)
-
-def parse_date(date_str):
- ts = email.utils.parsedate_tz(date_str)
- return datetime.datetime(*ts[0:7])
-
-
-fields_adapter = {
- 'stream': {
- "tweet": {
- "created_at" : adapt_date,
- "coordinates" : adapt_json,
- "place" : adapt_json,
- "geo" : adapt_json,
-# "original_json" : adapt_json,
- },
- "user": {
- "created_at" : adapt_date,
- },
- },
- 'rest': {
- "tweet" : {
- "place" : adapt_json,
- "geo" : adapt_json,
- "created_at" : adapt_date,
-# "original_json" : adapt_json,
- },
- },
-}
-
-#
-# adapt fields, return a copy of the field_dict with adapted fields
-#
-def adapt_fields(fields_dict, adapter_mapping):
- def adapt_one_field(field, value):
- if field in adapter_mapping and adapter_mapping[field] is not None:
- return adapter_mapping[field](value)
- else:
- return value
- return dict([(str(k),adapt_one_field(k,v)) for k,v in fields_dict.items()])
-
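-# For instance (values illustrative), with fields_adapter["stream"]["user"] this
-# turns {"created_at": "Tue, 23 Nov 2010 17:00:00 +0100"} into a copy whose
-# created_at is a datetime; fields without an adapter pass through unchanged.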
-
-
-class TwitterProcessorException(Exception):
- pass
-
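-# Typical flow, as driven by export_tweet_db.py:
-#   TwitterProcessor(json_dict, json_txt, session).process()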
-class TwitterProcessor(object):
-
- def __init__(self, json_dict, json_txt, session):
-
- if json_dict is None and json_txt is None:
- raise TwitterProcessorException("No json")
-
- if json_dict is None:
- self.json_dict = json.loads(json_txt)
- else:
- self.json_dict = json_dict
-
- if not json_txt:
- self.json_txt = json.dumps(json_dict)
- else:
- self.json_txt = json_txt
-
- if "id" not in self.json_dict:
- raise TwitterProcessorException("No id in json")
-
- self.session = session
-
- def __get_user(self, user_dict):
- logging.debug("Get user : " + repr(user_dict))
-
- user_id = user_dict.get("id",None)
- user_name = user_dict.get("screen_name", user_dict.get("name", None))
-
- if user_id is None and user_name is None:
- return None
-
- if user_id:
- user = self.session.query(User).filter(User.id == user_id).first()
- else:
- user = self.session.query(User).filter(User.screen_name == user_name).first()
-
- if user is not None:
- return user
-
- user_created_at = user_dict.get("created_at", None)
-
- if user_created_at is None:
-            access_token_key, access_token_secret = get_oauth_token()
-            t = twitter.Twitter(auth=twitter.OAuth(access_token_key, access_token_secret, CONSUMER_KEY, CONSUMER_SECRET))
- try:
- if user_id:
- user_dict = t.users.show(user_id=user_id)
- else:
- user_dict = t.users.show(screen_name=user_name)
- except Exception as e:
- logging.info("get_user : TWITTER ERROR : " + repr(e))
- logging.info("get_user : TWITTER ERROR : " + str(e))
-
- user_dict = adapt_fields(user_dict, fields_adapter["stream"]["user"])
- if "id" not in user_dict:
- return None
-
- user = User(**user_dict)
-
- self.session.add(user)
- self.session.flush()
-
- return user
-
- def __process_entity(self, ind, ind_type):
- logging.debug("Process_entity : " + repr(ind) + " : " + repr(ind_type))
-
- entity_dict = {
- "indice_start": ind["indices"][0],
- "indice_end" : ind["indices"][1],
- "tweet_id" : self.tweet.id,
- "tweet" : self.tweet
- }
-
- def process_hashtags():
- text = ind.get("text", ind.get("hashtag", None))
- if text is None:
- return None
- hashtag = self.session.query(Hashtag).filter(Hashtag.text == text).first()
- if not hashtag:
- ind["text"] = text
- hashtag = Hashtag(**ind)
- self.session.add(hashtag)
- self.session.flush()
- entity_dict['hashtag'] = hashtag
- entity_dict['hashtag_id'] = hashtag.id
- entity = EntityHashtag(**entity_dict)
- return entity
-
- def process_user_mentions():
- user_mention = self.__get_user(ind)
- if user_mention is None:
- entity_dict['user'] = None
- entity_dict['user_id'] = None
- else:
- entity_dict['user'] = user_mention
- entity_dict['user_id'] = user_mention.id
- entity = EntityUser(**entity_dict)
- return entity
-
- def process_urls():
- url = self.session.query(Url).filter(Url.url == ind["url"]).first()
- if url is None:
- url = Url(**ind)
- self.session.add(url)
- self.session.flush()
- entity_dict['url'] = url
- entity_dict['url_id'] = url.id
- entity = EntityUrl(**entity_dict)
- return entity
-
- #{'': lambda }
- entity = {
- 'hashtags': process_hashtags,
- 'user_mentions' : process_user_mentions,
- 'urls' : process_urls
- }[ind_type]()
-
- logging.debug("Process_entity entity_dict: " + repr(entity_dict))
- if entity:
- self.session.add(entity)
- self.session.flush()
-
-
- def __process_twitter_stream(self):
-
- tweet_nb = self.session.query(Tweet).filter(Tweet.id == self.json_dict["id"]).count()
- if tweet_nb > 0:
- return
-
- ts_copy = adapt_fields(self.json_dict, fields_adapter["stream"]["tweet"])
-
- # get or create user
- user = self.__get_user(self.json_dict["user"])
- if user is None:
-            logging.warning("USER not found " + repr(self.json_dict["user"]))
- ts_copy["user"] = None
- ts_copy["user_id"] = None
- else:
- ts_copy["user"] = user
- ts_copy["user_id"] = ts_copy["user"].id
- ts_copy["original_json"] = self.json_txt
-
- self.tweet = Tweet(**ts_copy)
- self.session.add(self.tweet)
- self.session.flush()
-
- # get entities
- for ind_type, entity_list in self.json_dict["entities"].items():
- for ind in entity_list:
- self.__process_entity(ind, ind_type)
-
-
- def __process_twitter_rest(self):
- tweet_nb = self.session.query(Tweet).filter(Tweet.id == self.json_dict["id"]).count()
- if tweet_nb > 0:
- return
-
- tweet_fields = {
- 'created_at': self.json_dict["created_at"],
- 'favorited': False,
- 'id': self.json_dict["id"],
- 'id_str': self.json_dict["id_str"],
- #'in_reply_to_screen_name': ts["to_user"],
- 'in_reply_to_user_id': self.json_dict["to_user_id"],
- 'in_reply_to_user_id_str': self.json_dict["to_user_id_str"],
- #'place': ts["place"],
- 'source': self.json_dict["source"],
- 'text': self.json_dict["text"],
- 'truncated': False,
- 'original_json' : self.json_txt,
- }
-
- #user
-
- user_fields = {
- 'id' : self.json_dict['from_user_id'],
- 'id_str' : self.json_dict['from_user_id_str'],
- 'lang' : self.json_dict['iso_language_code'],
- 'profile_image_url' : self.json_dict["profile_image_url"],
- 'screen_name' : self.json_dict["from_user"],
- }
-
- user = self.__get_user(user_fields)
- if user is None:
-            logging.warning("USER not found " + repr(user_fields))
- tweet_fields["user"] = None
- tweet_fields["user_id"] = None
- else:
- tweet_fields["user"] = user
- tweet_fields["user_id"] = user.id
-
- tweet_fields = adapt_fields(tweet_fields, fields_adapter["rest"]["tweet"])
- self.tweet = Tweet(**tweet_fields)
-        self.session.add(self.tweet)
-
- text = self.tweet.text
-
- extractor = twitter_text.Extractor(text)
-
- for ind in extractor.extract_hashtags_with_indices():
- self.__process_entity(ind, "hashtags")
-
- for ind in extractor.extract_mentioned_screen_names_with_indices():
- self.__process_entity(ind, "user_mentions")
-
- for ind in extractor.extract_urls_with_indices():
- self.__process_entity(ind, "urls")
-
- self.session.flush()
-
-
- def process(self):
- if "metadata" in self.json_dict:
- self.__process_twitter_rest()
- else:
- self.__process_twitter_stream()
-
-
-def set_logging(options):
-
- logging_config = {}
-
- if options.logfile == "stdout":
- logging_config["stream"] = sys.stdout
- elif options.logfile == "stderr":
- logging_config["stream"] = sys.stderr
- else:
- logging_config["filename"] = options.logfile
-
- logging_config["level"] = max(logging.NOTSET, min(logging.CRITICAL, logging.WARNING - 10 * options.verbose + 10 * options.quiet))
- logging.basicConfig(**logging_config)
-
- options.debug = (options.verbose-options.quiet > 0)
-
-def set_logging_options(parser):
- parser.add_option("-l", "--log", dest="logfile",
- help="log to file", metavar="LOG", default="stderr")
- parser.add_option("-v", dest="verbose", action="count",
- help="verbose", metavar="VERBOSE", default=0)
- parser.add_option("-q", dest="quiet", action="count",
- help="quiet", metavar="QUIET", default=0)
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/script/lib/iri_tweet/create_twitter_export_conf.py Tue Jan 18 18:25:18 2011 +0100
@@ -0,0 +1,47 @@
+from lxml import etree
+from optparse import OptionParser
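+
+# Builds the twitter_export_conf.xml file list consumed by export_twitter_alchemy.py.
+# Each input path should point to an .iri content file; the video duration is
+# read from its /iri/body/medias/media[@id='video']/video node.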
+
+def get_options():
+
+ parser = OptionParser()
+
+ parser.add_option("-f", "--file", dest="outputfile",
+ help="destination filename", metavar="FILE", default="twitter_export_conf.xml")
+ parser.add_option("-i", "--input", dest="inputfile",
+ help="inputfile", metavar="INPUT", default=None)
+
+ return parser.parse_args()
+
+if __name__ == "__main__":
+ (options, args) = get_options()
+
+ dest_filename = options.outputfile
+
+ path_list = []
+ if options.inputfile is None:
+ path_list = args
+ else:
+        with open(options.inputfile, 'r') as fi:
+            path_list = [line.strip() for line in fi]
+
+
+ root = etree.Element("twitter_export")
+
+
+ for path in path_list:
+
+ iri_doc = etree.parse(path)
+ media_nodes = iri_doc.xpath("/iri/body/medias/media[@id='video']/video")
+ duration = int(media_nodes[0].get("dur"))/1000
+
+ file_elem = etree.SubElement(root, "file")
+ etree.SubElement(file_elem, "path").text = path
+ etree.SubElement(file_elem, "start_date")
+ etree.SubElement(file_elem, "duration").text = unicode(duration)
+
+ tree = etree.ElementTree(root)
+ tree.write(dest_filename, encoding="utf-8", pretty_print=True, xml_declaration=True)
\ No newline at end of file
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/script/lib/iri_tweet/export_tweet_db.py Tue Jan 18 18:25:18 2011 +0100
@@ -0,0 +1,52 @@
+from models import *
+from utils import *
+from optparse import OptionParser
+from sqlalchemy.orm import sessionmaker
+import logging
+import sqlite3
+import sys
+
+
+# 'entities': "tweet_entity",
+# 'user': "tweet_user"
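+#
+# Usage sketch (arguments inferred from the code below):
+#   python export_tweet_db.py <source.db> <dest.db>
+# Reads the raw tweet json from the source database's tweet_tweet table and
+# replays it through TwitterProcessor into the destination database.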
+
+def get_option():
+
+ parser = OptionParser()
+
+ set_logging_options(parser)
+
+ return parser.parse_args()
+
+if __name__ == "__main__":
+
+ (options, args) = get_option()
+
+ set_logging(options)
+
+ with sqlite3.connect(args[0]) as conn_in:
+ engine, metadata = setup_database('sqlite:///'+args[1], echo=((options.verbose-options.quiet)>0))
+ Session = sessionmaker(bind=engine)
+ session = Session()
+ try:
+ curs_in = conn_in.cursor()
+ fields_mapping = {}
+ for i,res in enumerate(curs_in.execute("select json from tweet_tweet;")):
+ logging.debug("main loop %d : %s" % (i, res[0]))
+ processor = TwitterProcessor(eval(res[0]), res[0], session)
+ processor.process()
+ session.commit()
+ logging.debug("main : %d tweet processed" % (i+1))
+        except Exception:
+            session.rollback()
+            raise
+ finally:
+ session.close()
+
+
+
\ No newline at end of file
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/script/lib/iri_tweet/export_twitter_alchemy.py Tue Jan 18 18:25:18 2011 +0100
@@ -0,0 +1,220 @@
+#!/usr/bin/env python
+# coding=utf-8
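+#
+# Exports tweets matching a hashtag into an iri/.ldt xml "cutting".
+# Illustrative invocation (option values are examples only):
+#   python export_twitter_alchemy.py -d tweets.db -H enmi -L twitter_export_conf.xml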
+
+from lxml import etree
+from models import *
+from optparse import OptionParser
+from sqlalchemy import Table, Column, Integer, BigInteger, String, MetaData, \
+ ForeignKey
+from sqlalchemy.orm import sessionmaker, mapper
+from sqlalchemy.sql import select
+import datetime
+import email.utils
+import logging
+import os
+import os.path
+import re
+import sys
+import time
+import uuid
+
+#class TweetExclude(object):
+# def __init__(self, id):
+# self.id = id
+#
+# def __repr__(self):
+# return "<TweetExclude(id=%d)>" % (self.id)
+
+def parse_date(date_str):
+ ts = email.utils.parsedate_tz(date_str)
+ return datetime.datetime(*ts[0:7])
+
+def get_options():
+ parser = OptionParser()
+ parser.add_option("-f", "--file", dest="filename",
+ help="write export to file", metavar="FILE", default="project_enmi.ldt")
+ parser.add_option("-d", "--database", dest="database",
+ help="Input database", metavar="DATABASE")
+ parser.add_option("-s", "--start-date", dest="start_date",
+ help="start date", metavar="START_DATE")
+ parser.add_option("-e", "--end-date", dest="end_date",
+ help="end date", metavar="END_DATE")
+ parser.add_option("-I", "--content-file", dest="content_file",
+ help="Content file", metavar="CONTENT_FILE")
+ parser.add_option("-c", "--content", dest="content",
+ help="Content url", metavar="CONTENT")
+ parser.add_option("-V", "--video-url", dest="video",
+ help="video url", metavar="VIDEO")
+ parser.add_option("-i", "--content-id", dest="content_id",
+ help="Content id", metavar="CONTENT_ID")
+ parser.add_option("-x", "--exclude", dest="exclude",
+ help="file containing the id to exclude", metavar="EXCLUDE")
+ parser.add_option("-C", "--color", dest="color",
+ help="Color code", metavar="COLOR", default="16763904")
+ parser.add_option("-H", "--hashtag", dest="hashtag",
+ help="Hashtag", metavar="HASHTAG", default="enmi")
+ parser.add_option("-D", "--duration", dest="duration", type="int",
+ help="Duration", metavar="DURATION", default=None)
+ parser.add_option("-n", "--name", dest="name",
+ help="Cutting name", metavar="NAME", default=u"Tweets")
+ parser.add_option("-R", "--replace", dest="replace", action="store_true",
+ help="Replace tweet ensemble", metavar="REPLACE", default=False)
+ parser.add_option("-l", "--log", dest="logfile",
+ help="log to file", metavar="LOG", default="stderr")
+
+ set_logging_options(parser)
+
+
+ return parser.parse_args()
+
+
+if __name__ == "__main__" :
+
+ (options, args) = get_options()
+
+ set_logging(options)
+
+ logging.debug("OPTIONS : " + repr(options))
+
+ engine, metadata = setup_database('sqlite:///'+options.database, echo=((options.verbose-options.quiet)>0), create_all = False)
+
+ Session = sessionmaker()
+ conn = engine.connect()
+ try :
+ session = Session(bind=conn)
+ try :
+
+ metadata = MetaData(bind=conn)
+ tweet_exclude_table = Table("tweet_exclude", metadata, Column('id', BigInteger, primary_key=True), prefixes=['TEMPORARY'])
+ #mapper(TweetExclude, tweet_exclude_table)
+ metadata.create_all()
+
+ if options.exclude and os.path.exists(options.exclude):
+ with open(options.exclude, 'r+') as f:
+ tei = tweet_exclude_table.insert()
+ for line in f:
+ conn.execute(tei.values(id=long(line.strip())))
+
+ if options.listconf:
+
+ parameters = []
+ confdoc = etree.parse(options.listconf)
+ for node in confdoc.xpath("/twitter_export/file"):
+ params = {}
+ for snode in node:
+ if snode.tag == "path":
+ params['content_file'] = snode.text
+ elif snode.tag == "start_date":
+ params['start_date'] = snode.text
+ elif snode.tag == "end_date":
+ params['end_date'] = snode.text
+ elif snode.tag == "duration":
+ params['duration'] = int(snode.text)
+ parameters.append(params)
+ else:
+ parameters = [{
+ 'start_date': options.start_date,
+ 'end_date' : options.end_date,
+ 'duration' : options.duration,
+                    'content_file' : options.content_file
+
+ }]
+
+ for params in parameters:
+
+ logging.debug("PARAMETERS " + repr(params))
+
+ start_date_str = params.get("start_date",None)
+ end_date_str = params.get("end_date", None)
+ duration = params.get("duration", None)
+ content_file = params.get("content_file", None)
+
+
+ start_date = parse_date(start_date_str)
+ ts = time.mktime(start_date.timetuple())
+
+ if end_date_str:
+ end_date = parse_date(end_date_str)
+ te = time.mktime(end_date.timetuple())
+ else:
+ te = ts + duration
+ end_date = start_date + datetime.timedelta(seconds=duration)
+
+
+ query_res = session.query(Tweet).join(EntityHashtag).join(Hashtag).filter(~Tweet.id.in_(select([tweet_exclude_table.c.id]))).filter(Hashtag.text.contains(options.hashtag)).filter(Tweet.created_at >= start_date).filter(Tweet.created_at <= end_date).all()
+
+ #hashtag = u"%#"+unicode(options.hashtag)+u"%"
+
+ #cursor.execute("select tt.id, tt.text, tt.created_at_ts, tu.name, tu.screen_name from tweet_tweet as tt join tweet_user as tu on tt.user = tu.rowid where text like ? and tt.created_at_ts >= ? and tt.created_at_ts <= ? and tt.id not in (select id from tweet_exclude) order by tt.created_at_ts asc;", (hashtag,ts,te));
+
+ root = None
+ ensemble_parent = None
+
+ if content_file and os.path.exists(content_file):
+
+ doc = etree.parse(content_file)
+ root = doc.getroot()
+
+ ensemble_parent = root.xpath("//ensembles")[0]
+
+ else:
+ root = etree.Element(u"iri")
+
+ project = etree.SubElement(root, u"project", {u"abstract":u"Twitter comments on ENMI",u"title":u"Twitter comments on ENMI 2009", u"user":u"IRI Web", u"id":unicode(uuid.uuid4())})
+
+ medias = etree.SubElement(root, u"medias")
+ media = etree.SubElement(medias, u"media", {u"pict":u"", u"src":unicode(options.content), u"video":unicode(options.video), u"id":unicode(options.content_id), u"extra":u""})
+
+ annotations = etree.SubElement(root, u"annotations")
+ content = etree.SubElement(annotations, u"content", {u"id":unicode(options.content_id)})
+ ensemble_parent = content
+
+ if options.replace:
+ for ens in ensemble_parent.iterchildren(tag=u"ensemble"):
+ if ens.get("id","").startswith("tweet_"):
+ ensemble_parent.remove(ens)
+
+ ensemble = etree.SubElement(ensemble_parent, u"ensemble", {u"id":u"tweet_" + unicode(uuid.uuid4()), u"title":u"Ensemble Twitter", u"author":u"IRI Web", u"abstract":u"Ensemble Twitter pour ENMI 2009"})
+ decoupage = etree.SubElement(ensemble, u"decoupage", {u"id": unicode(uuid.uuid4()), u"author": u"IRI Web"})
+
+ etree.SubElement(decoupage, u"title").text = unicode(options.name)
+ etree.SubElement(decoupage, u"abstract").text = unicode(options.name)
+
+ elements = etree.SubElement(decoupage, u"elements")
+
+ for tw in query_res:
+ tweet_ts_dt = tw.created_at
+ tweet_ts = int(time.mktime(tweet_ts_dt.timetuple()))
+ tweet_ts_rel = (tweet_ts-ts) * 1000
+ username = None
+ if tw.user is not None:
+ username = tw.user.name
+ if not username:
+ username = "anon."
+ element = etree.SubElement(elements, u"element" , {u"id":unicode(uuid.uuid4())+u"-"+unicode(tw.id), u"color":unicode(options.color), u"author":unicode(username), u"date":unicode(tweet_ts_dt.strftime("%Y/%m/%d")), u"begin": unicode(tweet_ts_rel), u"dur":u"0", u"src":u""})
+ etree.SubElement(element, u"title").text = unicode(username) + u": " + unicode(tw.text)
+ etree.SubElement(element, u"abstract").text = unicode(tw.text)
+
+ tags_node = etree.SubElement(element, u"tags")
+
+ for entity in tw.entity_list:
+ if entity.type == u'entity_hashtag':
+ etree.SubElement(tags_node,u"tag").text = entity.hashtag.text
+
+ if content_file and os.path.exists(content_file):
+ output = open(content_file, "w")
+ else:
+ output = open(options.filename, "w")
+
+ output.write(etree.tostring(root, encoding="utf-8", method="xml", pretty_print=True, xml_declaration=True))
+ output.flush()
+ output.close()
+
+ finally:
+ session.close()
+ finally:
+ conn.close()
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/script/lib/iri_tweet/models.py Tue Jan 18 18:25:18 2011 +0100
@@ -0,0 +1,317 @@
+from sqlalchemy import Boolean, Table, Column, BigInteger, Integer, String, \
+ MetaData, ForeignKey, DateTime, create_engine
+from sqlalchemy.ext.declarative import declarative_base
+from sqlalchemy.orm import relationship, backref
+import datetime
+import email.utils
+import simplejson
+
+
+Base = declarative_base()
+
+APPLICATION_NAME = "IRI_TWITTER"
+CONSUMER_KEY = "54ThDZhpEjokcMgHJOMnQA"
+CONSUMER_SECRET = "wUoL9UL2T87tfc97R0Dff2EaqRzpJ5XGdmaN2XK3udA"
+#ACCESS_TOKEN_KEY= "47312923-LiNTtz0I18YXMVIrFeTuhmH7bOvYsK6p3Ln2Dc"
+#ACCESS_TOKEN_SECRET = "r3LoXVcjImNAElUpWqTu2SG2xCdWFHkva7xeQoncA"
+
+def adapt_date(date_str):
+ ts = email.utils.parsedate_tz(date_str)
+ return datetime.datetime(*ts[0:7])
+
+def adapt_json(obj):
+ if obj is None:
+ return None
+ else:
+ return simplejson.dumps(obj)
+
+class Entity(Base):
+ __tablename__ = "tweet_entity"
+ id = Column(Integer, primary_key = True)
+ tweet_id = Column(BigInteger, ForeignKey('tweet_tweet.id'))
+ #tweet = relationship(Tweet, primaryjoin = tweet_id == Tweet.id)
+ type = Column(String)
+ indice_start = Column(Integer)
+ indice_end = Column(Integer)
+ __mapper_args__ = {'polymorphic_on': type}
+
+ def __init__(self, **kwargs):
+ for key, value in kwargs.items():
+ if hasattr(self,key):
+ setattr(self,key,value)
+
+
+class Tweet(Base):
+ __tablename__ = 'tweet_tweet'
+
+ id = Column(BigInteger, primary_key=True, autoincrement=False)
+ id_str = Column(String)
+ contributors = Column(String)
+ coordinates = Column(String)
+ created_at = Column(DateTime)
+ favorited = Column(Boolean)
+ geo = Column(String)
+ in_reply_to_screen_name = Column(String)
+ in_reply_to_status_id = Column(BigInteger)
+ in_reply_to_status_id_str = Column(String)
+ in_reply_to_user_id = Column(Integer)
+ in_reply_to_user_id_str = Column(String)
+ place = Column(String)
+ retweet_count = Column(Integer)
+ retweeted = Column(Boolean)
+ source = Column(String)
+ text = Column(String)
+ truncated = Column(Boolean)
+ user_id = Column(Integer, ForeignKey('tweet_user.id'))
+ original_json = Column(String)
+ entity_list = relationship(Entity, backref='tweet')
+
+ #user = relationship(User, primaryjoin=user_id == User.id)
+
+ def __init__(self, **kwargs):
+ for key, value in kwargs.items():
+ if hasattr(self,key):
+ setattr(self,key,value)
+
+
+class User(Base):
+ __tablename__ = "tweet_user"
+
+    id = Column(Integer, primary_key=True, autoincrement=False)
+    id_str = Column(String)
+    contributors_enabled = Column(Boolean)
+    created_at = Column(DateTime)
+    description = Column(String)
+    favourites_count = Column(Integer)
+    follow_request_sent = Column(Boolean)
+    followers_count = Column(Integer)
+    following = Column(String)
+    friends_count = Column(Integer)
+    geo_enabled = Column(Boolean)
+    is_translator = Column(Boolean)
+    lang = Column(String)
+    listed_count = Column(Integer)
+    location = Column(String)
+    name = Column(String)
+    notifications = Column(String)
+    profile_background_color = Column(String)
+    profile_background_image_url = Column(String)
+    profile_background_tile = Column(Boolean)
+    profile_image_url = Column(String)
+    profile_link_color = Column(String)
+    profile_sidebar_border_color = Column(String)
+    profile_sidebar_fill_color = Column(String)
+    profile_text_color = Column(String)
+    profile_use_background_image = Column(Boolean)
+    protected = Column(Boolean)
+    screen_name = Column(String)
+    show_all_inline_media = Column(Boolean)
+    statuses_count = Column(Integer)
+    time_zone = Column(String)
+    url = Column(String)
+    utc_offset = Column(Integer)
+    verified = Column(Boolean)
+    tweets = relationship(Tweet, backref='user')
+
+ def __init__(self, **kwargs):
+ for key, value in kwargs.items():
+ if hasattr(self,key):
+ setattr(self,key,value)
+
+
+
+class Hashtag(Base):
+ __tablename__ = "tweet_hashtag"
+ id = Column(Integer, primary_key=True)
+ text = Column(String, unique = True)
+ def __init__(self, **kwargs):
+ for key, value in kwargs.items():
+ if hasattr(self,key):
+ setattr(self,key,value)
+
+
+
+class Url(Base):
+ __tablename__ = "tweet_url"
+ id = Column(Integer, primary_key=True)
+ url = Column(String, unique=True)
+ expanded_url = Column(String)
+ def __init__(self, **kwargs):
+ for key, value in kwargs.items():
+ if hasattr(self,key):
+ setattr(self,key,value)
+
+
+
+class EntityHashtag(Entity):
+ __tablename__ = "tweet_entity_hashtag"
+ __mapper_args__ = {'polymorphic_identity': 'entity_hashtag'}
+ id = Column(Integer, ForeignKey('tweet_entity.id'), primary_key=True)
+ hashtag_id = Column(Integer, ForeignKey("tweet_hashtag.id"))
+ hashtag = relationship(Hashtag, primaryjoin=hashtag_id == Hashtag.id)
+ def __init__(self, **kwargs):
+ super(EntityHashtag, self).__init__(**kwargs)
+ for key, value in kwargs.items():
+ if hasattr(self,key):
+ setattr(self,key,value)
+
+
+class EntityUrl(Entity):
+ __tablename__ = "tweet_entity_url"
+ __mapper_args__ = {'polymorphic_identity': 'entity_url'}
+ id = Column(Integer, ForeignKey('tweet_entity.id'), primary_key=True)
+ url_id = Column(Integer, ForeignKey("tweet_url.id"))
+ url = relationship(Url, primaryjoin=url_id == Url.id)
+ def __init__(self, **kwargs):
+ super(EntityUrl, self).__init__(**kwargs)
+ for key, value in kwargs.items():
+ if hasattr(self,key):
+ setattr(self,key,value)
+
+class EntityUser(Entity):
+ __tablename__ = "tweet_entity_user"
+ __mapper_args__ = {'polymorphic_identity': 'entity_user'}
+ id = Column(Integer, ForeignKey('tweet_entity.id'), primary_key=True)
+ user_id = Column(Integer, ForeignKey('tweet_user.id'))
+ user = relationship(User, primaryjoin=user_id == User.id)
+
+ def __init__(self, **kwargs):
+ super(EntityUser, self).__init__(**kwargs)
+ for key, value in kwargs.items():
+ if hasattr(self,key):
+ setattr(self,key,value)
+
+
+def setup_database(*args, **kwargs):
+
+ create_all = True
+ if "create_all" in kwargs:
+ create_all = kwargs["create_all"]
+ del(kwargs["create_all"])
+
+ engine = create_engine(*args, **kwargs)
+ metadata = Base.metadata
+
+ if create_all:
+ metadata.create_all(engine)
+
+ return (engine, metadata)
+
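+# Typical use, as in the export scripts:
+#   engine, metadata = setup_database('sqlite:///tweets.db', echo=False)
+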
+rest_tweet_tweet = {
+ u'iso_language_code': 'unicode',
+ u'text': 'unicode',
+ u'from_user_id_str': 'unicode',
+ u'profile_image_url': 'unicode',
+ u'to_user_id_str': 'NoneType',
+ u'created_at': 'unicode',
+ u'source': 'unicode',
+ u'to_user': 'unicode',
+ u'id_str': 'unicode',
+ u'from_user': 'unicode',
+ u'place': {u'type': 'unicode', u'id': 'unicode', u'full_name': 'unicode'},
+ u'from_user_id': 'int',
+ u'to_user_id': 'NoneType',
+ u'geo': 'NoneType',
+ u'id': 'int',
+ u'metadata': {u'result_type': 'unicode'}
+}
+
+tweet_tweet = {
+ 'contributors': None,
+ 'coordinates': None,
+ 'created_at': 'date',
+ 'entities': "tweet_entity",
+ 'favorited': "bool",
+ 'geo': None,
+ 'id': "long",
+ 'id_str': "string",
+ 'in_reply_to_screen_name': "string",
+ 'in_reply_to_status_id': "long",
+ 'in_reply_to_status_id_str': "string",
+ 'in_reply_to_user_id': "int",
+ 'in_reply_to_user_id_str': "string",
+ 'place': "string",
+ 'retweet_count': "int",
+ 'retweeted': "bool",
+ 'source': "string",
+ 'text': "string",
+ 'truncated': "bool",
+ 'user': "tweet_user"
+}
+tweet_user = {
+ 'contributors_enabled': 'bool',
+ 'created_at': 'str',
+ 'description': 'str',
+ 'favourites_count': 'int',
+ 'follow_request_sent': None,
+ 'followers_count': 'int',
+ 'following': None,
+ 'friends_count': 'int',
+ 'geo_enabled': 'bool',
+ 'id': 'int',
+ 'id_str': 'str',
+ 'is_translator': 'bool',
+ 'lang': 'str',
+ 'listed_count': 'int',
+ 'location': 'str',
+ 'name': 'str',
+ 'notifications': 'NoneType',
+ 'profile_background_color': 'str',
+ 'profile_background_image_url': 'str',
+ 'profile_background_tile': 'bool',
+ 'profile_image_url': 'str',
+ 'profile_link_color': 'str',
+ 'profile_sidebar_border_color': 'str',
+ 'profile_sidebar_fill_color': 'str',
+ 'profile_text_color': 'str',
+ 'profile_use_background_image': 'bool',
+ 'protected': 'bool',
+ 'screen_name': 'str',
+ 'show_all_inline_media': 'bool',
+ 'statuses_count': 'int',
+ 'time_zone': 'str',
+ 'url': 'str',
+ 'utc_offset': 'int',
+ 'verified': 'bool',
+}
+
+
+tweet_entity_hashtag = {
+ 'hashtag' : 'tweet_hashtag',
+ 'indice_start' : 'int',
+ 'indice_end' : 'int',
+ 'tweet':'tweet_tweet'
+}
+
+tweet_entity_url = {
+ 'url' : 'tweet_url',
+ 'indice_start' : 'int',
+ 'indice_end' : 'int',
+ 'tweet':'tweet_tweet'
+}
+
+tweet_entity_user = {
+ 'user' : 'tweet_user',
+ 'indice_start' : 'int',
+ 'indice_end' : 'int',
+ 'tweet':'tweet_tweet'
+}
+
+#id int
+#id_str str
+#indices list
+#name str
+#screen_name str
+
+tweet_hashtag = {
+ "text": "string"
+}
+
+tweet_url = {
+ "url": "string",
+ "expanded_url" : "string",
+}
+
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/script/lib/iri_tweet/utils.py Tue Jan 18 18:25:18 2011 +0100
@@ -0,0 +1,316 @@
+from models import *
+import datetime
+import email.utils
+import json
+import logging
+import sys
+import twitter
+import twitter_text
+import os.path
+import twitter.oauth
+import twitter.oauth_dance
+
+
+def get_oauth_token(token_file_path=None):
+
+    if token_file_path and os.path.exists(token_file_path):
+ logging.debug("reading token from file %s" % token_file_path)
+ return twitter.oauth.read_token_file(token_file_path)
+ #read access token info from path
+
+    if 'ACCESS_TOKEN_KEY' in globals() and 'ACCESS_TOKEN_SECRET' in globals() and ACCESS_TOKEN_KEY and ACCESS_TOKEN_SECRET:
+ return ACCESS_TOKEN_KEY,ACCESS_TOKEN_SECRET
+
+    return twitter.oauth_dance.oauth_dance(APPLICATION_NAME, CONSUMER_KEY, CONSUMER_SECRET, token_file_path)
+
+def parse_date(date_str):
+ ts = email.utils.parsedate_tz(date_str)
+ return datetime.datetime(*ts[0:7])
+
+
+fields_adapter = {
+ 'stream': {
+ "tweet": {
+ "created_at" : adapt_date,
+ "coordinates" : adapt_json,
+ "place" : adapt_json,
+ "geo" : adapt_json,
+# "original_json" : adapt_json,
+ },
+ "user": {
+ "created_at" : adapt_date,
+ },
+ },
+ 'rest': {
+ "tweet" : {
+ "place" : adapt_json,
+ "geo" : adapt_json,
+ "created_at" : adapt_date,
+# "original_json" : adapt_json,
+ },
+ },
+}
+
+#
+# adapt fields, return a copy of the field_dict with adapted fields
+#
+def adapt_fields(fields_dict, adapter_mapping):
+ def adapt_one_field(field, value):
+ if field in adapter_mapping and adapter_mapping[field] is not None:
+ return adapter_mapping[field](value)
+ else:
+ return value
+ return dict([(str(k),adapt_one_field(k,v)) for k,v in fields_dict.items()])
+
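+# For instance (values illustrative), with fields_adapter["stream"]["user"] this
+# turns {"created_at": "Tue, 23 Nov 2010 17:00:00 +0100"} into a copy whose
+# created_at is a datetime; fields without an adapter pass through unchanged.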
+
+
+class TwitterProcessorException(Exception):
+ pass
+
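+# Typical flow, as driven by export_tweet_db.py:
+#   TwitterProcessor(json_dict, json_txt, session).process()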
+class TwitterProcessor(object):
+
+ def __init__(self, json_dict, json_txt, session):
+
+ if json_dict is None and json_txt is None:
+ raise TwitterProcessorException("No json")
+
+ if json_dict is None:
+ self.json_dict = json.loads(json_txt)
+ else:
+ self.json_dict = json_dict
+
+ if not json_txt:
+ self.json_txt = json.dumps(json_dict)
+ else:
+ self.json_txt = json_txt
+
+ if "id" not in self.json_dict:
+ raise TwitterProcessorException("No id in json")
+
+ self.session = session
+
+ def __get_user(self, user_dict):
+ logging.debug("Get user : " + repr(user_dict))
+
+ user_id = user_dict.get("id",None)
+ user_name = user_dict.get("screen_name", user_dict.get("name", None))
+
+ if user_id is None and user_name is None:
+ return None
+
+ if user_id:
+ user = self.session.query(User).filter(User.id == user_id).first()
+ else:
+ user = self.session.query(User).filter(User.screen_name == user_name).first()
+
+ if user is not None:
+ return user
+
+ user_created_at = user_dict.get("created_at", None)
+
+ if user_created_at is None:
+            access_token_key, access_token_secret = get_oauth_token()
+            t = twitter.Twitter(auth=twitter.OAuth(access_token_key, access_token_secret, CONSUMER_KEY, CONSUMER_SECRET))
+ try:
+ if user_id:
+ user_dict = t.users.show(user_id=user_id)
+ else:
+ user_dict = t.users.show(screen_name=user_name)
+ except Exception as e:
+ logging.info("get_user : TWITTER ERROR : " + repr(e))
+ logging.info("get_user : TWITTER ERROR : " + str(e))
+
+ user_dict = adapt_fields(user_dict, fields_adapter["stream"]["user"])
+ if "id" not in user_dict:
+ return None
+
+ user = User(**user_dict)
+
+ self.session.add(user)
+ self.session.flush()
+
+ return user
+
+ def __process_entity(self, ind, ind_type):
+ logging.debug("Process_entity : " + repr(ind) + " : " + repr(ind_type))
+
+ entity_dict = {
+ "indice_start": ind["indices"][0],
+ "indice_end" : ind["indices"][1],
+ "tweet_id" : self.tweet.id,
+ "tweet" : self.tweet
+ }
+
+ def process_hashtags():
+ text = ind.get("text", ind.get("hashtag", None))
+ if text is None:
+ return None
+ hashtag = self.session.query(Hashtag).filter(Hashtag.text == text).first()
+ if not hashtag:
+ ind["text"] = text
+ hashtag = Hashtag(**ind)
+ self.session.add(hashtag)
+ self.session.flush()
+ entity_dict['hashtag'] = hashtag
+ entity_dict['hashtag_id'] = hashtag.id
+ entity = EntityHashtag(**entity_dict)
+ return entity
+
+ def process_user_mentions():
+ user_mention = self.__get_user(ind)
+ if user_mention is None:
+ entity_dict['user'] = None
+ entity_dict['user_id'] = None
+ else:
+ entity_dict['user'] = user_mention
+ entity_dict['user_id'] = user_mention.id
+ entity = EntityUser(**entity_dict)
+ return entity
+
+ def process_urls():
+ url = self.session.query(Url).filter(Url.url == ind["url"]).first()
+ if url is None:
+ url = Url(**ind)
+ self.session.add(url)
+ self.session.flush()
+ entity_dict['url'] = url
+ entity_dict['url_id'] = url.id
+ entity = EntityUrl(**entity_dict)
+ return entity
+
+ #{'': lambda }
+ entity = {
+ 'hashtags': process_hashtags,
+ 'user_mentions' : process_user_mentions,
+ 'urls' : process_urls
+ }[ind_type]()
+
+ logging.debug("Process_entity entity_dict: " + repr(entity_dict))
+ if entity:
+ self.session.add(entity)
+ self.session.flush()
+
+
+ def __process_twitter_stream(self):
+
+ tweet_nb = self.session.query(Tweet).filter(Tweet.id == self.json_dict["id"]).count()
+ if tweet_nb > 0:
+ return
+
+ ts_copy = adapt_fields(self.json_dict, fields_adapter["stream"]["tweet"])
+
+ # get or create user
+ user = self.__get_user(self.json_dict["user"])
+ if user is None:
+            logging.warning("USER not found " + repr(self.json_dict["user"]))
+ ts_copy["user"] = None
+ ts_copy["user_id"] = None
+ else:
+ ts_copy["user"] = user
+ ts_copy["user_id"] = ts_copy["user"].id
+ ts_copy["original_json"] = self.json_txt
+
+ self.tweet = Tweet(**ts_copy)
+ self.session.add(self.tweet)
+ self.session.flush()
+
+ # get entities
+ for ind_type, entity_list in self.json_dict["entities"].items():
+ for ind in entity_list:
+ self.__process_entity(ind, ind_type)
+
+
+ def __process_twitter_rest(self):
+ tweet_nb = self.session.query(Tweet).filter(Tweet.id == self.json_dict["id"]).count()
+ if tweet_nb > 0:
+ return
+
+ tweet_fields = {
+ 'created_at': self.json_dict["created_at"],
+ 'favorited': False,
+ 'id': self.json_dict["id"],
+ 'id_str': self.json_dict["id_str"],
+ #'in_reply_to_screen_name': ts["to_user"],
+ 'in_reply_to_user_id': self.json_dict["to_user_id"],
+ 'in_reply_to_user_id_str': self.json_dict["to_user_id_str"],
+ #'place': ts["place"],
+ 'source': self.json_dict["source"],
+ 'text': self.json_dict["text"],
+ 'truncated': False,
+ 'original_json' : self.json_txt,
+ }
+
+ #user
+
+ user_fields = {
+ 'id' : self.json_dict['from_user_id'],
+ 'id_str' : self.json_dict['from_user_id_str'],
+ 'lang' : self.json_dict['iso_language_code'],
+ 'profile_image_url' : self.json_dict["profile_image_url"],
+ 'screen_name' : self.json_dict["from_user"],
+ }
+
+ user = self.__get_user(user_fields)
+ if user is None:
+            logging.warning("USER not found " + repr(user_fields))
+ tweet_fields["user"] = None
+ tweet_fields["user_id"] = None
+ else:
+ tweet_fields["user"] = user
+ tweet_fields["user_id"] = user.id
+
+ tweet_fields = adapt_fields(tweet_fields, fields_adapter["rest"]["tweet"])
+ self.tweet = Tweet(**tweet_fields)
+        self.session.add(self.tweet)
+
+ text = self.tweet.text
+
+ extractor = twitter_text.Extractor(text)
+
+ for ind in extractor.extract_hashtags_with_indices():
+ self.__process_entity(ind, "hashtags")
+
+ for ind in extractor.extract_mentioned_screen_names_with_indices():
+ self.__process_entity(ind, "user_mentions")
+
+ for ind in extractor.extract_urls_with_indices():
+ self.__process_entity(ind, "urls")
+
+ self.session.flush()
+
+
+ def process(self):
+ if "metadata" in self.json_dict:
+ self.__process_twitter_rest()
+ else:
+ self.__process_twitter_stream()
+
+
+def set_logging(options):
+
+ logging_config = {}
+
+ if options.logfile == "stdout":
+ logging_config["stream"] = sys.stdout
+ elif options.logfile == "stderr":
+ logging_config["stream"] = sys.stderr
+ else:
+ logging_config["filename"] = options.logfile
+
+ logging_config["level"] = max(logging.NOTSET, min(logging.CRITICAL, logging.WARNING - 10 * options.verbose + 10 * options.quiet))
+ logging.basicConfig(**logging_config)
+
+ options.debug = (options.verbose-options.quiet > 0)
+
+def set_logging_options(parser):
+ parser.add_option("-l", "--log", dest="logfile",
+ help="log to file", metavar="LOG", default="stderr")
+ parser.add_option("-v", dest="verbose", action="count",
+ help="verbose", metavar="VERBOSE", default=0)
+ parser.add_option("-q", dest="quiet", action="count",
+ help="quiet", metavar="QUIET", default=0)
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/script/lib/tweetstream/CHANGELOG Tue Jan 18 18:25:18 2011 +0100
@@ -0,0 +1,44 @@
+0.1
+
+ - Initial version
+
+0.2
+
+ - Improved error handling
+ - Added AuthenticationError and ConnectionError exceptions
+ - Added ReconnectingTweetStream class that supports automatically
+ reconnecting if the connection is dropped
+
+0.3
+
+ - Fixed bugs in authentication
+ - Added TrackStream and FollowStream classes
+ - Added list of endpoint names, and made them legal values for the url arg
+
+0.3.1
+
+ - Added lots of tests
+ - Added proper handling of keepalive newlines
+ - Improved handling of closing streams
+ - Added missing anyjson dependency to setup
+ - Fixed bug where newlines and malformed content were counted as a tweet
+
+0.3.2
+
+ - This release was skipped over due to maintainer brainfart.
+
+0.3.3
+
+ - Fixed setup.py so it won't attempt to load modules that aren't installed
+ yet. Fixes installation issue.
+
+0.3.4
+
+ - Updated to latest twitter streaming urls
+ - Fixed a bug where we tried to call a method on None
+
+0.3.5
+
+ - Removed a spurious print statement left over from debugging
+ - Introduced common base class for all tweetstream exceptions
+ - Make sure we raise a sensible error on 404. Include url in desc of that error
\ No newline at end of file
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/script/lib/tweetstream/LICENSE Tue Jan 18 18:25:18 2011 +0100
@@ -0,0 +1,27 @@
+Copyright (c) 2009, Rune Halvorsen
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+
+ * Redistributions of source code must retain the above copyright notice,
+ this list of conditions and the following disclaimer.
+ * Redistributions in binary form must reproduce the above copyright
+ notice, this list of conditions and the following disclaimer in the
+ documentation and/or other materials provided with the distribution.
+
+Neither the name of Rune Halvorsen nor the names of its contributors may be
+used to endorse or promote products derived from this software without
+specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
+THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS
+BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+POSSIBILITY OF SUCH DAMAGE.
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/script/lib/tweetstream/README Tue Jan 18 18:25:18 2011 +0100
@@ -0,0 +1,105 @@
+.. -*- restructuredtext -*-
+
+##########################################
+tweetstream - Simple twitter streaming API
+##########################################
+
+Introduction
+------------
+
+tweetstream provides a class, TweetStream, that can be used to get
+tweets from Twitter's streaming API. An instance of the class can be used as
+an iterator. In addition to fetching tweets, the object keeps track of
+the number of tweets collected and the rate at which tweets are received.
+
+Subclasses are available for accessing the "track" and "follow" streams
+as well.
+
+There's also a ReconnectingTweetStream class that handles automatic
+reconnecting.
+
+Twitter's documentation about the streaming API can be found here:
+http://apiwiki.twitter.com/Streaming-API-Documentation .
+
+**Note** that the API is blocking. If for some reason data is not immediately
+available, calls will block until enough data is available to yield a tweet.
+
+Examples
+--------
+
+Printing all incoming tweets:
+
+>>> stream = tweetstream.TweetStream("username", "password")
+>>> for tweet in stream:
+... print tweet
+
+
+The stream object can also be used as a context manager, as in this example,
+which prints the author of each tweet as well as the tweet count and rate:
+
+>>> with tweetstream.TweetStream("username", "password") as stream:
+... for tweet in stream:
+... print "Got tweet from %-16s\t( tweet %d, rate %.1f tweets/sec)" % (
+... tweet["user"]["screen_name"], stream.count, stream.rate )
+
+
+Stream objects can raise ConnectionError or AuthenticationError exceptions:
+
+>>> try:
+... with tweetstream.TweetStream("username", "password") as stream:
+... for tweet in stream:
+... print "Got tweet from %-16s\t( tweet %d, rate %.1f tweets/sec)" % (
+... tweet["user"]["screen_name"], stream.count, stream.rate )
+... except tweetstream.ConnectionError, e:
+... print "Disconnected from twitter. Reason:", e.reason
+
+To get tweets that relate to specific terms, use the TrackStream:
+
+>>> words = ["opera", "firefox", "safari"]
+>>> with tweetstream.TrackStream("username", "password", words) as stream:
+... for tweet in stream:
+... print "Got interesting tweet:", tweet
+
+To get only tweets from a set of users, use the FollowStream. The following
+would get tweets from users 1, 42, and 8675309:
+
+>>> users = [1, 42, 8675309]
+>>> with tweetstream.FollowStream("username", "password", users) as stream:
+... for tweet in stream:
+... print "Got tweet from:", tweet["user"]["screen_name"]
+
+
+Simple tweet fetcher that sends tweets to an AMQP message server using carrot:
+
+>>> from carrot.messaging import Publisher
+>>> from carrot.connection import AMQPConnection
+>>> from tweetstream import TweetStream
+>>> amqpconn = AMQPConnection(hostname="localhost", port=5672,
+... userid="test", password="test",
+... vhost="test")
+>>> publisher = Publisher(connection=amqpconn,
+... exchange="tweets", routing_key="stream")
+>>> with TweetStream("username", "password") as stream:
+... for tweet in stream:
+... publisher.send(tweet)
+>>> publisher.close()
+
+
+Changelog
+---------
+
+See the CHANGELOG file
+
+Contact
+-------
+
+The author is Rune Halvorsen <runefh@gmail.com>. The project resides at
+http://bitbucket.org/runeh/tweetstream . If you find bugs, or have feature
+requests, please report them in the project site issue tracker. Patches are
+also very welcome.
+
+License
+-------
+
+This software is licensed under the ``New BSD License``. See the ``LICENSE``
+file in the top distribution directory for the full license text.
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/script/lib/tweetstream/servercontext.py Tue Jan 18 18:25:18 2011 +0100
@@ -0,0 +1,221 @@
+import threading
+import contextlib
+import time
+import os
+import socket
+import random
+from functools import partial
+from inspect import isclass
+from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
+from SimpleHTTPServer import SimpleHTTPRequestHandler
+from SocketServer import BaseRequestHandler
+
+
+class ServerError(Exception):
+ pass
+
+
+class ServerContext(object):
+ """Context object with information about a running test server."""
+
+ def __init__(self, address, port):
+ self.address = address or "localhost"
+ self.port = port
+
+ @property
+ def baseurl(self):
+ return "http://%s:%s" % (self.address, self.port)
+
+ def __str__(self):
+ return "<ServerContext %s >" % self.baseurl
+
+ __repr__ = __str__
+
+
+class _SilentSimpleHTTPRequestHandler(SimpleHTTPRequestHandler):
+
+ def __init__(self, *args, **kwargs):
+ self.logging = kwargs.get("logging", False)
+ SimpleHTTPRequestHandler.__init__(self, *args, **kwargs)
+
+ def log_message(self, *args, **kwargs):
+ if self.logging:
+ SimpleHTTPRequestHandler.log_message(self, *args, **kwargs)
+
+
+class _TestHandler(BaseHTTPRequestHandler):
+ """RequestHandler class that handles requests that use a custom handler
+ callable."""
+
+ def __init__(self, handler, methods, *args, **kwargs):
+ self._handler = handler
+ self._methods = methods
+ self._response_sent = False
+ self._headers_sent = False
+ self.logging = kwargs.get("logging", False)
+ BaseHTTPRequestHandler.__init__(self, *args, **kwargs)
+
+ def log_message(self, *args, **kwargs):
+ if self.logging:
+ BaseHTTPRequestHandler.log_message(self, *args, **kwargs)
+
+ def send_response(self, *args, **kwargs):
+ self._response_sent = True
+ BaseHTTPRequestHandler.send_response(self, *args, **kwargs)
+
+ def end_headers(self, *args, **kwargs):
+ self._headers_sent = True
+ BaseHTTPRequestHandler.end_headers(self, *args, **kwargs)
+
+ def _do_whatever(self):
+ """Called in place of do_METHOD"""
+ data = self._handler(self)
+
+ if hasattr(data, "next"):
+ # assume it's something supporting generator protocol
+ self._handle_with_iterator(data)
+ else:
+ # Nothing more to do then.
+ pass
+
+
+ def __getattr__(self, name):
+ if name.startswith("do_") and name[3:].lower() in self._methods:
+ return self._do_whatever
+ else:
+ # fixme instance or class?
+ raise AttributeError(name)
+
+ def _handle_with_iterator(self, iterator):
+ self.connection.settimeout(0.1)
+ for data in iterator:
+ if not self.server.server_thread.running:
+ return
+
+ if not self._response_sent:
+ self.send_response(200)
+ if not self._headers_sent:
+ self.end_headers()
+
+ self.wfile.write(data)
+            # flush immediately. We may want to do trickling writes
+            # or something else that requires bypassing normal caching
+ self.wfile.flush()
+
+class _TestServerThread(threading.Thread):
+ """Thread class for a running test server"""
+
+ def __init__(self, handler, methods, cwd, port, address):
+ threading.Thread.__init__(self)
+ self.startup_finished = threading.Event()
+ self._methods = methods
+ self._cwd = cwd
+ self._orig_cwd = None
+ self._handler = self._wrap_handler(handler, methods)
+ self._setup()
+ self.running = True
+ self.serverloc = (address, port)
+ self.error = None
+
+ def _wrap_handler(self, handler, methods):
+ if isclass(handler) and issubclass(handler, BaseRequestHandler):
+ return handler # It's OK. user passed in a proper handler
+        elif callable(handler):
+            # it's a plain callable, so wrap it in a request handler
+            return partial(_TestHandler, handler, methods)
+ else:
+ raise ServerError("handler must be callable or RequestHandler")
+
+ def _setup(self):
+ if self._cwd != "./":
+ self._orig_cwd = os.getcwd()
+ os.chdir(self._cwd)
+
+ def _init_server(self):
+ """Hooks up the server socket"""
+ try:
+ if self.serverloc[1] == "random":
+ retries = 10 # try getting an available port max this many times
+ while True:
+ try:
+ self.serverloc = (self.serverloc[0],
+ random.randint(1025, 49151))
+ self._server = HTTPServer(self.serverloc, self._handler)
+ except socket.error:
+ retries -= 1
+ if not retries: # not able to get a port.
+ raise
+ else:
+ break
+ else: # use specific port. this might throw, that's expected
+ self._server = HTTPServer(self.serverloc, self._handler)
+ except socket.error, e:
+ self.running = False
+ self.error = e
+ # set this here, since we'll never enter the serve loop where
+ # it is usually set:
+ self.startup_finished.set()
+ return
+
+ self._server.allow_reuse_address = True # lots of tests, same port
+ self._server.timeout = 0.1
+ self._server.server_thread = self
+
+
+ def run(self):
+ self._init_server()
+
+ while self.running:
+ self._server.handle_request() # blocks for self.timeout secs
+ # First time this falls through, signal the parent thread that
+            # the server is ready for incoming connections
+ if not self.startup_finished.is_set():
+ self.startup_finished.set()
+
+ self._cleanup()
+
+ def stop(self):
+ """Stop the server and attempt to make the thread terminate.
+ This happens async but the calling code can check periodically
+ the isRunning flag on the thread object.
+ """
+ # actual stopping happens in the run method
+ self.running = False
+
+ def _cleanup(self):
+ """Do some rudimentary cleanup."""
+ if self._orig_cwd:
+ os.chdir(self._orig_cwd)
+
+
+@contextlib.contextmanager
+def test_server(handler=_SilentSimpleHTTPRequestHandler, port=8514,
+ address="", methods=("get", "head"), cwd="./"):
+ """Context that makes available a web server in a separate thread"""
+ thread = _TestServerThread(handler=handler, methods=methods, cwd=cwd,
+ port=port, address=address)
+ thread.start()
+
+    # fixme: should this be daemonized? If it isn't it will block the entire
+    # app, but that should never happen anyway.
+ thread.startup_finished.wait()
+
+ if thread.error: # startup failed! Bail, throw whatever the server did
+ raise thread.error
+
+ exc = None
+ try:
+ yield ServerContext(*thread.serverloc)
+ except Exception, exc:
+ pass
+ thread.stop()
+ thread.join(5) # giving it a lot of leeway. should never happen
+
+ if exc:
+ raise exc
+
+    # fixme: this takes second priority after the internal exception but would
+    # still be nice to signal back to calling code.
+
+ if thread.isAlive():
+ raise Warning("Test server could not be stopped")
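+
+# A minimal usage sketch of the test_server context above (hedged: "hello"
+# is a hypothetical handler; any callable yielding strings works, and
+# urllib2 must be imported by the calling test):
+#
+# >>> def hello(request):
+# ...     yield "hello world\n"
+# >>> with test_server(handler=hello, port="random") as server:
+# ...     print urllib2.urlopen(server.baseurl).read()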
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/script/lib/tweetstream/setup.py Tue Jan 18 18:25:18 2011 +0100
@@ -0,0 +1,27 @@
+from setuptools import setup, find_packages
+import sys, os
+
+author = "Rune Halvorsen"
+email = "runefh@gmail.com"
+version = "0.3.5"
+homepage = "http://bitbucket.org/runeh/tweetstream/"
+
+setup(name='tweetstream',
+ version=version,
+ description="Simple Twitter streaming API access",
+ long_description=open("README").read(),
+ classifiers=[
+ 'License :: OSI Approved :: BSD License',
+ 'Intended Audience :: Developers',
+ ],
+ keywords='twitter',
+ author=author,
+ author_email=email,
+ url=homepage,
+ license='BSD',
+ packages=find_packages(exclude=['ez_setup', 'examples', 'tests']),
+ include_package_data=True,
+ zip_safe=False,
+ platforms=["any"],
+ install_requires = ['anyjson'],
+)
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/script/lib/tweetstream/tests/test_tweetstream.py Tue Jan 18 18:25:18 2011 +0100
@@ -0,0 +1,193 @@
+import contextlib
+import threading
+import time
+from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
+
+from nose.tools import assert_raises
+from tweetstream import TweetStream, FollowStream, TrackStream
+from tweetstream import ConnectionError, AuthenticationError
+
+from servercontext import test_server
+
+single_tweet = r"""{"in_reply_to_status_id":null,"in_reply_to_user_id":null,"favorited":false,"created_at":"Tue Jun 16 10:40:14 +0000 2009","in_reply_to_screen_name":null,"text":"record industry just keeps on amazing me: http:\/\/is.gd\/13lFo - $150k per song you've SHARED, not that somebody has actually DOWNLOADED.","user":{"notifications":null,"profile_background_tile":false,"followers_count":206,"time_zone":"Copenhagen","utc_offset":3600,"friends_count":191,"profile_background_color":"ffffff","profile_image_url":"http:\/\/s3.amazonaws.com\/twitter_production\/profile_images\/250715794\/profile_normal.png","description":"Digital product developer, currently at Opera Software. My tweets are my opinions, not those of my employer.","verified_profile":false,"protected":false,"favourites_count":0,"profile_text_color":"3C3940","screen_name":"eiriksnilsen","name":"Eirik Stridsklev N.","following":null,"created_at":"Tue May 06 12:24:12 +0000 2008","profile_background_image_url":"http:\/\/s3.amazonaws.com\/twitter_production\/profile_background_images\/10531192\/160x600opera15.gif","profile_link_color":"0099B9","profile_sidebar_fill_color":"95E8EC","url":"http:\/\/www.stridsklev-nilsen.no\/eirik","id":14672543,"statuses_count":506,"profile_sidebar_border_color":"5ED4DC","location":"Oslo, Norway"},"id":2190767504,"truncated":false,"source":"<a href=\"http:\/\/widgets.opera.com\/widget\/7206\">Twitter Opera widget<\/a>"}"""
+
+
+def test_bad_auth():
+ """Test that the proper exception is raised when the user could not be
+ authenticated"""
+ def auth_denied(request):
+ request.send_error(401)
+
+ with test_server(handler=auth_denied, methods=("post", "get"),
+ port="random") as server:
+ stream = TweetStream("foo", "bar", url=server.baseurl)
+ assert_raises(AuthenticationError, stream.next)
+
+ stream = FollowStream("foo", "bar", [1, 2, 3], url=server.baseurl)
+ assert_raises(AuthenticationError, stream.next)
+
+ stream = TrackStream("foo", "bar", ["opera"], url=server.baseurl)
+ assert_raises(AuthenticationError, stream.next)
+
+
+def test_404_url():
+ """Test that the proper exception is raised when the stream URL can't be
+ found"""
+ def not_found(request):
+ request.send_error(404)
+
+ with test_server(handler=not_found, methods=("post", "get"),
+ port="random") as server:
+ stream = TweetStream("foo", "bar", url=server.baseurl)
+ assert_raises(ConnectionError, stream.next)
+
+ stream = FollowStream("foo", "bar", [1, 2, 3], url=server.baseurl)
+ assert_raises(ConnectionError, stream.next)
+
+ stream = TrackStream("foo", "bar", ["opera"], url=server.baseurl)
+ assert_raises(ConnectionError, stream.next)
+
+
+def test_bad_content():
+ """Test error handling if we are given invalid data"""
+ def bad_content(request):
+ for n in xrange(10):
+            # what json we pass doesn't matter. It's not verifying the
+            # structure, only checking that it's parsable
+ yield "[1,2,3]"
+ yield "[1,2, I need no stinking close brace"
+ yield "[1,2,3]"
+
+ def do_test(klass, *args):
+ with test_server(handler=bad_content, methods=("post", "get"),
+ port="random") as server:
+ stream = klass("foo", "bar", *args, url=server.baseurl)
+ for tweet in stream:
+ pass
+
+ assert_raises(ConnectionError, do_test, TweetStream)
+ assert_raises(ConnectionError, do_test, FollowStream, [1, 2, 3])
+ assert_raises(ConnectionError, do_test, TrackStream, ["opera"])
+
+
+def test_closed_connection():
+ """Test error handling if server unexpectedly closes connection"""
+ cnt = 1000
+ def bad_content(request):
+ for n in xrange(cnt):
+            # what json we pass doesn't matter. It's not verifying the
+            # structure, only checking that it's parsable
+ yield "[1,2,3]"
+
+ def do_test(klass, *args):
+ with test_server(handler=bad_content, methods=("post", "get"),
+ port="random") as server:
+ stream = klass("foo", "bar", *args, url=server.baseurl)
+ for tweet in stream:
+ pass
+
+ assert_raises(ConnectionError, do_test, TweetStream)
+ assert_raises(ConnectionError, do_test, FollowStream, [1, 2, 3])
+ assert_raises(ConnectionError, do_test, TrackStream, ["opera"])
+
+
+def test_bad_host():
+ """Test behaviour if we can't connect to the host"""
+ stream = TweetStream("foo", "bar", url="http://bad.egewdvsdswefdsf.com/")
+ assert_raises(ConnectionError, stream.next)
+
+ stream = FollowStream("foo", "bar", [1, 2, 3], url="http://zegwefdsf.com/")
+ assert_raises(ConnectionError, stream.next)
+
+ stream = TrackStream("foo", "bar", ["foo"], url="http://aswefdsews.com/")
+ assert_raises(ConnectionError, stream.next)
+
+
+def smoke_test_receive_tweets():
+ """Receive 100k tweets and disconnect (slow)"""
+ total = 100000
+
+ def tweetsource(request):
+ while True:
+ yield single_tweet + "\n"
+
+ def do_test(klass, *args):
+ with test_server(handler=tweetsource,
+ methods=("post", "get"), port="random") as server:
+ stream = klass("foo", "bar", *args, url=server.baseurl)
+ for tweet in stream:
+ if stream.count == total:
+ break
+
+ do_test(TweetStream)
+ do_test(FollowStream, [1, 2, 3])
+ do_test(TrackStream, ["foo", "bar"])
+
+
+def test_keepalive():
+ """Make sure we behave sanely when there are keepalive newlines in the
+ data recevived from twitter"""
+ def tweetsource(request):
+ yield single_tweet+"\n"
+ yield "\n"
+ yield "\n"
+ yield single_tweet+"\n"
+ yield "\n"
+ yield "\n"
+ yield "\n"
+ yield "\n"
+ yield "\n"
+ yield "\n"
+ yield "\n"
+ yield single_tweet+"\n"
+ yield "\n"
+
+ def do_test(klass, *args):
+ with test_server(handler=tweetsource, methods=("post", "get"),
+ port="random") as server:
+ stream = klass("foo", "bar", *args, url=server.baseurl)
+ try:
+ for tweet in stream:
+ pass
+ except ConnectionError:
+ assert stream.count == 3, "Got %s, wanted 3" % stream.count
+ else:
+ assert False, "Didn't handle keepalive"
+
+
+ do_test(TweetStream)
+ do_test(FollowStream, [1, 2, 3])
+ do_test(TrackStream, ["foo", "bar"])
+
+
+def test_buffering():
+ """Test if buffering stops data from being returned immediately.
+ If there is some buffering in play that might mean data is only returned
+ from the generator when the buffer is full. If buffer is bigger than a
+ tweet, this will happen. Default buffer size in the part of socket lib
+ that enables readline is 8k. Max tweet length is around 3k."""
+
+ def tweetsource(request):
+ yield single_tweet+"\n"
+ time.sleep(2)
+ # need to yield a bunch here so we're sure we'll return from the
+ # blocking call in case the buffering bug is present.
+ for n in xrange(100):
+ yield single_tweet+"\n"
+
+ def do_test(klass, *args):
+ with test_server(handler=tweetsource, methods=("post", "get"),
+ port="random") as server:
+ stream = klass("foo", "bar", *args, url=server.baseurl)
+
+ start = time.time()
+ stream.next()
+ first = time.time()
+ diff = first - start
+ assert diff < 1, "Getting first tweet took more than a second!"
+
+ do_test(TweetStream)
+ do_test(FollowStream, [1, 2, 3])
+ do_test(TrackStream, ["foo", "bar"])
+
+
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/script/lib/tweetstream/tweetstream/__init__.py Tue Jan 18 18:25:18 2011 +0100
@@ -0,0 +1,297 @@
+"""
+Simple Twitter streaming API access
+"""
+__version__ = "0.3.5"
+__author__ = "Rune Halvorsen <runefh@gmail.com>"
+__homepage__ = "http://bitbucket.org/runeh/tweetstream/"
+__docformat__ = "restructuredtext"
+
+import urllib
+import urllib2
+import socket
+import time
+import anyjson
+
+
+"""
+ .. data:: URLS
+
+ Mapping between twitter endpoint names and URLs.
+
+ .. data:: USER_AGENT
+
+ The default user agent string for stream objects
+
+"""
+
+URLS = {"firehose": "http://stream.twitter.com/1/statuses/firehose.json",
+ "sample": "http://stream.twitter.com/1/statuses/sample.json",
+ "follow": "http://stream.twitter.com/1/statuses/filter.json",
+ "track": "http://stream.twitter.com/1/statuses/filter.json"}
+
+USER_AGENT = "TweetStream %s" % __version__
+
+
+class TweetStreamError(Exception):
+ """Base class for all tweetstream errors"""
+ pass
+
+class AuthenticationError(TweetStreamError):
+ """Exception raised if the username/password is not accepted
+ """
+ pass
+
+
+class ConnectionError(TweetStreamError):
+ """Raised when there are network problems. This means when there are
+ dns errors, network errors, twitter issues"""
+
+ def __init__(self, reason, details=None):
+ self.reason = reason
+ self.details = details
+
+ def __str__(self):
+ return '<ConnectionError %s>' % self.reason
+
+
+class TweetStream(object):
+ """A network connection to Twitters streaming API
+
+ :param username: Twitter username for the account accessing the API.
+ :param password: Twitter password for the account accessing the API.
+
+    :keyword url: URL to connect to. This can be either an endpoint name,
+        such as "sample", or a full URL. By default, the public "sample" url
+        is used. All known endpoints are defined in the :data:`URLS` mapping.
+
+ .. attribute:: connected
+
+ True if the object is currently connected to the stream.
+
+ .. attribute:: url
+
+ The URL to which the object is connected
+
+ .. attribute:: starttime
+
+ The timestamp, in seconds since the epoch, the object connected to the
+ streaming api.
+
+ .. attribute:: count
+
+ The number of tweets that have been returned by the object.
+
+    .. attribute:: rate
+
+        The rate at which tweets have been returned from the object, as a
+        float. See also :attr:`rate_period`.
+
+    .. attribute:: rate_period
+
+        The amount of time, in seconds, over which tweets are sampled to
+        calculate the tweet rate. By default 10 seconds. Changes to this
+        attribute will not be reflected until the next time the rate is
+        calculated. The rate of tweets varies with time of day etc., so it's
+        useful to set this to something sensible.
+
+    .. attribute:: user_agent
+
+        User agent string that will be included in the request. NOTE: This
+        cannot be changed after the connection has been made, so this
+        property must be set before accessing the iterator. The default is
+        set in :attr:`USER_AGENT`.
+"""
+
+ def __init__(self, username, password, url="sample"):
+ self._conn = None
+ self._rate_ts = None
+ self._rate_cnt = 0
+ self._username = username
+ self._password = password
+
+ self.rate_period = 10 # in seconds
+ self.connected = False
+ self.starttime = None
+ self.count = 0
+ self.rate = 0
+ self.user_agent = USER_AGENT
+ self.url = URLS.get(url, url)
+
+ def __iter__(self):
+ return self
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, *params):
+ self.close()
+ return False
+
+ def _init_conn(self):
+ """Open the connection to the twitter server"""
+ headers = {'User-Agent': self.user_agent}
+ req = urllib2.Request(self.url, self._get_post_data(), headers)
+
+ password_mgr = urllib2.HTTPPasswordMgrWithDefaultRealm()
+ password_mgr.add_password(None, self.url, self._username,
+ self._password)
+ handler = urllib2.HTTPBasicAuthHandler(password_mgr)
+ opener = urllib2.build_opener(handler)
+
+ try:
+ self._conn = opener.open(req)
+ except urllib2.HTTPError, exception:
+ if exception.code == 401:
+ raise AuthenticationError("Access denied")
+ elif exception.code == 404:
+ raise ConnectionError("URL not found: %s" % self.url)
+ else: # re raise. No idea what would cause this, so want to know
+ raise
+ except urllib2.URLError, exception:
+ raise ConnectionError(exception.reason)
+
+ self.connected = True
+ if not self.starttime:
+ self.starttime = time.time()
+ if not self._rate_ts:
+ self._rate_ts = time.time()
+
+ def _get_post_data(self):
+ """Subclasses that need to add post data to the request can override
+ this method and return post data. The data should be in the format
+ returned by urllib.urlencode."""
+ return None
+
+ def next(self):
+ """Return the next available tweet. This call is blocking!"""
+ while True:
+ try:
+ if not self.connected:
+ self._init_conn()
+
+                # _init_conn has already set _rate_ts at this point, so the
+                # subtraction below is safe
+                rate_time = time.time() - self._rate_ts
+                if rate_time > self.rate_period:
+                    self.rate = self._rate_cnt / rate_time
+                    self._rate_cnt = 0
+                    self._rate_ts = time.time()
+
+ data = self._conn.readline()
+ if data == "": # something is wrong
+ self.close()
+ raise ConnectionError("Got entry of length 0. Disconnected")
+ elif data.isspace():
+ continue
+
+ data = anyjson.deserialize(data)
+ self.count += 1
+ self._rate_cnt += 1
+ return data
+
+ except ValueError, e:
+ self.close()
+ raise ConnectionError("Got invalid data from twitter", details=data)
+
+ except socket.error, e:
+ self.close()
+ raise ConnectionError("Server disconnected")
+
+
+ def close(self):
+ """
+ Close the connection to the streaming server.
+ """
+ self.connected = False
+ if self._conn:
+ self._conn.close()
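+
+# A minimal sketch of tuning the attributes documented on TweetStream
+# (hedged: credentials are placeholders; user_agent must be set before
+# iteration begins, as noted in the class docstring):
+#
+# >>> stream = TweetStream("username", "password", url="sample")
+# >>> stream.user_agent = "my-client/0.1"
+# >>> stream.rate_period = 60
+# >>> first_tweet = stream.next()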
+
+
+class ReconnectingTweetStream(TweetStream):
+ """TweetStream class that automatically tries to reconnect if the
+ connecting goes down. Reconnecting, and waiting for reconnecting, is
+ blocking.
+
+ :param username: See :TweetStream:
+
+ :param password: See :TweetStream:
+
+ :keyword url: See :TweetStream:
+
+ :keyword reconnects: Number of reconnects before a ConnectionError is
+ raised. Default is 3
+
+    :keyword error_cb: Optional callable that will be called just before
+        trying to reconnect. The callback will be called with a single
+        argument, the exception that caused the reconnect attempt. Default
+        is None.
+
+    :keyword retry_wait: Time to wait before reconnecting, in seconds.
+        Default is 5.
+
+ """
+
+ def __init__(self, username, password, url="sample",
+ reconnects=3, error_cb=None, retry_wait=5):
+ self.max_reconnects = reconnects
+ self.retry_wait = retry_wait
+ self._reconnects = 0
+ self._error_cb = error_cb
+ TweetStream.__init__(self, username, password, url=url)
+
+ def next(self):
+ while True:
+ try:
+ return TweetStream.next(self)
+ except ConnectionError, e:
+ self._reconnects += 1
+ if self._reconnects > self.max_reconnects:
+ raise ConnectionError("Too many retries")
+
+ # Note: error_cb is not called on the last error since we
+ # raise a ConnectionError instead
+ if callable(self._error_cb):
+ self._error_cb(e)
+
+ time.sleep(self.retry_wait)
+ # Don't listen to auth error, since we can't reasonably reconnect
+ # when we get one.
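+
+# A minimal sketch of the reconnect callback (hedged: log_disconnect is a
+# hypothetical function; per the note above, it is not called on the final
+# failure):
+#
+# >>> def log_disconnect(e):
+# ...     print "reconnecting after:", e.reason
+# >>> stream = ReconnectingTweetStream("username", "password",
+# ...                                  reconnects=5, error_cb=log_disconnect)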
+
+class FollowStream(TweetStream):
+ """Stream class for getting tweets from followers.
+
+ :param user: See TweetStream
+
+ :param password: See TweetStream
+
+    :param followees: Iterable containing user IDs to follow.
+        **Note:** the user id in question is the numeric ID twitter uses,
+        not the normal username.
+
+ :keyword url: Like the url argument to TweetStream, except default
+ value is the "follow" endpoint.
+ """
+
+ def __init__(self, user, password, followees, url="follow", **kwargs):
+ self.followees = followees
+ TweetStream.__init__(self, user, password, url=url, **kwargs)
+
+ def _get_post_data(self):
+ return urllib.urlencode({"follow": ",".join(map(str, self.followees))})
+
+
+class TrackStream(TweetStream):
+ """Stream class for getting tweets relevant to keywords.
+
+ :param user: See TweetStream
+
+ :param password: See TweetStream
+
+ :param keywords: Iterable containing keywords to look for
+
+ :keyword url: Like the url argument to TweetStream, except default
+ value is the "track" endpoint.
+ """
+
+ def __init__(self, user, password, keywords, url="track", **kwargs):
+ self.keywords = keywords
+ TweetStream.__init__(self, user, password, url=url, **kwargs)
+
+ def _get_post_data(self):
+ return urllib.urlencode({"track": ",".join(self.keywords)})
Binary file script/virtualenv/res/anyjson.tar.gz has changed
Binary file script/virtualenv/res/tweetstream.tar.gz has changed