# HG changeset patch
# User Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
# Date 1329817626 -3600
# Node ID 99215db3da252a4f59f46b820560aebf8555107f
# Parent 7fb5a7b0d35cbed0f5dd243dc4378b58562a76f7
correct tweetstream version and clean iri_tweet. can now do a setup

diff -r 7fb5a7b0d35c -r 99215db3da25 .hgignore
--- a/.hgignore Mon Feb 20 18:52:19 2012 +0100
+++ b/.hgignore Tue Feb 21 10:47:06 2012 +0100
@@ -46,4 +46,12 @@
 syntax: regexp
 ^tweetcast/nodejs/node_modules$
 syntax: regexp
-^tweetcast/server-gevent/server_setup\.py$
\ No newline at end of file
+^tweetcast/server-gevent/server_setup\.py$
+syntax: regexp
+^script/lib/iri_tweet/dist$
+syntax: regexp
+^script/lib/iri_tweet/iri_tweet\.egg-info$
+syntax: regexp
+^script/lib/tweetstream/dist$
+syntax: regexp
+^script/lib/tweetstream/tweetstream\.egg-info$
\ No newline at end of file
diff -r 7fb5a7b0d35c -r 99215db3da25 script/lib/iri_tweet/LICENSE
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/script/lib/iri_tweet/LICENSE Tue Feb 21 10:47:06 2012 +0100
@@ -0,0 +1,29 @@
+Copyright (c) 2012, IRI, Haussonne Yves-Marie
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+
+    * Redistributions of source code must retain the above copyright notice,
+      this list of conditions and the following disclaimer.
+    * Redistributions in binary form must reproduce the above copyright
+      notice, this list of conditions and the following disclaimer in the
+      documentation and/or other materials provided with the distribution.
+
+Neither the name of IRI nor the names of its contributors may be
+used to endorse or promote products derived from this software without
+specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
+THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS
+BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+POSSIBILITY OF SUCH DAMAGE.
+
+CECILL-C
\ No newline at end of file
diff -r 7fb5a7b0d35c -r 99215db3da25 script/lib/iri_tweet/README
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/script/lib/iri_tweet/README Tue Feb 21 10:47:06 2012 +0100
@@ -0,0 +1,33 @@
+.. -*- restructuredtext -*-
+
+
+##########################################
+iri_tweet - Simple Twitter recording API
+##########################################
+
+Introduction
+------------
+
+
+Examples
+--------
+
+Contact
+-------
+
+The author is Yves-Marie Haussonne <ymh.work@gmail.com>. The project resides at
+http://bitbucket.org/ . If you find bugs, or have feature
+requests, please report them in the project site issue tracker. Patches are
+also very welcome.
+
+Contributors
+------------
+
+- Yves-Marie Haussonne
+
+License
+-------
+
+This software is licensed under the ``New BSD License`` and CECILL-C. See the ``LICENSE``
+file in the top distribution directory for the full license text.
+
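The ``Examples`` section of the README above is left empty. As a minimal sketch only (not part of the changeset), recording a single status with the modules this patch adds would look roughly like the following, assuming ``tweet_json`` holds the raw JSON text of one tweet and that ``tweets.db`` and ``.oauth_token`` are file names chosen for the example::

    from iri_tweet.models import setup_database
    from iri_tweet.utils import TwitterProcessor

    # create the schema in a local sqlite file and open a session
    engine, metadata, Session = setup_database('sqlite:///tweets.db', echo=False)
    session = Session()
    try:
        # json_dict is left to None: TwitterProcessor deserializes json_txt itself
        processor = TwitterProcessor(None, tweet_json, None, session,
                                     token_filename=".oauth_token")
        processor.process()
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()

A similar call sequence appears in the removed ``export_tweet_db.py`` and in ``iri_tweet/tests.py`` further down in this changeset.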
diff -r 7fb5a7b0d35c -r 99215db3da25 script/lib/iri_tweet/__init__.py
--- a/script/lib/iri_tweet/__init__.py Mon Feb 20 18:52:19 2012 +0100
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,17 +0,0 @@
-VERSION = (0, 82, 0, "final", 0)
-
-VERSION_STR = unicode(".".join(map(lambda i:"%02d" % (i,), VERSION[:2])))
-
-
-def get_version():
-    version = '%s.%s' % (VERSION[0], VERSION[1])
-    if VERSION[2]:
-        version = '%s.%s' % (version, VERSION[2])
-    if VERSION[3:] == ('alpha', 0):
-        version = '%s pre-alpha' % version
-    else:
-        if VERSION[3] != 'final':
-            version = '%s %s %s' % (version, VERSION[3], VERSION[4])
-    return version
-
-__version__ = get_version()
diff -r 7fb5a7b0d35c -r 99215db3da25 script/lib/iri_tweet/export_tweet_db.py
--- a/script/lib/iri_tweet/export_tweet_db.py Mon Feb 20 18:52:19 2012 +0100
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,47 +0,0 @@
-from models import setup_database
-from optparse import OptionParser #@UnresolvedImport
-from sqlalchemy.orm import sessionmaker
-from utils import set_logging_options, set_logging, TwitterProcessor, logger
-import sqlite3 #@UnresolvedImport
-
-
-# 'entities': "tweet_entity",
-# 'user': "tweet_user"
-
-def get_option():
-
-    parser = OptionParser()
-
-    parser.add_option("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token",
-                      help="Token file name")
-
-    set_logging_options(parser)
-
-    return parser.parse_args()
-
-if __name__ == "__main__":
-
-    (options, args) = get_option()
-
-    set_logging(options)
-
-    with sqlite3.connect(args[0]) as conn_in:
-        engine, metadata, Session = setup_database('sqlite:///'+args[1], echo=((options.verbose-options.quiet)>0))
-        session = Session()
-        try:
-            curs_in = conn_in.cursor()
-            fields_mapping = {}
-            for i,res in enumerate(curs_in.execute("select json from tweet_tweet;")):
-                logger.debug("main loop %d : %s" % (i, res[0])) #@UndefinedVariable
-                processor = TwitterProcessor(eval(res[0]), res[0], None, session, options.token_filename)
-                processor.process()
-                session.commit()
-            logger.debug("main : %d tweet processed" % (i+1)) #@UndefinedVariable
-        except Exception, e:
-            session.rollback()
-            raise e
-        finally:
-            session.close()
-
-
-
\ No newline at end of file
diff -r 7fb5a7b0d35c -r 99215db3da25 script/lib/iri_tweet/iri_tweet/__init__.py
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/script/lib/iri_tweet/iri_tweet/__init__.py Tue Feb 21 10:47:06 2012 +0100
@@ -0,0 +1,23 @@
+"""Simple model and tools to record tweets"""
+
+VERSION = (0, 82, 0, "final", 0)
+
+VERSION_STR = unicode(".".join(map(lambda i:"%02d" % (i,), VERSION[:2])))
+
+
+def get_version():
+    version = '%s.%s' % (VERSION[0], VERSION[1])
+    if VERSION[2]:
+        version = '%s.%s' % (version, VERSION[2])
+    if VERSION[3:] == ('alpha', 0):
+        version = '%s pre-alpha' % version
+    else:
+        if VERSION[3] != 'final':
+            version = '%s %s %s' % (version, VERSION[3], VERSION[4])
+    return version
+
+__version__ = get_version()
+__author__ = "Yves-Marie Haussonne"
+__contact__ = "ymh.work@gmail.com"
+__homepage__ = ""
+__docformat__ = "restructuredtext"
diff -r 7fb5a7b0d35c -r 99215db3da25 script/lib/iri_tweet/iri_tweet/models.py
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/script/lib/iri_tweet/iri_tweet/models.py Tue Feb 21 10:47:06 2012 +0100
@@ -0,0 +1,290 @@
+from sqlalchemy import (Boolean, Column, Enum, BigInteger, Integer, String,
+    ForeignKey, DateTime, create_engine)
+from sqlalchemy.ext.declarative import declarative_base
+from sqlalchemy.orm import relationship, sessionmaker
+import anyjson
+import datetime
+import
email.utils +import iri_tweet + + +Base = declarative_base() + +APPLICATION_NAME = "IRI_TWITTER" +CONSUMER_KEY = "54ThDZhpEjokcMgHJOMnQA" +CONSUMER_SECRET = "wUoL9UL2T87tfc97R0Dff2EaqRzpJ5XGdmaN2XK3udA" +ACCESS_TOKEN_KEY = None +ACCESS_TOKEN_SECRET = None +#ACCESS_TOKEN_KEY= "47312923-LiNTtz0I18YXMVIrFeTuhmH7bOvYsK6p3Ln2Dc" +#ACCESS_TOKEN_SECRET = "r3LoXVcjImNAElUpWqTu2SG2xCdWFHkva7xeQoncA" + +def adapt_date(date_str): + ts = email.utils.parsedate_tz(date_str) #@UndefinedVariable + return datetime.datetime(*ts[0:7]) + +def adapt_json(obj): + if obj is None: + return None + else: + return anyjson.serialize(obj) + +class TweetMeta(type(Base)): + + def __init__(cls, name, bases, ns): #@NoSelf + def init(self, **kwargs): + for key, value in kwargs.items(): + if hasattr(self, key): + setattr(self, key, value) + super(cls, self).__init__() + setattr(cls, '__init__', init) + super(TweetMeta, cls).__init__(name, bases, ns) + + +class ProcessEvent(Base): + __metaclass__ = TweetMeta + __tablename__ = "tweet_process_event" + id = Column(Integer, primary_key=True, autoincrement=True) + ts = Column(DateTime, default=datetime.datetime.utcnow, index=True) + type = Column(Enum("start","pid","shutdown","error", "start_worker", "stop_worker", "model_version", "application_name", "application_version", name="process_event_type_enum"), nullable=False) + args = Column(String) + +class EntityType(Base): + __metaclass__ = TweetMeta + __tablename__ = "tweet_entity_type" + id = Column(Integer, primary_key=True, autoincrement=True) + label = Column(String) + +class Entity(Base): + __metaclass__ = TweetMeta + __tablename__ = "tweet_entity" + id = Column(Integer, primary_key=True) + tweet_id = Column(BigInteger, ForeignKey('tweet_tweet.id')) + type = Column(String) + entity_type_id = Column(Integer, ForeignKey('tweet_entity_type.id'), nullable=False) + entity_type = relationship("EntityType", backref="entities") + indice_start = Column(Integer) + indice_end = Column(Integer) + source = Column(String) + __mapper_args__ = {'polymorphic_on': type, 'polymorphic_identity': 'entity_entity', 'with_polymorphic':'*'} + + +class TweetSource(Base): + __metaclass__ = TweetMeta + __tablename__ = 'tweet_tweet_source' + id = Column(Integer, primary_key=True, autoincrement=True) + original_json = Column(String) + received_at = Column(DateTime, default=datetime.datetime.utcnow, index=True) + + +class TweetLog(Base): + + TWEET_STATUS = { + 'OK' : 1, + 'ERROR' : 2, + 'NOT_TWEET': 3, + } + __metaclass__ = TweetMeta + + __tablename__ = 'tweet_tweet_log' + id = Column(Integer, primary_key=True, autoincrement=True) + ts = Column(DateTime, default=datetime.datetime.utcnow, index=True) + tweet_source_id = Column(Integer, ForeignKey('tweet_tweet_source.id')) + tweet_source = relationship("TweetSource", backref="logs") + status = Column(Integer) + error = Column(String) + error_stack = Column(String) + + +class Tweet(Base): + __metaclass__ = TweetMeta + __tablename__ = 'tweet_tweet' + + id = Column(BigInteger, primary_key=True, autoincrement=False) + id_str = Column(String) + contributors = Column(String) + coordinates = Column(String) + created_at = Column(DateTime, index=True) + favorited = Column(Boolean) + geo = Column(String) + in_reply_to_screen_name = Column(String) + in_reply_to_status_id = Column(BigInteger) + in_reply_to_status_id_str = Column(String) + in_reply_to_user_id = Column(BigInteger) + in_reply_to_user_id_str = Column(String) + place = Column(String) + retweet_count = Column(String) + retweeted = Column(Boolean) + source = 
Column(String) + text = Column(String) + truncated = Column(Boolean) + user_id = Column(Integer, ForeignKey('tweet_user.id')) + user = relationship("User", backref="tweets") + tweet_source_id = Column(Integer, ForeignKey('tweet_tweet_source.id')) + tweet_source = relationship("TweetSource", backref="tweet") + entity_list = relationship(Entity, backref='tweet') + received_at = Column(DateTime, default=datetime.datetime.utcnow, index=True) + + +class UserMessage(Base): + __metaclass__ = TweetMeta + __tablename__ = "tweet_user_message" + + id = Column(Integer, primary_key=True) + user_id = Column(Integer, ForeignKey('tweet_user.id')) + user = relationship("User", backref="messages") + created_at = Column(DateTime, default=datetime.datetime.utcnow) + message_id = Column(Integer, ForeignKey('tweet_message.id')) + +class Message(Base): + __metaclass__ = TweetMeta + __tablename__ = "tweet_message" + + id = Column(Integer, primary_key=True) + created_at = Column(DateTime, default=datetime.datetime.utcnow) + text = Column(String) + users = relationship(UserMessage, backref='message') + + +class User(Base): + __metaclass__ = TweetMeta + __tablename__ = "tweet_user" + + id = Column(BigInteger, primary_key=True, autoincrement=False) + id_str = Column(String) + contributors_enabled = Column(Boolean) + created_at = Column(DateTime, index=True) + description = Column(String) + favourites_count = Column(Integer) + follow_request_sent = Column(Boolean) + followers_count = Column(Integer) + following = Column(String) + friends_count = Column(Integer) + geo_enabled = Column(Boolean) + is_translator = Column(Boolean) + lang = Column(String) + listed_count = Column(Integer) + location = Column(String) + name = Column(String) + notifications = Column(String) + profile_background_color = Column(String) + profile_background_image_url = Column(String) + profile_background_tile = Column(Boolean) + profile_image_url = Column(String) + profile_image_url_https = Column(String) + profile_link_color = Column(String) + profile_sidebar_border_color = Column(String) + profile_sidebar_fill_color = Column(String) + profile_text_color = Column(String) + default_profile_image = Column(String) + profile_use_background_image = Column(Boolean) + protected = Column(Boolean) + screen_name = Column(String, index=True) + show_all_inline_media = Column(Boolean) + statuses_count = Column(Integer) + time_zone = Column(String) + url = Column(String) + utc_offset = Column(Integer) + verified = Column(Boolean) + + +class Hashtag(Base): + __metaclass__ = TweetMeta + __tablename__ = "tweet_hashtag" + id = Column(Integer, primary_key=True) + text = Column(String, unique=True, index=True) + + +class Url(Base): + __metaclass__ = TweetMeta + __tablename__ = "tweet_url" + id = Column(Integer, primary_key=True) + url = Column(String, unique=True) + expanded_url = Column(String) + + +class MediaType(Base): + __metaclass__ = TweetMeta + __tablename__ = "tweet_media_type" + id = Column(Integer, primary_key=True, autoincrement=True) + label = Column(String, unique=True, index=True) + + + +class Media(Base): + __metaclass__ = TweetMeta + __tablename__ = "tweet_media" + id = Column(BigInteger, primary_key=True, autoincrement=False) + id_str = Column(String, unique=True) + media_url = Column(String, unique=True) + media_url_https = Column(String, unique=True) + url = Column(String) + display_url = Column(String) + expanded_url = Column(String) + sizes = Column(String) + type_id = Column(Integer, ForeignKey("tweet_media_type.id")) + type = 
relationship(MediaType, primaryjoin=type_id == MediaType.id) + + + +class EntityHashtag(Entity): + __tablename__ = "tweet_entity_hashtag" + __mapper_args__ = {'polymorphic_identity': 'entity_hashtag'} + id = Column(Integer, ForeignKey('tweet_entity.id'), primary_key=True) + hashtag_id = Column(Integer, ForeignKey("tweet_hashtag.id")) + hashtag = relationship(Hashtag, primaryjoin=hashtag_id == Hashtag.id) + + +class EntityUrl(Entity): + __tablename__ = "tweet_entity_url" + __mapper_args__ = {'polymorphic_identity': 'entity_url'} + id = Column(Integer, ForeignKey('tweet_entity.id'), primary_key=True) + url_id = Column(Integer, ForeignKey("tweet_url.id")) + url = relationship(Url, primaryjoin=url_id == Url.id) + +class EntityUser(Entity): + __tablename__ = "tweet_entity_user" + __mapper_args__ = {'polymorphic_identity': 'entity_user'} + id = Column(Integer, ForeignKey('tweet_entity.id'), primary_key=True) + user_id = Column(BigInteger, ForeignKey('tweet_user.id')) + user = relationship(User, primaryjoin=(user_id == User.id)) + + +class EntityMedia(Entity): + __tablename__ = "tweet_entity_media" + __mapper_args__ = {'polymorphic_identity': 'entity_media'} + id = Column(Integer, ForeignKey('tweet_entity.id'), primary_key=True) + media_id = Column(BigInteger, ForeignKey('tweet_media.id')) + media = relationship(Media, primaryjoin=(media_id == Media.id)) + +def add_model_version(session, must_commit=True): + pe = ProcessEvent(args=iri_tweet.get_version(), type="model_version") + session.add(pe) + if must_commit: + session.commit() + +def setup_database(*args, **kwargs): + + session_argname = [ 'autoflush','binds', "class_", "_enable_transaction_accounting","expire_on_commit", "extension", "query_cls", "twophase", "weak_identity_map", "autocommit"] + + kwargs_ce = dict((k, v) for k,v in kwargs.items() if (k not in session_argname and k != "create_all")) + + engine = create_engine(*args, **kwargs_ce) + metadata = Base.metadata + + kwargs_sm = {'bind': engine} + + kwargs_sm.update([(argname, kwargs[argname]) for argname in session_argname if argname in kwargs]) + + Session = sessionmaker(**kwargs_sm) + #set model version + + if kwargs.get('create_all', True): + metadata.create_all(engine) + session = Session() + try: + add_model_version(session) + finally: + session.close() + + return (engine, metadata, Session) + diff -r 7fb5a7b0d35c -r 99215db3da25 script/lib/iri_tweet/iri_tweet/tests.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/script/lib/iri_tweet/iri_tweet/tests.py Tue Feb 21 10:47:06 2012 +0100 @@ -0,0 +1,197 @@ +from sqlalchemy import Column, Integer, String, ForeignKey, create_engine +from sqlalchemy.ext.declarative import declarative_base +from sqlalchemy.orm import relationship, backref +import unittest #@UnresolvedImport +from sqlalchemy.orm import sessionmaker +from iri_tweet.utils import ObjectsBuffer, TwitterProcessor +from iri_tweet import models +import tempfile #@UnresolvedImport +import os + +Base = declarative_base() + +class User(Base): + __tablename__ = 'users' + + id = Column(Integer, primary_key=True) + name = Column(String) + fullname = Column(String) + password = Column(String) + + def __init__(self, name, fullname, password): + self.name = name + self.fullname = fullname + self.password = password + + def __repr__(self): + return "" % (self.name, self.fullname, self.password) + + +class Address(Base): + __tablename__ = 'addresses' + id = Column(Integer, primary_key=True) + email_address = Column(String, nullable=False) + user_id = Column(Integer, 
ForeignKey('users.id')) + + user = relationship("User", backref=backref('addresses', order_by=id)) + + def __init__(self, user_id, email_address): + self.email_address = email_address + self.user_id = user_id + + def __repr__(self): + return "" % self.email_address + + + +class TestObjectBuffer(unittest.TestCase): + + def setUp(self): + self.engine = create_engine('sqlite:///:memory:', echo=False) + Base.metadata.create_all(self.engine) + sessionMaker = sessionmaker(bind=self.engine) + self.session = sessionMaker() + + def tearDown(self): + self.session.close() + self.engine.dispose() + + + def testCreateUser(self): + ed_user = User('ed', 'Ed Jones', 'edspassword') + self.session.add(ed_user) + self.assertTrue(ed_user.id is None) + self.session.commit() + self.assertTrue(ed_user.id is not None) + + + def testSimpleBuffer(self): + obj_buffer = ObjectsBuffer() + obj_proxy = obj_buffer.add_object(User, ['ed1', 'Ed1 Jones', 'edspassword'], None, False) + self.assertTrue(obj_proxy.id() is None) + obj_buffer.persists(self.session) + self.assertTrue(obj_proxy.id() is None) + self.session.commit() + self.assertTrue(obj_proxy.id() is not None) + + + def testSimpleBufferKwargs(self): + obj_buffer = ObjectsBuffer() + obj_proxy = obj_buffer.add_object(User, None, {'name':'ed1b', 'fullname':'Ed1b Jones', 'password':'edspassword'}, False) + self.assertTrue(obj_proxy.id() is None) + obj_buffer.persists(self.session) + self.assertTrue(obj_proxy.id() is None) + self.session.commit() + self.assertTrue(obj_proxy.id() is not None) + + + def testSimpleBufferFlush(self): + obj_buffer = ObjectsBuffer() + obj_proxy = obj_buffer.add_object(User, ['ed2', 'Ed2 Jones', 'edspassword'], None, True) + self.assertTrue(obj_proxy.id() is None) + obj_buffer.persists(self.session) + self.assertTrue(obj_proxy.id() is not None) + self.session.commit() + self.assertTrue(obj_proxy.id() is not None) + + def testRelationBuffer(self): + obj_buffer = ObjectsBuffer() + user1_proxy = obj_buffer.add_object(User, ['ed3', 'Ed3 Jones', 'edspassword'], None, True) + obj_buffer.add_object(Address, [user1_proxy.id,'ed3@mail.com'], None, False) + obj_buffer.add_object(Address, [user1_proxy.id,'ed3@other.com'], None, False) + user2_proxy = obj_buffer.add_object(User, ['ed4', 'Ed3 Jones', 'edspassword'], None, True) + obj_buffer.add_object(Address, [user2_proxy.id,'ed4@mail.com'], None, False) + obj_buffer.persists(self.session) + self.session.commit() + ed_user = self.session.query(User).filter_by(name='ed3').first() + self.assertEquals(2, len(ed_user.addresses)) + ed_user = self.session.query(User).filter_by(name='ed4').first() + self.assertEquals(1, len(ed_user.addresses)) + + + def testGet(self): + obj_buffer = ObjectsBuffer() + user1_proxy = obj_buffer.add_object(User, None, {'name':'ed2', 'fullname':'Ed2 Jones', 'password':'edspassword'}, True) + adress_proxy = obj_buffer.add_object(Address, None, {'user_id':user1_proxy.id,'email_address':'ed2@other.com'}, False) + user2_proxy = obj_buffer.add_object(User, None, {'name':'ed3', 'fullname':'Ed3 Jones', 'password':'edspassword'}, True) + obj_buffer.add_object(Address, None, {'user_id':user2_proxy.id,'email_address':'ed3@other.com'}, False) + self.assertEquals(user1_proxy, obj_buffer.get(User, name='ed2')) + self.assertEquals(adress_proxy, obj_buffer.get(Address, email_address='ed2@other.com')) + self.assertEquals(user2_proxy, obj_buffer.get(User, name='ed3')) + self.assertTrue(obj_buffer.get(User, name='ed3', fullname='Ed2 Jones') is None) + +original_json = 
u'{"in_reply_to_user_id_str":null,"text":"RT @BieberEagle: \\"I love my haters. They spend so much time thinking about me. Aren\u2019t they sweet?\\" - Justin Bieber","contributors":null,"retweeted":false,"coordinates":null,"retweeted_status":{"in_reply_to_user_id_str":null,"text":"\\"I love my haters. They spend so much time thinking about me. Aren\u2019t they sweet?\\" - Justin Bieber","contributors":null,"retweeted":false,"coordinates":null,"retweet_count":"100+","source":"web","entities":{"user_mentions":[],"hashtags":[],"urls":[]},"truncated":false,"place":null,"id_str":"96638597737889792","in_reply_to_user_id":null,"in_reply_to_status_id":null,"favorited":false,"in_reply_to_status_id_str":null,"user":{"is_translator":false,"profile_background_tile":true,"profile_background_image_url_https":"https:\/\/si0.twimg.com\/profile_background_images\/298443445\/355584171.jpg","listed_count":5040,"friends_count":8477,"profile_link_color":"ff0000","profile_sidebar_border_color":"000000","url":"http:\/\/twitpic.com\/photos\/BieberEagle","profile_image_url_https":"https:\/\/si0.twimg.com\/profile_images\/1465491672\/355584171_normal.jpg","profile_image_url":"http:\/\/a2.twimg.com\/profile_images\/1465491672\/355584171_normal.jpg","description":"1 name, 1 inspiration, 1 hero, 1 smile, 1 singer, 1 boy who changed my life. B.\u0130.E.B.E.R-Believe In Everything Because Everything\'s Reachable. #NEVERSAYNEVER","default_profile":false,"notifications":null,"time_zone":"Paris","followers_count":14506,"default_profile_image":false,"lang":"en","profile_use_background_image":true,"screen_name":"BieberEagle","show_all_inline_media":false,"geo_enabled":false,"profile_background_color":"ffffff","location":"\u2665 Albania \u2665 ","id_str":"229067923","profile_background_image_url":"http:\/\/a2.twimg.com\/profile_background_images\/298443445\/355584171.jpg","favourites_count":89,"protected":false,"follow_request_sent":null,"following":null,"name":"truebelieber","statuses_count":24279,"verified":false,"created_at":"Tue Dec 21 12:35:18 +0000 2010","profile_text_color":"000000","id":229067923,"contributors_enabled":false,"utc_offset":3600,"profile_sidebar_fill_color":""},"id":96638597737889792,"created_at":"Thu Jul 28 17:50:11 +0000 2011","geo":null,"in_reply_to_screen_name":null},"retweet_count":"100+","source":"web","entities":{"user_mentions":[{"indices":[3,15],"screen_name":"BieberEagle","id_str":"229067923","name":"truebelieber","id":229067923}],"hashtags":[],"urls":[]},"truncated":false,"place":null,"id_str":"96965037637382145","in_reply_to_user_id":null,"in_reply_to_status_id":null,"favorited":false,"in_reply_to_status_id_str":null,"user":{"is_translator":false,"profile_background_tile":true,"profile_background_image_url_https":"https:\/\/si0.twimg.com\/profile_background_images\/300419382\/ipod.7.14_054.JPG","listed_count":3,"friends_count":1150,"profile_link_color":"00cccc","profile_sidebar_border_color":"c8ff00","url":"http:\/\/www.facebook.com\/blovedbecca180","profile_image_url_https":"https:\/\/si0.twimg.com\/profile_images\/1466752962\/block_party_7.27.11_015_normal.JPG","profile_image_url":"http:\/\/a3.twimg.com\/profile_images\/1466752962\/block_party_7.27.11_015_normal.JPG","description":"if ya wanna know something about me, then get to know me. \\n\\r\\n\\ri promise, you wont regret it. 
(:\\r\\ni love justin bieber with an extreme burning passion!","default_profile":false,"notifications":null,"time_zone":"Central Time (US & Canada)","followers_count":361,"default_profile_image":false,"lang":"en","profile_use_background_image":true,"screen_name":"beccaxannxx","show_all_inline_media":false,"geo_enabled":false,"profile_background_color":"ff0066","location":"","id_str":"65624607","profile_background_image_url":"http:\/\/a3.twimg.com\/profile_background_images\/300419382\/ipod.7.14_054.JPG","favourites_count":266,"protected":false,"follow_request_sent":null,"following":null,"name":"beccaxannxx","statuses_count":2512,"verified":false,"created_at":"Fri Aug 14 12:36:35 +0000 2009","profile_text_color":"6a39d4","id":65624607,"contributors_enabled":false,"utc_offset":-21600,"profile_sidebar_fill_color":"ff00bb"},"id":96965037637382145,"created_at":"Fri Jul 29 15:27:21 +0000 2011","geo":null,"in_reply_to_screen_name":null}' +original_json_media = u'{"user": {"follow_request_sent": null, "profile_use_background_image": true, "id": 34311537, "verified": false, "profile_image_url_https": "https://si0.twimg.com/profile_images/1452959211/63440_1494505009634_1444332638_31074099_4058882_n_normal.jpg", "profile_sidebar_fill_color": "DAECF4", "is_translator": false, "geo_enabled": false, "profile_text_color": "663B12", "followers_count": 29, "profile_sidebar_border_color": "C6E2EE", "id_str": "34311537", "default_profile_image": false, "location": "", "utc_offset": -25200, "statuses_count": 813, "description": "i like joe jonas, justin bieber, ashley tisdale, selena gomez, megen fox, kim kardashian and demi lovoto and many more.", "friends_count": 101, "profile_link_color": "1F98C7", "profile_image_url": "http://a1.twimg.com/profile_images/1452959211/63440_1494505009634_1444332638_31074099_4058882_n_normal.jpg", "notifications": null, "show_all_inline_media": false, "profile_background_image_url_https": "https://si0.twimg.com/profile_background_images/298244445/tour2011.jpg", "profile_background_color": "C6E2EE", "profile_background_image_url": "http://a1.twimg.com/profile_background_images/298244445/tour2011.jpg", "name": "mikayla", "lang": "en", "profile_background_tile": true, "favourites_count": 231, "screen_name": "bieberfever17ya", "url": null, "created_at": "Wed Apr 22 16:04:28 +0000 2009", "contributors_enabled": false, "time_zone": "Mountain Time (US & Canada)", "protected": false, "default_profile": false, "following": null, "listed_count": 1}, "favorited": false, "entities": {"user_mentions": [], "media": [{"media_url_https": "https://p.twimg.com/AWea5Z-CQAAvfvK.jpg", "expanded_url": "http://twitter.com/bieberfever17ya/status/101219827649232896/photo/1", "sizes": {"small": {"h": 240, "w": 201, "resize": "fit"}, "large": {"h": 240, "w": 201, "resize": "fit"}, "medium": {"h": 240, "w": 201, "resize": "fit"}, "thumb": {"h": 150, "w": 150, "resize": "crop"}}, "url": "http://t.co/N7yZ8hS", "display_url": "pic.twitter.com/N7yZ8hS", "id_str": "101219827653427200", "indices": [31, 50], "type": "photo", "id": 101219827653427200, "media_url": "http://p.twimg.com/AWea5Z-CQAAvfvK.jpg"}], "hashtags": [], "urls": []}, "contributors": null, "truncated": false, "text": "i love you justin bieber <3 http://t.co/N7yZ8hS", "created_at": "Wed Aug 10 09:14:22 +0000 2011", "retweeted": false, "in_reply_to_status_id": null, "coordinates": null, "id": 101219827649232896, "source": "web", "in_reply_to_status_id_str": null, "place": null, "in_reply_to_user_id": null, "in_reply_to_screen_name": null, 
"retweet_count": 0, "geo": null, "in_reply_to_user_id_str": null, "possibly_sensitive": false, "id_str": "101219827649232896"}' +original_json_media_others = u'{"user": {"utc_offset": -25200, "statuses_count": 813, "default_profile_image": false, "friends_count": 101, "profile_background_image_url_https": "https://si0.twimg.com/profile_background_images/298244445/tour2011.jpg", "profile_use_background_image": true, "profile_sidebar_fill_color": "DAECF4", "profile_link_color": "1F98C7", "profile_image_url": "http://a1.twimg.com/profile_images/1452959211/63440_1494505009634_1444332638_31074099_4058882_n_normal.jpg", "time_zone": "Mountain Time (US & Canada)", "is_translator": false, "screen_name": "bieberfever17ya", "url": null, "show_all_inline_media": false, "geo_enabled": false, "profile_background_color": "C6E2EE", "id": 34311537, "profile_background_image_url": "http://a1.twimg.com/profile_background_images/298244445/tour2011.jpg", "description": "i like joe jonas, justin bieber, ashley tisdale, selena gomez, megen fox, kim kardashian and demi lovoto and many more.", "lang": "en", "profile_background_tile": true, "favourites_count": 231, "name": "mikayla", "notifications": null, "follow_request_sent": null, "created_at": "Wed Apr 22 16:04:28 +0000 2009", "verified": false, "contributors_enabled": false, "location": "", "profile_text_color": "663B12", "followers_count": 29, "profile_sidebar_border_color": "C6E2EE", "id_str": "34311537", "default_profile": false, "following": null, "protected": false, "profile_image_url_https": "https://si0.twimg.com/profile_images/1452959211/63440_1494505009634_1444332638_31074099_4058882_n_normal.jpg", "listed_count": 1}, "favorited": false, "contributors": null, "source": "web", "text": "i love you justin bieber <3 http://t.co/N7yZ8hS", "created_at": "Wed Aug 10 09:14:22 +0000 2011", "truncated": false, "retweeted": false, "in_reply_to_status_id_str": null, "coordinates": null, "in_reply_to_user_id_str": null, "entities": {"user_mentions": [], "media": [], "hashtags": [], "urls": [], "others": [{"url": "http://t.co/N7yZ8hS", "text": "comments", "indices": [31, 50]}]}, "in_reply_to_status_id": null, "in_reply_to_screen_name": null, "id_str": "101219827649232896", "place": null, "retweet_count": 0, "geo": null, "id": 101219827649232896, "possibly_sensitive": false, "in_reply_to_user_id": null}' + +class TestTwitterProcessor(unittest.TestCase): + + def setUp(self): + self.engine, self.metadata, sessionMaker = models.setup_database('sqlite:///:memory:', echo=True) + self.session = sessionMaker() + file, self.tmpfilepath = tempfile.mkstemp() + os.close(file) + + + def testTwitterProcessor(self): + tp = TwitterProcessor(None, original_json, None, self.session, self.tmpfilepath) + tp.process() + self.session.commit() + + self.assertEquals(1, self.session.query(models.TweetSource).count()) + self.assertEquals(1, self.session.query(models.Tweet).count()) + self.assertEquals(2, self.session.query(models.User).count()) + tweet = self.session.query(models.Tweet).first() + self.assertFalse(tweet.user is None) + self.assertEqual(u"beccaxannxx",tweet.user.name) + self.assertEqual(65624607,tweet.user.id) + self.assertEqual(1,len(tweet.entity_list)) + entity = tweet.entity_list[0] + self.assertEqual(u"BieberEagle", entity.user.screen_name) + self.assertTrue(entity.user.created_at is None) + self.assertEqual("entity_user", entity.type) + self.assertEqual("user_mentions", entity.entity_type.label) + + + def testTwitterProcessorMedia(self): + tp = TwitterProcessor(None, 
original_json_media, None, self.session, self.tmpfilepath) + tp.process() + self.session.commit() + + self.assertEquals(1, self.session.query(models.TweetSource).count()) + self.assertEquals(1, self.session.query(models.Tweet).count()) + self.assertEquals(1, self.session.query(models.User).count()) + tweet = self.session.query(models.Tweet).first() + self.assertFalse(tweet.user is None) + self.assertEqual(u"mikayla",tweet.user.name) + self.assertEqual(34311537,tweet.user.id) + self.assertEqual(1,len(tweet.entity_list)) + entity = tweet.entity_list[0] + self.assertEqual(101219827653427200, entity.media.id) + self.assertEqual("photo", entity.media.type.label) + self.assertEqual("entity_media", entity.type) + self.assertEqual("media", entity.entity_type.label) + + + def testTwitterProcessorMediaOthers(self): + tp = TwitterProcessor(None, original_json_media_others, None, self.session, self.tmpfilepath) + tp.process() + self.session.commit() + + self.assertEquals(1, self.session.query(models.TweetSource).count()) + self.assertEquals(1, self.session.query(models.Tweet).count()) + tweet = self.session.query(models.Tweet).first() + self.assertEqual(1,len(tweet.entity_list)) + entity = tweet.entity_list[0] + self.assertEqual("entity_entity", entity.type) + self.assertEqual("others", entity.entity_type.label) + + + + def tearDown(self): + self.session.close() + self.engine.dispose() + os.remove(self.tmpfilepath) + +if __name__ == '__main__': + unittest.main() \ No newline at end of file diff -r 7fb5a7b0d35c -r 99215db3da25 script/lib/iri_tweet/iri_tweet/utils.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/script/lib/iri_tweet/iri_tweet/utils.py Tue Feb 21 10:47:06 2012 +0100 @@ -0,0 +1,626 @@ +from models import (Tweet, User, Hashtag, EntityHashtag, EntityUser, Url, + EntityUrl, CONSUMER_KEY, CONSUMER_SECRET, APPLICATION_NAME, ACCESS_TOKEN_KEY, + ACCESS_TOKEN_SECRET, adapt_date, adapt_json, TweetSource, TweetLog, MediaType, + Media, EntityMedia, Entity, EntityType) +from sqlalchemy.sql import select, or_ #@UnresolvedImport +import Queue #@UnresolvedImport +import anyjson #@UnresolvedImport +import codecs +import datetime +import email.utils +import logging +import math +import os.path +import sys +import twitter.oauth #@UnresolvedImport +import twitter.oauth_dance #@UnresolvedImport +import twitter_text #@UnresolvedImport + + +CACHE_ACCESS_TOKEN = {} + +def get_oauth_token(token_file_path=None, check_access_token=True, application_name=APPLICATION_NAME, consumer_key=CONSUMER_KEY, consumer_secret=CONSUMER_SECRET): + + global CACHE_ACCESS_TOKEN + + if 'ACCESS_TOKEN_KEY' in globals() and 'ACCESS_TOKEN_SECRET' in globals() and ACCESS_TOKEN_KEY and ACCESS_TOKEN_SECRET: + return ACCESS_TOKEN_KEY,ACCESS_TOKEN_SECRET + + res = CACHE_ACCESS_TOKEN.get(application_name, None) + + if res is None and token_file_path and os.path.exists(token_file_path): + get_logger().debug("get_oauth_token : reading token from file %s" % token_file_path) #@UndefinedVariable + res = twitter.oauth.read_token_file(token_file_path) + + if res is not None and check_access_token: + get_logger().debug("get_oauth_token : Check oauth tokens") #@UndefinedVariable + t = twitter.Twitter(auth=twitter.OAuth(res[0], res[1], CONSUMER_KEY, CONSUMER_SECRET)) + status = None + try: + status = t.account.rate_limit_status() + except Exception as e: + get_logger().debug("get_oauth_token : error getting rate limit status %s" % repr(e)) + status = None + get_logger().debug("get_oauth_token : Check oauth tokens : status %s" % repr(status)) 
#@UndefinedVariable + if status is None or status['remaining_hits'] == 0: + get_logger().debug("get_oauth_token : Problem with status %s" % repr(status)) + res = None + + if res is None: + get_logger().debug("get_oauth_token : doing the oauth dance") + res = twitter.oauth_dance.oauth_dance(application_name, consumer_key, consumer_secret, token_file_path) + + CACHE_ACCESS_TOKEN[application_name] = res + + return res + +def parse_date(date_str): + ts = email.utils.parsedate_tz(date_str) #@UndefinedVariable + return datetime.datetime(*ts[0:7]) + +def clean_keys(dict_val): + return dict([(str(key),value) for key,value in dict_val.items()]) + +fields_adapter = { + 'stream': { + "tweet": { + "created_at" : adapt_date, + "coordinates" : adapt_json, + "place" : adapt_json, + "geo" : adapt_json, +# "original_json" : adapt_json, + }, + "user": { + "created_at" : adapt_date, + }, + + }, + + 'entities' : { + "medias": { + "sizes" : adapt_json, + }, + }, + 'rest': { + "tweet" : { + "place" : adapt_json, + "geo" : adapt_json, + "created_at" : adapt_date, +# "original_json" : adapt_json, + }, + }, +} + +# +# adapt fields, return a copy of the field_dict with adapted fields +# +def adapt_fields(fields_dict, adapter_mapping): + def adapt_one_field(field, value): + if field in adapter_mapping and adapter_mapping[field] is not None: + return adapter_mapping[field](value) + else: + return value + return dict([(str(k),adapt_one_field(k,v)) for k,v in fields_dict.items()]) + + +class ObjectBufferProxy(object): + def __init__(self, klass, args, kwargs, must_flush, instance=None): + self.klass= klass + self.args = args + self.kwargs = kwargs + self.must_flush = must_flush + self.instance = instance + + def persists(self, session): + new_args = [arg() if callable(arg) else arg for arg in self.args] if self.args is not None else [] + new_kwargs = dict([(k,v()) if callable(v) else (k,v) for k,v in self.kwargs.items()]) if self.kwargs is not None else {} + + if self.instance is None: + self.instance = self.klass(*new_args, **new_kwargs) + else: + self.instance = self.klass(*new_args, **new_kwargs) + self.instance = session.merge(self.instance) + + session.add(self.instance) + if self.must_flush: + session.flush() + + def __getattr__(self, name): + return lambda : getattr(self.instance, name) if self.instance else None + + + + +class ObjectsBuffer(object): + + def __init__(self): + self.__bufferlist = [] + self.__bufferdict = {} + + def __add_proxy_object(self, proxy): + proxy_list = self.__bufferdict.get(proxy.klass, None) + if proxy_list is None: + proxy_list = [] + self.__bufferdict[proxy.klass] = proxy_list + proxy_list.append(proxy) + self.__bufferlist.append(proxy) + + def persists(self, session): + for object_proxy in self.__bufferlist: + object_proxy.persists(session) + + def add_object(self, klass, args, kwargs, must_flush, instance=None): + new_proxy = ObjectBufferProxy(klass, args, kwargs, must_flush, instance) + self.__add_proxy_object(new_proxy) + return new_proxy + + def get(self, klass, **kwargs): + if klass in self.__bufferdict: + for proxy in self.__bufferdict[klass]: + if proxy.kwargs is None or len(proxy.kwargs) == 0 or proxy.klass != klass: + continue + found = True + for k,v in kwargs.items(): + if (k not in proxy.kwargs) or v != proxy.kwargs[k]: + found = False + break + if found: + return proxy + return None + +class TwitterProcessorException(Exception): + pass + +class TwitterProcessor(object): + + def __init__(self, json_dict, json_txt, source_id, session, access_token=None, 
token_filename=None, user_query_twitter=False): + + if json_dict is None and json_txt is None: + raise TwitterProcessorException("No json") + + if json_dict is None: + self.json_dict = anyjson.deserialize(json_txt) + else: + self.json_dict = json_dict + + if not json_txt: + self.json_txt = anyjson.serialize(json_dict) + else: + self.json_txt = json_txt + + if "id" not in self.json_dict: + raise TwitterProcessorException("No id in json") + + self.source_id = source_id + self.session = session + self.token_filename = token_filename + self.access_token = access_token + self.obj_buffer = ObjectsBuffer() + self.user_query_twitter = user_query_twitter + + + + def __get_user(self, user_dict, do_merge): + get_logger().debug("Get user : " + repr(user_dict)) #@UndefinedVariable + + user_dict = adapt_fields(user_dict, fields_adapter["stream"]["user"]) + + user_id = user_dict.get("id",None) + user_name = user_dict.get("screen_name", user_dict.get("name", None)) + + if user_id is None and user_name is None: + return None + + user = None + if user_id: + user = self.obj_buffer.get(User, id=user_id) + else: + user = self.obj_buffer.get(User, screen_name=user_name) + + #to do update user id needed + if user is not None: + user_created_at = None + if user.args is not None: + user_created_at = user.args.get('created_at', None) + if user_created_at is None and user_dict.get('created_at', None) is not None and do_merge: + if user.args is None: + user.args = user_dict + else: + user.args.update(user_dict) + return user + + #todo : add methpds to objectbuffer to get buffer user + user_obj = None + if user_id: + user_obj = self.session.query(User).filter(User.id == user_id).first() + else: + user_obj = self.session.query(User).filter(User.screen_name.ilike(user_name)).first() + + #todo update user if needed + if user_obj is not None: + if user_obj.created_at is not None or user_dict.get('created_at', None) is None or not do_merge : + user = ObjectBufferProxy(User, None, None, False, user_obj) + else: + user = self.obj_buffer.add_object(User, None, user_dict, True, user_obj) + return user + + user_created_at = user_dict.get("created_at", None) + + if user_created_at is None and self.user_query_twitter: + + if self.access_token is not None: + acess_token_key, access_token_secret = self.access_token + else: + acess_token_key, access_token_secret = get_oauth_token(self.token_filename) + t = twitter.Twitter(auth=twitter.OAuth(acess_token_key, access_token_secret, CONSUMER_KEY, CONSUMER_SECRET)) + try: + if user_id: + user_dict = t.users.show(user_id=user_id) + else: + user_dict = t.users.show(screen_name=user_name) + except Exception as e: + get_logger().info("get_user : TWITTER ERROR : " + repr(e)) #@UndefinedVariable + get_logger().info("get_user : TWITTER ERROR : " + str(e)) #@UndefinedVariable + return None + + if "id" not in user_dict: + return None + + #TODO filter get, wrap in proxy + user_obj = self.session.query(User).filter(User.id == user_dict["id"]).first() + + if user_obj is not None and not do_merge: + return ObjectBufferProxy(User, None, None, False, user_obj) + else: + return self.obj_buffer.add_object(User, None, user_dict, True) + + def __get_or_create_object(self, klass, filter_by_kwargs, filter, creation_kwargs, must_flush, do_merge): + + obj_proxy = self.obj_buffer.get(klass, **filter_by_kwargs) + if obj_proxy is None: + query = self.session.query(klass) + if filter is not None: + query = query.filter(filter) + else: + query = query.filter_by(**filter_by_kwargs) + obj_instance = query.first() + if 
obj_instance is not None: + if not do_merge: + obj_proxy = ObjectBufferProxy(klass, None, None, False, obj_instance) + else: + obj_proxy = self.obj_buffer.add_object(klass, None, creation_kwargs, must_flush, obj_instance) + if obj_proxy is None: + obj_proxy = self.obj_buffer.add_object(klass, None, creation_kwargs, must_flush) + return obj_proxy + + + def __process_entity(self, ind, ind_type): + get_logger().debug("Process_entity : " + repr(ind) + " : " + repr(ind_type)) #@UndefinedVariable + + ind = clean_keys(ind) + + entity_type = self.__get_or_create_object(EntityType, {'label':ind_type}, None, {'label':ind_type}, True, False) + + entity_dict = { + "indice_start" : ind["indices"][0], + "indice_end" : ind["indices"][1], + "tweet_id" : self.tweet.id, + "entity_type_id" : entity_type.id, + "source" : adapt_json(ind) + } + + def process_medias(): + + media_id = ind.get('id', None) + if media_id is None: + return None, None + + type_str = ind.get("type", "photo") + media_type = self.__get_or_create_object(MediaType, {'label': type_str}, None, {'label':type_str}, True, False) + media_ind = adapt_fields(ind, fields_adapter["entities"]["medias"]) + if "type" in media_ind: + del(media_ind["type"]) + media_ind['type_id'] = media_type.id + media = self.__get_or_create_object(Media, {'id':media_id}, None, media_ind, True, False) + + entity_dict['media_id'] = media.id + return EntityMedia, entity_dict + + def process_hashtags(): + text = ind.get("text", ind.get("hashtag", None)) + if text is None: + return None, None + ind['text'] = text + hashtag = self.__get_or_create_object(Hashtag, {'text':text}, Hashtag.text.ilike(text), ind, True, False) + entity_dict['hashtag_id'] = hashtag.id + return EntityHashtag, entity_dict + + def process_user_mentions(): + user_mention = self.__get_user(ind, False) + if user_mention is None: + entity_dict['user_id'] = None + else: + entity_dict['user_id'] = user_mention.id + return EntityUser, entity_dict + + def process_urls(): + url = self.__get_or_create_object(Url, {'url':ind["url"]}, None, ind, True, False) + entity_dict['url_id'] = url.id + return EntityUrl, entity_dict + + #{'': lambda } + entity_klass, entity_dict = { + 'hashtags': process_hashtags, + 'user_mentions' : process_user_mentions, + 'urls' : process_urls, + 'media': process_medias, + }.get(ind_type, lambda: (Entity, entity_dict))() + + get_logger().debug("Process_entity entity_dict: " + repr(entity_dict)) #@UndefinedVariable + if entity_klass: + self.obj_buffer.add_object(entity_klass, None, entity_dict, False) + + + def __process_twitter_stream(self): + + tweet_nb = self.session.query(Tweet).filter(Tweet.id == self.json_dict["id"]).count() + if tweet_nb > 0: + return + + ts_copy = adapt_fields(self.json_dict, fields_adapter["stream"]["tweet"]) + + # get or create user + user = self.__get_user(self.json_dict["user"], True) + if user is None: + get_logger().warning("USER not found " + repr(self.json_dict["user"])) #@UndefinedVariable + ts_copy["user_id"] = None + else: + ts_copy["user_id"] = user.id + + del(ts_copy['user']) + ts_copy["tweet_source_id"] = self.source_id + + self.tweet = self.obj_buffer.add_object(Tweet, None, ts_copy, True) + + self.__process_entities() + + + def __process_entities(self): + if "entities" in self.json_dict: + for ind_type, entity_list in self.json_dict["entities"].items(): + for ind in entity_list: + self.__process_entity(ind, ind_type) + else: + + text = self.tweet.text + extractor = twitter_text.Extractor(text) + for ind in extractor.extract_hashtags_with_indices(): 
+ self.__process_entity(ind, "hashtags") + + for ind in extractor.extract_urls_with_indices(): + self.__process_entity(ind, "urls") + + for ind in extractor.extract_mentioned_screen_names_with_indices(): + self.__process_entity(ind, "user_mentions") + + def __process_twitter_rest(self): + tweet_nb = self.session.query(Tweet).filter(Tweet.id == self.json_dict["id"]).count() + if tweet_nb > 0: + return + + + tweet_fields = { + 'created_at': self.json_dict["created_at"], + 'favorited': False, + 'id': self.json_dict["id"], + 'id_str': self.json_dict["id_str"], + #'in_reply_to_screen_name': ts["to_user"], + 'in_reply_to_user_id': self.json_dict["to_user_id"], + 'in_reply_to_user_id_str': self.json_dict["to_user_id_str"], + #'place': ts["place"], + 'source': self.json_dict["source"], + 'text': self.json_dict["text"], + 'truncated': False, + 'tweet_source_id' : self.source_id, + } + + #user + + user_fields = { + 'lang' : self.json_dict.get('iso_language_code',None), + 'profile_image_url' : self.json_dict["profile_image_url"], + 'screen_name' : self.json_dict["from_user"], + 'id' : self.json_dict["from_user_id"], + 'id_str' : self.json_dict["from_user_id_str"], + 'name' : self.json_dict['from_user_name'], + } + + user = self.__get_user(user_fields, do_merge=False) + if user is None: + get_logger().warning("USER not found " + repr(user_fields)) #@UndefinedVariable + tweet_fields["user_id"] = None + else: + tweet_fields["user_id"] = user.id + + tweet_fields = adapt_fields(tweet_fields, fields_adapter["rest"]["tweet"]) + self.tweet = self.obj_buffer.add_object(Tweet, None, tweet_fields, True) + + self.__process_entities() + + + + def process(self): + + if self.source_id is None: + tweet_source = self.obj_buffer.add_object(TweetSource, None, {'original_json':self.json_txt}, True) + self.source_id = tweet_source.id + + if "metadata" in self.json_dict: + self.__process_twitter_rest() + else: + self.__process_twitter_stream() + + self.obj_buffer.add_object(TweetLog, None, {'tweet_source_id':self.source_id, 'status':TweetLog.TWEET_STATUS['OK']}, True) + + self.obj_buffer.persists(self.session) + + +def set_logging(options, plogger=None, queue=None): + + logging_config = { + "format" : '%(asctime)s %(levelname)s:%(name)s:%(message)s', + "level" : max(logging.NOTSET, min(logging.CRITICAL, logging.WARNING - 10 * options.verbose + 10 * options.quiet)), #@UndefinedVariable + } + + if options.logfile == "stdout": + logging_config["stream"] = sys.stdout + elif options.logfile == "stderr": + logging_config["stream"] = sys.stderr + else: + logging_config["filename"] = options.logfile + + logger = plogger + if logger is None: + logger = get_logger() #@UndefinedVariable + + if len(logger.handlers) == 0: + filename = logging_config.get("filename") + if queue is not None: + hdlr = QueueHandler(queue, True) + elif filename: + mode = logging_config.get("filemode", 'a') + hdlr = logging.FileHandler(filename, mode) #@UndefinedVariable + else: + stream = logging_config.get("stream") + hdlr = logging.StreamHandler(stream) #@UndefinedVariable + + fs = logging_config.get("format", logging.BASIC_FORMAT) #@UndefinedVariable + dfs = logging_config.get("datefmt", None) + fmt = logging.Formatter(fs, dfs) #@UndefinedVariable + hdlr.setFormatter(fmt) + logger.addHandler(hdlr) + level = logging_config.get("level") + if level is not None: + logger.setLevel(level) + + options.debug = (options.verbose-options.quiet > 0) + return logger + +def set_logging_options(parser): + parser.add_option("-l", "--log", dest="logfile", + help="log to 
file", metavar="LOG", default="stderr") + parser.add_option("-v", dest="verbose", action="count", + help="verbose", metavar="VERBOSE", default=0) + parser.add_option("-q", dest="quiet", action="count", + help="quiet", metavar="QUIET", default=0) + +def get_base_query(session, query, start_date, end_date, hashtags, tweet_exclude_table, user_whitelist): + + query = query.join(EntityHashtag).join(Hashtag) + + if tweet_exclude_table is not None: + query = query.filter(~Tweet.id.in_(select([tweet_exclude_table.c.id]))) #@UndefinedVariable + + if start_date: + query = query.filter(Tweet.created_at >= start_date) + if end_date: + query = query.filter(Tweet.created_at <= end_date) + + if user_whitelist: + query = query.join(User).filter(User.screen_name.in_(user_whitelist)) + + + if hashtags : + def merge_hash(l,h): + l.extend(h.split(",")) + return l + htags = reduce(merge_hash, hashtags, []) + + query = query.filter(or_(*map(lambda h: Hashtag.text.contains(h), htags))) #@UndefinedVariable + + return query + + + +def get_filter_query(session, start_date, end_date, hashtags, tweet_exclude_table, user_whitelist): + + query = session.query(Tweet) + query = get_base_query(session, query, start_date, end_date, hashtags, tweet_exclude_table, user_whitelist) + return query.order_by(Tweet.created_at) + + +def get_user_query(session, start_date, end_date, hashtags, tweet_exclude_table): + + query = session.query(User).join(Tweet) + + query = get_base_query(session, query, start_date, end_date, hashtags, tweet_exclude_table, None) + + return query.distinct() + +logger_name = "iri.tweet" + +def get_logger(): + global logger_name + return logging.getLogger(logger_name) #@UndefinedVariable + + +# Next two import lines for this demo only + +class QueueHandler(logging.Handler): #@UndefinedVariable + """ + This is a logging handler which sends events to a multiprocessing queue. + """ + + def __init__(self, queue, ignore_full): + """ + Initialise an instance, using the passed queue. + """ + logging.Handler.__init__(self) #@UndefinedVariable + self.queue = queue + self.ignore_full = True + + def emit(self, record): + """ + Emit a record. + + Writes the LogRecord to the queue. 
+ """ + try: + ei = record.exc_info + if ei: + dummy = self.format(record) # just to get traceback text into record.exc_text + record.exc_info = None # not needed any more + if not self.ignore_full or not self.queue.full(): + self.queue.put_nowait(record) + except Queue.Full: + if self.ignore_full: + pass + else: + raise + except (KeyboardInterrupt, SystemExit): + raise + except: + self.handleError(record) + +def show_progress(current_line, total_line, label, width, writer=None): + + if writer is None: + writer = sys.stdout + if sys.stdout.encoding is not None: + writer = codecs.getwriter(sys.stdout.encoding)(sys.stdout) + + percent = (float(current_line) / float(total_line)) * 100.0 + + marks = math.floor(width * (percent / 100.0)) + spaces = math.floor(width - marks) + + loader = u'[' + (u'=' * int(marks)) + (u' ' * int(spaces)) + u']' + + s = u"%s %3d%% %*d/%d - %*s\r" % (loader, percent, len(str(total_line)), current_line, total_line, width, label[:width]) + + writer.write(s) #takes the header into account + if percent >= 100: + writer.write("\n") + writer.flush() + + return writer diff -r 7fb5a7b0d35c -r 99215db3da25 script/lib/iri_tweet/models.py --- a/script/lib/iri_tweet/models.py Mon Feb 20 18:52:19 2012 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,290 +0,0 @@ -from sqlalchemy import (Boolean, Column, Enum, BigInteger, Integer, String, - ForeignKey, DateTime, create_engine) -from sqlalchemy.ext.declarative import declarative_base -from sqlalchemy.orm import relationship, sessionmaker -import anyjson -import datetime -import email.utils -import iri_tweet - - -Base = declarative_base() - -APPLICATION_NAME = "IRI_TWITTER" -CONSUMER_KEY = "54ThDZhpEjokcMgHJOMnQA" -CONSUMER_SECRET = "wUoL9UL2T87tfc97R0Dff2EaqRzpJ5XGdmaN2XK3udA" -ACCESS_TOKEN_KEY = None -ACCESS_TOKEN_SECRET = None -#ACCESS_TOKEN_KEY= "47312923-LiNTtz0I18YXMVIrFeTuhmH7bOvYsK6p3Ln2Dc" -#ACCESS_TOKEN_SECRET = "r3LoXVcjImNAElUpWqTu2SG2xCdWFHkva7xeQoncA" - -def adapt_date(date_str): - ts = email.utils.parsedate_tz(date_str) #@UndefinedVariable - return datetime.datetime(*ts[0:7]) - -def adapt_json(obj): - if obj is None: - return None - else: - return anyjson.serialize(obj) - -class TweetMeta(type(Base)): - - def __init__(cls, name, bases, ns): #@NoSelf - def init(self, **kwargs): - for key, value in kwargs.items(): - if hasattr(self, key): - setattr(self, key, value) - super(cls, self).__init__() - setattr(cls, '__init__', init) - super(TweetMeta, cls).__init__(name, bases, ns) - - -class ProcessEvent(Base): - __metaclass__ = TweetMeta - __tablename__ = "tweet_process_event" - id = Column(Integer, primary_key=True, autoincrement=True) - ts = Column(DateTime, default=datetime.datetime.utcnow, index=True) - type = Column(Enum("start","pid","shutdown","error", "start_worker", "stop_worker", "model_version", "application_name", "application_version", name="process_event_type_enum"), nullable=False) - args = Column(String) - -class EntityType(Base): - __metaclass__ = TweetMeta - __tablename__ = "tweet_entity_type" - id = Column(Integer, primary_key=True, autoincrement=True) - label = Column(String) - -class Entity(Base): - __metaclass__ = TweetMeta - __tablename__ = "tweet_entity" - id = Column(Integer, primary_key=True) - tweet_id = Column(BigInteger, ForeignKey('tweet_tweet.id')) - type = Column(String) - entity_type_id = Column(Integer, ForeignKey('tweet_entity_type.id'), nullable=False) - entity_type = relationship("EntityType", backref="entities") - indice_start = Column(Integer) - indice_end = Column(Integer) - 
source = Column(String) - __mapper_args__ = {'polymorphic_on': type, 'polymorphic_identity': 'entity_entity', 'with_polymorphic':'*'} - - -class TweetSource(Base): - __metaclass__ = TweetMeta - __tablename__ = 'tweet_tweet_source' - id = Column(Integer, primary_key=True, autoincrement=True) - original_json = Column(String) - received_at = Column(DateTime, default=datetime.datetime.utcnow, index=True) - - -class TweetLog(Base): - - TWEET_STATUS = { - 'OK' : 1, - 'ERROR' : 2, - 'NOT_TWEET': 3, - } - __metaclass__ = TweetMeta - - __tablename__ = 'tweet_tweet_log' - id = Column(Integer, primary_key=True, autoincrement=True) - ts = Column(DateTime, default=datetime.datetime.utcnow, index=True) - tweet_source_id = Column(Integer, ForeignKey('tweet_tweet_source.id')) - tweet_source = relationship("TweetSource", backref="logs") - status = Column(Integer) - error = Column(String) - error_stack = Column(String) - - -class Tweet(Base): - __metaclass__ = TweetMeta - __tablename__ = 'tweet_tweet' - - id = Column(BigInteger, primary_key=True, autoincrement=False) - id_str = Column(String) - contributors = Column(String) - coordinates = Column(String) - created_at = Column(DateTime, index=True) - favorited = Column(Boolean) - geo = Column(String) - in_reply_to_screen_name = Column(String) - in_reply_to_status_id = Column(BigInteger) - in_reply_to_status_id_str = Column(String) - in_reply_to_user_id = Column(BigInteger) - in_reply_to_user_id_str = Column(String) - place = Column(String) - retweet_count = Column(String) - retweeted = Column(Boolean) - source = Column(String) - text = Column(String) - truncated = Column(Boolean) - user_id = Column(Integer, ForeignKey('tweet_user.id')) - user = relationship("User", backref="tweets") - tweet_source_id = Column(Integer, ForeignKey('tweet_tweet_source.id')) - tweet_source = relationship("TweetSource", backref="tweet") - entity_list = relationship(Entity, backref='tweet') - received_at = Column(DateTime, default=datetime.datetime.utcnow, index=True) - - -class UserMessage(Base): - __metaclass__ = TweetMeta - __tablename__ = "tweet_user_message" - - id = Column(Integer, primary_key=True) - user_id = Column(Integer, ForeignKey('tweet_user.id')) - user = relationship("User", backref="messages") - created_at = Column(DateTime, default=datetime.datetime.utcnow) - message_id = Column(Integer, ForeignKey('tweet_message.id')) - -class Message(Base): - __metaclass__ = TweetMeta - __tablename__ = "tweet_message" - - id = Column(Integer, primary_key=True) - created_at = Column(DateTime, default=datetime.datetime.utcnow) - text = Column(String) - users = relationship(UserMessage, backref='message') - - -class User(Base): - __metaclass__ = TweetMeta - __tablename__ = "tweet_user" - - id = Column(BigInteger, primary_key=True, autoincrement=False) - id_str = Column(String) - contributors_enabled = Column(Boolean) - created_at = Column(DateTime, index=True) - description = Column(String) - favourites_count = Column(Integer) - follow_request_sent = Column(Boolean) - followers_count = Column(Integer) - following = Column(String) - friends_count = Column(Integer) - geo_enabled = Column(Boolean) - is_translator = Column(Boolean) - lang = Column(String) - listed_count = Column(Integer) - location = Column(String) - name = Column(String) - notifications = Column(String) - profile_background_color = Column(String) - profile_background_image_url = Column(String) - profile_background_tile = Column(Boolean) - profile_image_url = Column(String) - profile_image_url_https = Column(String) - 
profile_link_color = Column(String) - profile_sidebar_border_color = Column(String) - profile_sidebar_fill_color = Column(String) - profile_text_color = Column(String) - default_profile_image = Column(String) - profile_use_background_image = Column(Boolean) - protected = Column(Boolean) - screen_name = Column(String, index=True) - show_all_inline_media = Column(Boolean) - statuses_count = Column(Integer) - time_zone = Column(String) - url = Column(String) - utc_offset = Column(Integer) - verified = Column(Boolean) - - -class Hashtag(Base): - __metaclass__ = TweetMeta - __tablename__ = "tweet_hashtag" - id = Column(Integer, primary_key=True) - text = Column(String, unique=True, index=True) - - -class Url(Base): - __metaclass__ = TweetMeta - __tablename__ = "tweet_url" - id = Column(Integer, primary_key=True) - url = Column(String, unique=True) - expanded_url = Column(String) - - -class MediaType(Base): - __metaclass__ = TweetMeta - __tablename__ = "tweet_media_type" - id = Column(Integer, primary_key=True, autoincrement=True) - label = Column(String, unique=True, index=True) - - - -class Media(Base): - __metaclass__ = TweetMeta - __tablename__ = "tweet_media" - id = Column(BigInteger, primary_key=True, autoincrement=False) - id_str = Column(String, unique=True) - media_url = Column(String, unique=True) - media_url_https = Column(String, unique=True) - url = Column(String) - display_url = Column(String) - expanded_url = Column(String) - sizes = Column(String) - type_id = Column(Integer, ForeignKey("tweet_media_type.id")) - type = relationship(MediaType, primaryjoin=type_id == MediaType.id) - - - -class EntityHashtag(Entity): - __tablename__ = "tweet_entity_hashtag" - __mapper_args__ = {'polymorphic_identity': 'entity_hashtag'} - id = Column(Integer, ForeignKey('tweet_entity.id'), primary_key=True) - hashtag_id = Column(Integer, ForeignKey("tweet_hashtag.id")) - hashtag = relationship(Hashtag, primaryjoin=hashtag_id == Hashtag.id) - - -class EntityUrl(Entity): - __tablename__ = "tweet_entity_url" - __mapper_args__ = {'polymorphic_identity': 'entity_url'} - id = Column(Integer, ForeignKey('tweet_entity.id'), primary_key=True) - url_id = Column(Integer, ForeignKey("tweet_url.id")) - url = relationship(Url, primaryjoin=url_id == Url.id) - -class EntityUser(Entity): - __tablename__ = "tweet_entity_user" - __mapper_args__ = {'polymorphic_identity': 'entity_user'} - id = Column(Integer, ForeignKey('tweet_entity.id'), primary_key=True) - user_id = Column(BigInteger, ForeignKey('tweet_user.id')) - user = relationship(User, primaryjoin=(user_id == User.id)) - - -class EntityMedia(Entity): - __tablename__ = "tweet_entity_media" - __mapper_args__ = {'polymorphic_identity': 'entity_media'} - id = Column(Integer, ForeignKey('tweet_entity.id'), primary_key=True) - media_id = Column(BigInteger, ForeignKey('tweet_media.id')) - media = relationship(Media, primaryjoin=(media_id == Media.id)) - -def add_model_version(session, must_commit=True): - pe = ProcessEvent(args=iri_tweet.get_version(), type="model_version") - session.add(pe) - if must_commit: - session.commit() - -def setup_database(*args, **kwargs): - - session_argname = [ 'autoflush','binds', "class_", "_enable_transaction_accounting","expire_on_commit", "extension", "query_cls", "twophase", "weak_identity_map", "autocommit"] - - kwargs_ce = dict((k, v) for k,v in kwargs.items() if (k not in session_argname and k != "create_all")) - - engine = create_engine(*args, **kwargs_ce) - metadata = Base.metadata - - kwargs_sm = {'bind': engine} - - 
kwargs_sm.update([(argname, kwargs[argname]) for argname in session_argname if argname in kwargs]) - - Session = sessionmaker(**kwargs_sm) - #set model version - - if kwargs.get('create_all', True): - metadata.create_all(engine) - session = Session() - try: - add_model_version(session) - finally: - session.close() - - return (engine, metadata, Session) - diff -r 7fb5a7b0d35c -r 99215db3da25 script/lib/iri_tweet/setup.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/script/lib/iri_tweet/setup.py Tue Feb 21 10:47:06 2012 +0100 @@ -0,0 +1,80 @@ +#@PydevCodeAnalysisIgnore +import sys +import os + +extra = {} +if sys.version_info >= (3, 0): + extra.update(use_2to3=True) + + +try: + from setuptools import setup, find_packages +except ImportError: + from distutils.core import setup, find_packages + + +# -*- Distribution Meta -*- +import re +re_meta = re.compile(r'__(\w+?)__\s*=\s*(.*)') +re_vers = re.compile(r'VERSION\s*=\s*\((.*?)\)') +re_doc = re.compile(r'^"""(.+?)"""', re.M|re.S) +rq = lambda s: s.strip("\"'") + + +def add_default(m): + attr_name, attr_value = m.groups() + return ((attr_name, rq(attr_value)), ) + + +def add_version(m): + v = list(map(rq, m.groups()[0].split(", "))) + return (("VERSION", ".".join(v[0:3]) + "".join(v[3:])), ) + + +def add_doc(m): + return (("doc", m.groups()[0].replace("\n", " ")), ) + +pats = {re_meta: add_default, + re_vers: add_version} +here = os.path.abspath(os.path.dirname(__file__)) +meta_fh = open(os.path.join(here, "iri_tweet/__init__.py")) +try: + meta = {} + acc = [] + for line in meta_fh: + if line.strip() == '# -eof meta-': + break + acc.append(line) + for pattern, handler in pats.items(): + m = pattern.match(line.strip()) + if m: + meta.update(handler(m)) + m = re_doc.match("".join(acc).strip()) + if m: + meta.update(add_doc(m)) +finally: + meta_fh.close() + + +setup(name='iri_tweet', + version=meta["VERSION"], + description=meta["doc"], + long_description=open("README").read(), + classifiers=[ + 'License :: OSI Approved :: BSD License', + 'Intended Audience :: Developers', + 'Programming Language :: Python :: 2.6', + 'Programming Language :: Python :: 2.7', + ], + keywords='twitter', + author=meta["author"], + author_email=meta["contact"], + url=meta["homepage"], + license='BSD, CECCIL-C', + packages=find_packages(exclude=['ez_setup', 'examples', 'tests']), + include_package_data=True, + zip_safe=False, + platforms=["any"], + install_requires=[], + **extra +) diff -r 7fb5a7b0d35c -r 99215db3da25 script/lib/iri_tweet/tests.py --- a/script/lib/iri_tweet/tests.py Mon Feb 20 18:52:19 2012 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,197 +0,0 @@ -from sqlalchemy import Column, Integer, String, ForeignKey, create_engine -from sqlalchemy.ext.declarative import declarative_base -from sqlalchemy.orm import relationship, backref -import unittest #@UnresolvedImport -from sqlalchemy.orm import sessionmaker -from iri_tweet.utils import ObjectsBuffer, TwitterProcessor -from iri_tweet import models -import tempfile #@UnresolvedImport -import os - -Base = declarative_base() - -class User(Base): - __tablename__ = 'users' - - id = Column(Integer, primary_key=True) - name = Column(String) - fullname = Column(String) - password = Column(String) - - def __init__(self, name, fullname, password): - self.name = name - self.fullname = fullname - self.password = password - - def __repr__(self): - return "" % (self.name, self.fullname, self.password) - - -class Address(Base): - __tablename__ = 'addresses' - id = Column(Integer, primary_key=True) - 
email_address = Column(String, nullable=False) - user_id = Column(Integer, ForeignKey('users.id')) - - user = relationship("User", backref=backref('addresses', order_by=id)) - - def __init__(self, user_id, email_address): - self.email_address = email_address - self.user_id = user_id - - def __repr__(self): - return "" % self.email_address - - - -class TestObjectBuffer(unittest.TestCase): - - def setUp(self): - self.engine = create_engine('sqlite:///:memory:', echo=False) - Base.metadata.create_all(self.engine) - sessionMaker = sessionmaker(bind=self.engine) - self.session = sessionMaker() - - def tearDown(self): - self.session.close() - self.engine.dispose() - - - def testCreateUser(self): - ed_user = User('ed', 'Ed Jones', 'edspassword') - self.session.add(ed_user) - self.assertTrue(ed_user.id is None) - self.session.commit() - self.assertTrue(ed_user.id is not None) - - - def testSimpleBuffer(self): - obj_buffer = ObjectsBuffer() - obj_proxy = obj_buffer.add_object(User, ['ed1', 'Ed1 Jones', 'edspassword'], None, False) - self.assertTrue(obj_proxy.id() is None) - obj_buffer.persists(self.session) - self.assertTrue(obj_proxy.id() is None) - self.session.commit() - self.assertTrue(obj_proxy.id() is not None) - - - def testSimpleBufferKwargs(self): - obj_buffer = ObjectsBuffer() - obj_proxy = obj_buffer.add_object(User, None, {'name':'ed1b', 'fullname':'Ed1b Jones', 'password':'edspassword'}, False) - self.assertTrue(obj_proxy.id() is None) - obj_buffer.persists(self.session) - self.assertTrue(obj_proxy.id() is None) - self.session.commit() - self.assertTrue(obj_proxy.id() is not None) - - - def testSimpleBufferFlush(self): - obj_buffer = ObjectsBuffer() - obj_proxy = obj_buffer.add_object(User, ['ed2', 'Ed2 Jones', 'edspassword'], None, True) - self.assertTrue(obj_proxy.id() is None) - obj_buffer.persists(self.session) - self.assertTrue(obj_proxy.id() is not None) - self.session.commit() - self.assertTrue(obj_proxy.id() is not None) - - def testRelationBuffer(self): - obj_buffer = ObjectsBuffer() - user1_proxy = obj_buffer.add_object(User, ['ed3', 'Ed3 Jones', 'edspassword'], None, True) - obj_buffer.add_object(Address, [user1_proxy.id,'ed3@mail.com'], None, False) - obj_buffer.add_object(Address, [user1_proxy.id,'ed3@other.com'], None, False) - user2_proxy = obj_buffer.add_object(User, ['ed4', 'Ed3 Jones', 'edspassword'], None, True) - obj_buffer.add_object(Address, [user2_proxy.id,'ed4@mail.com'], None, False) - obj_buffer.persists(self.session) - self.session.commit() - ed_user = self.session.query(User).filter_by(name='ed3').first() - self.assertEquals(2, len(ed_user.addresses)) - ed_user = self.session.query(User).filter_by(name='ed4').first() - self.assertEquals(1, len(ed_user.addresses)) - - - def testGet(self): - obj_buffer = ObjectsBuffer() - user1_proxy = obj_buffer.add_object(User, None, {'name':'ed2', 'fullname':'Ed2 Jones', 'password':'edspassword'}, True) - adress_proxy = obj_buffer.add_object(Address, None, {'user_id':user1_proxy.id,'email_address':'ed2@other.com'}, False) - user2_proxy = obj_buffer.add_object(User, None, {'name':'ed3', 'fullname':'Ed3 Jones', 'password':'edspassword'}, True) - obj_buffer.add_object(Address, None, {'user_id':user2_proxy.id,'email_address':'ed3@other.com'}, False) - self.assertEquals(user1_proxy, obj_buffer.get(User, name='ed2')) - self.assertEquals(adress_proxy, obj_buffer.get(Address, email_address='ed2@other.com')) - self.assertEquals(user2_proxy, obj_buffer.get(User, name='ed3')) - self.assertTrue(obj_buffer.get(User, name='ed3', 
fullname='Ed2 Jones') is None) - -original_json = u'{"in_reply_to_user_id_str":null,"text":"RT @BieberEagle: \\"I love my haters. They spend so much time thinking about me. Aren\u2019t they sweet?\\" - Justin Bieber","contributors":null,"retweeted":false,"coordinates":null,"retweeted_status":{"in_reply_to_user_id_str":null,"text":"\\"I love my haters. They spend so much time thinking about me. Aren\u2019t they sweet?\\" - Justin Bieber","contributors":null,"retweeted":false,"coordinates":null,"retweet_count":"100+","source":"web","entities":{"user_mentions":[],"hashtags":[],"urls":[]},"truncated":false,"place":null,"id_str":"96638597737889792","in_reply_to_user_id":null,"in_reply_to_status_id":null,"favorited":false,"in_reply_to_status_id_str":null,"user":{"is_translator":false,"profile_background_tile":true,"profile_background_image_url_https":"https:\/\/si0.twimg.com\/profile_background_images\/298443445\/355584171.jpg","listed_count":5040,"friends_count":8477,"profile_link_color":"ff0000","profile_sidebar_border_color":"000000","url":"http:\/\/twitpic.com\/photos\/BieberEagle","profile_image_url_https":"https:\/\/si0.twimg.com\/profile_images\/1465491672\/355584171_normal.jpg","profile_image_url":"http:\/\/a2.twimg.com\/profile_images\/1465491672\/355584171_normal.jpg","description":"1 name, 1 inspiration, 1 hero, 1 smile, 1 singer, 1 boy who changed my life. B.\u0130.E.B.E.R-Believe In Everything Because Everything\'s Reachable. #NEVERSAYNEVER","default_profile":false,"notifications":null,"time_zone":"Paris","followers_count":14506,"default_profile_image":false,"lang":"en","profile_use_background_image":true,"screen_name":"BieberEagle","show_all_inline_media":false,"geo_enabled":false,"profile_background_color":"ffffff","location":"\u2665 Albania \u2665 ","id_str":"229067923","profile_background_image_url":"http:\/\/a2.twimg.com\/profile_background_images\/298443445\/355584171.jpg","favourites_count":89,"protected":false,"follow_request_sent":null,"following":null,"name":"truebelieber","statuses_count":24279,"verified":false,"created_at":"Tue Dec 21 12:35:18 +0000 2010","profile_text_color":"000000","id":229067923,"contributors_enabled":false,"utc_offset":3600,"profile_sidebar_fill_color":""},"id":96638597737889792,"created_at":"Thu Jul 28 17:50:11 +0000 2011","geo":null,"in_reply_to_screen_name":null},"retweet_count":"100+","source":"web","entities":{"user_mentions":[{"indices":[3,15],"screen_name":"BieberEagle","id_str":"229067923","name":"truebelieber","id":229067923}],"hashtags":[],"urls":[]},"truncated":false,"place":null,"id_str":"96965037637382145","in_reply_to_user_id":null,"in_reply_to_status_id":null,"favorited":false,"in_reply_to_status_id_str":null,"user":{"is_translator":false,"profile_background_tile":true,"profile_background_image_url_https":"https:\/\/si0.twimg.com\/profile_background_images\/300419382\/ipod.7.14_054.JPG","listed_count":3,"friends_count":1150,"profile_link_color":"00cccc","profile_sidebar_border_color":"c8ff00","url":"http:\/\/www.facebook.com\/blovedbecca180","profile_image_url_https":"https:\/\/si0.twimg.com\/profile_images\/1466752962\/block_party_7.27.11_015_normal.JPG","profile_image_url":"http:\/\/a3.twimg.com\/profile_images\/1466752962\/block_party_7.27.11_015_normal.JPG","description":"if ya wanna know something about me, then get to know me. \\n\\r\\n\\ri promise, you wont regret it. 
(:\\r\\ni love justin bieber with an extreme burning passion!","default_profile":false,"notifications":null,"time_zone":"Central Time (US & Canada)","followers_count":361,"default_profile_image":false,"lang":"en","profile_use_background_image":true,"screen_name":"beccaxannxx","show_all_inline_media":false,"geo_enabled":false,"profile_background_color":"ff0066","location":"","id_str":"65624607","profile_background_image_url":"http:\/\/a3.twimg.com\/profile_background_images\/300419382\/ipod.7.14_054.JPG","favourites_count":266,"protected":false,"follow_request_sent":null,"following":null,"name":"beccaxannxx","statuses_count":2512,"verified":false,"created_at":"Fri Aug 14 12:36:35 +0000 2009","profile_text_color":"6a39d4","id":65624607,"contributors_enabled":false,"utc_offset":-21600,"profile_sidebar_fill_color":"ff00bb"},"id":96965037637382145,"created_at":"Fri Jul 29 15:27:21 +0000 2011","geo":null,"in_reply_to_screen_name":null}' -original_json_media = u'{"user": {"follow_request_sent": null, "profile_use_background_image": true, "id": 34311537, "verified": false, "profile_image_url_https": "https://si0.twimg.com/profile_images/1452959211/63440_1494505009634_1444332638_31074099_4058882_n_normal.jpg", "profile_sidebar_fill_color": "DAECF4", "is_translator": false, "geo_enabled": false, "profile_text_color": "663B12", "followers_count": 29, "profile_sidebar_border_color": "C6E2EE", "id_str": "34311537", "default_profile_image": false, "location": "", "utc_offset": -25200, "statuses_count": 813, "description": "i like joe jonas, justin bieber, ashley tisdale, selena gomez, megen fox, kim kardashian and demi lovoto and many more.", "friends_count": 101, "profile_link_color": "1F98C7", "profile_image_url": "http://a1.twimg.com/profile_images/1452959211/63440_1494505009634_1444332638_31074099_4058882_n_normal.jpg", "notifications": null, "show_all_inline_media": false, "profile_background_image_url_https": "https://si0.twimg.com/profile_background_images/298244445/tour2011.jpg", "profile_background_color": "C6E2EE", "profile_background_image_url": "http://a1.twimg.com/profile_background_images/298244445/tour2011.jpg", "name": "mikayla", "lang": "en", "profile_background_tile": true, "favourites_count": 231, "screen_name": "bieberfever17ya", "url": null, "created_at": "Wed Apr 22 16:04:28 +0000 2009", "contributors_enabled": false, "time_zone": "Mountain Time (US & Canada)", "protected": false, "default_profile": false, "following": null, "listed_count": 1}, "favorited": false, "entities": {"user_mentions": [], "media": [{"media_url_https": "https://p.twimg.com/AWea5Z-CQAAvfvK.jpg", "expanded_url": "http://twitter.com/bieberfever17ya/status/101219827649232896/photo/1", "sizes": {"small": {"h": 240, "w": 201, "resize": "fit"}, "large": {"h": 240, "w": 201, "resize": "fit"}, "medium": {"h": 240, "w": 201, "resize": "fit"}, "thumb": {"h": 150, "w": 150, "resize": "crop"}}, "url": "http://t.co/N7yZ8hS", "display_url": "pic.twitter.com/N7yZ8hS", "id_str": "101219827653427200", "indices": [31, 50], "type": "photo", "id": 101219827653427200, "media_url": "http://p.twimg.com/AWea5Z-CQAAvfvK.jpg"}], "hashtags": [], "urls": []}, "contributors": null, "truncated": false, "text": "i love you justin bieber <3 http://t.co/N7yZ8hS", "created_at": "Wed Aug 10 09:14:22 +0000 2011", "retweeted": false, "in_reply_to_status_id": null, "coordinates": null, "id": 101219827649232896, "source": "web", "in_reply_to_status_id_str": null, "place": null, "in_reply_to_user_id": null, "in_reply_to_screen_name": null, 
"retweet_count": 0, "geo": null, "in_reply_to_user_id_str": null, "possibly_sensitive": false, "id_str": "101219827649232896"}' -original_json_media_others = u'{"user": {"utc_offset": -25200, "statuses_count": 813, "default_profile_image": false, "friends_count": 101, "profile_background_image_url_https": "https://si0.twimg.com/profile_background_images/298244445/tour2011.jpg", "profile_use_background_image": true, "profile_sidebar_fill_color": "DAECF4", "profile_link_color": "1F98C7", "profile_image_url": "http://a1.twimg.com/profile_images/1452959211/63440_1494505009634_1444332638_31074099_4058882_n_normal.jpg", "time_zone": "Mountain Time (US & Canada)", "is_translator": false, "screen_name": "bieberfever17ya", "url": null, "show_all_inline_media": false, "geo_enabled": false, "profile_background_color": "C6E2EE", "id": 34311537, "profile_background_image_url": "http://a1.twimg.com/profile_background_images/298244445/tour2011.jpg", "description": "i like joe jonas, justin bieber, ashley tisdale, selena gomez, megen fox, kim kardashian and demi lovoto and many more.", "lang": "en", "profile_background_tile": true, "favourites_count": 231, "name": "mikayla", "notifications": null, "follow_request_sent": null, "created_at": "Wed Apr 22 16:04:28 +0000 2009", "verified": false, "contributors_enabled": false, "location": "", "profile_text_color": "663B12", "followers_count": 29, "profile_sidebar_border_color": "C6E2EE", "id_str": "34311537", "default_profile": false, "following": null, "protected": false, "profile_image_url_https": "https://si0.twimg.com/profile_images/1452959211/63440_1494505009634_1444332638_31074099_4058882_n_normal.jpg", "listed_count": 1}, "favorited": false, "contributors": null, "source": "web", "text": "i love you justin bieber <3 http://t.co/N7yZ8hS", "created_at": "Wed Aug 10 09:14:22 +0000 2011", "truncated": false, "retweeted": false, "in_reply_to_status_id_str": null, "coordinates": null, "in_reply_to_user_id_str": null, "entities": {"user_mentions": [], "media": [], "hashtags": [], "urls": [], "others": [{"url": "http://t.co/N7yZ8hS", "text": "comments", "indices": [31, 50]}]}, "in_reply_to_status_id": null, "in_reply_to_screen_name": null, "id_str": "101219827649232896", "place": null, "retweet_count": 0, "geo": null, "id": 101219827649232896, "possibly_sensitive": false, "in_reply_to_user_id": null}' - -class TestTwitterProcessor(unittest.TestCase): - - def setUp(self): - self.engine, self.metadata, sessionMaker = models.setup_database('sqlite:///:memory:', echo=True) - self.session = sessionMaker() - file, self.tmpfilepath = tempfile.mkstemp() - os.close(file) - - - def testTwitterProcessor(self): - tp = TwitterProcessor(None, original_json, None, self.session, self.tmpfilepath) - tp.process() - self.session.commit() - - self.assertEquals(1, self.session.query(models.TweetSource).count()) - self.assertEquals(1, self.session.query(models.Tweet).count()) - self.assertEquals(2, self.session.query(models.User).count()) - tweet = self.session.query(models.Tweet).first() - self.assertFalse(tweet.user is None) - self.assertEqual(u"beccaxannxx",tweet.user.name) - self.assertEqual(65624607,tweet.user.id) - self.assertEqual(1,len(tweet.entity_list)) - entity = tweet.entity_list[0] - self.assertEqual(u"BieberEagle", entity.user.screen_name) - self.assertTrue(entity.user.created_at is None) - self.assertEqual("entity_user", entity.type) - self.assertEqual("user_mentions", entity.entity_type.label) - - - def testTwitterProcessorMedia(self): - tp = TwitterProcessor(None, 
original_json_media, None, self.session, self.tmpfilepath) - tp.process() - self.session.commit() - - self.assertEquals(1, self.session.query(models.TweetSource).count()) - self.assertEquals(1, self.session.query(models.Tweet).count()) - self.assertEquals(1, self.session.query(models.User).count()) - tweet = self.session.query(models.Tweet).first() - self.assertFalse(tweet.user is None) - self.assertEqual(u"mikayla",tweet.user.name) - self.assertEqual(34311537,tweet.user.id) - self.assertEqual(1,len(tweet.entity_list)) - entity = tweet.entity_list[0] - self.assertEqual(101219827653427200, entity.media.id) - self.assertEqual("photo", entity.media.type.label) - self.assertEqual("entity_media", entity.type) - self.assertEqual("media", entity.entity_type.label) - - - def testTwitterProcessorMediaOthers(self): - tp = TwitterProcessor(None, original_json_media_others, None, self.session, self.tmpfilepath) - tp.process() - self.session.commit() - - self.assertEquals(1, self.session.query(models.TweetSource).count()) - self.assertEquals(1, self.session.query(models.Tweet).count()) - tweet = self.session.query(models.Tweet).first() - self.assertEqual(1,len(tweet.entity_list)) - entity = tweet.entity_list[0] - self.assertEqual("entity_entity", entity.type) - self.assertEqual("others", entity.entity_type.label) - - - - def tearDown(self): - self.session.close() - self.engine.dispose() - os.remove(self.tmpfilepath) - -if __name__ == '__main__': - unittest.main() \ No newline at end of file diff -r 7fb5a7b0d35c -r 99215db3da25 script/lib/iri_tweet/tweet_twitter_user.py --- a/script/lib/iri_tweet/tweet_twitter_user.py Mon Feb 20 18:52:19 2012 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,126 +0,0 @@ -from iri_tweet.models import setup_database, Message, UserMessage, User -from iri_tweet.utils import (get_oauth_token, get_user_query, set_logging_options, - set_logging, parse_date, get_logger) -from optparse import OptionParser #@UnresolvedImport -from sqlalchemy import BigInteger -from sqlalchemy.schema import Table, Column -from sqlalchemy.sql import and_ -import datetime -import re -import sys -import twitter - -APPLICATION_NAME = "Tweet recorder user" -CONSUMER_KEY = "Vdr5ZcsjI1G3esTPI8yDg" -CONSUMER_SECRET = "LMhNrY99R6a7E0YbZZkRFpUZpX5EfB1qATbDk1sIVLs" - - -def get_options(): - parser = OptionParser() - parser.add_option("-d", "--database", dest="database", - help="Input database", metavar="DATABASE") - parser.add_option("-s", "--start-date", dest="start_date", - help="start date", metavar="START_DATE", default=None) - parser.add_option("-e", "--end-date", dest="end_date", - help="end date", metavar="END_DATE") - parser.add_option("-H", "--hashtag", dest="hashtag", - help="Hashtag", metavar="HASHTAG", default=[], action="append") - parser.add_option("-x", "--exclude", dest="exclude", - help="file containing the id to exclude", metavar="EXCLUDE") - parser.add_option("-D", "--duration", dest="duration", type="int", - help="Duration", metavar="DURATION", default=None) - parser.add_option("-m", "--message", dest="message", - help="tweet", metavar="MESSAGE", default="") - parser.add_option("-u", "--user", dest="user", - help="user", metavar="USER") - parser.add_option("-w", "--password", dest="password", - help="password", metavar="PASSWORD") - parser.add_option("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token", - help="Token file name") - parser.add_option("-S", dest="simulate", metavar="SIMULATE", default=False, action="store_true", help="Simulate call to twitter. 
Do not change the database") - parser.add_option("-f", dest="force", metavar="FORCE", default=False, action="store_true", help="force sending message to all user even if it has already been sent") - - - set_logging_options(parser) - - return parser.parse_args() - - -if __name__ == "__main__": - - (options, args) = get_options() - - set_logging(options) - - get_logger().debug("OPTIONS : " + repr(options)) #@UndefinedVariable - - if not options.message or len(options.message) == 0: - get_logger().warning("No message exiting") - sys.exit() - - conn_str = options.database.strip() - if not re.match("^\w+://.+", conn_str): - conn_str = 'sqlite:///' + conn_str - - engine, metadata, Session = setup_database(conn_str, echo=((options.verbose-options.quiet)>0), create_all = False) - - conn = None - try : - conn = engine.connect() - session = None - try: - session = Session(bind=conn, autoflush=True, autocommit=True) - tweet_exclude_table = Table("tweet_exclude", metadata, Column('id', BigInteger, primary_key=True), prefixes=['TEMPORARY']) - metadata.create_all(bind=conn,tables=[tweet_exclude_table]) - - start_date_str = options.start_date - end_date_str = options.end_date - duration = options.duration - hashtags = options.hashtag - - start_date = None - if start_date_str: - start_date = parse_date(start_date_str) - - end_date = None - if end_date_str: - end_date = parse_date(end_date_str) - elif start_date and duration: - end_date = start_date + datetime.timedelta(seconds=duration) - - base_message = options.message.decode(sys.getfilesystemencoding()) - #get or create message - message_obj = session.query(Message).filter(Message.text == base_message).first() - if not message_obj : - message_obj = Message(text=base_message) - session.add(message_obj) - session.flush() - - query = get_user_query(session, start_date, end_date, hashtags, tweet_exclude_table) - - if not options.force: - query = query.outerjoin(UserMessage, and_(User.id == UserMessage.user_id, UserMessage.message_id == message_obj.id)).filter(UserMessage.message_id == None) - - query_res = query.all() - - acess_token_key, access_token_secret = get_oauth_token(options.token_filename, application_name=APPLICATION_NAME, consumer_key=CONSUMER_KEY, consumer_secret=CONSUMER_SECRET) - t = twitter.Twitter(auth=twitter.OAuth(acess_token_key, access_token_secret, CONSUMER_KEY, CONSUMER_SECRET)) - - for user in query_res: - screen_name = user.screen_name - - message = u"@%s: %s" % (screen_name, base_message) - get_logger().debug("new status : " + message) #@UndefinedVariable - if not options.simulate: - t.statuses.update(status=message) - user_message = UserMessage(user_id=user.id, message_id=message_obj.id) - session.add(user_message) - session.flush() - finally: - # if message created and simulate, do not - if session: - session.close() - finally: - if conn: - conn.close() - diff -r 7fb5a7b0d35c -r 99215db3da25 script/lib/iri_tweet/utils.py --- a/script/lib/iri_tweet/utils.py Mon Feb 20 18:52:19 2012 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,616 +0,0 @@ -from models import (Tweet, User, Hashtag, EntityHashtag, EntityUser, Url, - EntityUrl, CONSUMER_KEY, CONSUMER_SECRET, APPLICATION_NAME, ACCESS_TOKEN_KEY, - ACCESS_TOKEN_SECRET, adapt_date, adapt_json, TweetSource, TweetLog, MediaType, - Media, EntityMedia, Entity, EntityType) -from sqlalchemy.sql import select, or_ #@UnresolvedImport -import Queue #@UnresolvedImport -import anyjson #@UnresolvedImport -import datetime -import email.utils -import logging -import os.path -import sys 
-import math -import twitter.oauth #@UnresolvedImport -import twitter.oauth_dance #@UnresolvedImport -import twitter_text #@UnresolvedImport - - -CACHE_ACCESS_TOKEN = {} - -def get_oauth_token(token_file_path=None, check_access_token=True, application_name=APPLICATION_NAME, consumer_key=CONSUMER_KEY, consumer_secret=CONSUMER_SECRET): - - global CACHE_ACCESS_TOKEN - - if 'ACCESS_TOKEN_KEY' in globals() and 'ACCESS_TOKEN_SECRET' in globals() and ACCESS_TOKEN_KEY and ACCESS_TOKEN_SECRET: - return ACCESS_TOKEN_KEY,ACCESS_TOKEN_SECRET - - res = CACHE_ACCESS_TOKEN.get(application_name, None) - - if res is None and token_file_path and os.path.exists(token_file_path): - get_logger().debug("get_oauth_token : reading token from file %s" % token_file_path) #@UndefinedVariable - res = twitter.oauth.read_token_file(token_file_path) - - if res is not None and check_access_token: - get_logger().debug("get_oauth_token : Check oauth tokens") #@UndefinedVariable - t = twitter.Twitter(auth=twitter.OAuth(res[0], res[1], CONSUMER_KEY, CONSUMER_SECRET)) - status = None - try: - status = t.account.rate_limit_status() - except Exception as e: - get_logger().debug("get_oauth_token : error getting rate limit status %s" % repr(e)) - status = None - get_logger().debug("get_oauth_token : Check oauth tokens : status %s" % repr(status)) #@UndefinedVariable - if status is None or status['remaining_hits'] == 0: - get_logger().debug("get_oauth_token : Problem with status %s" % repr(status)) - res = None - - if res is None: - get_logger().debug("get_oauth_token : doing the oauth dance") - res = twitter.oauth_dance.oauth_dance(application_name, consumer_key, consumer_secret, token_file_path) - - CACHE_ACCESS_TOKEN[application_name] = res - - return res - -def parse_date(date_str): - ts = email.utils.parsedate_tz(date_str) #@UndefinedVariable - return datetime.datetime(*ts[0:7]) - -def clean_keys(dict_val): - return dict([(str(key),value) for key,value in dict_val.items()]) - -fields_adapter = { - 'stream': { - "tweet": { - "created_at" : adapt_date, - "coordinates" : adapt_json, - "place" : adapt_json, - "geo" : adapt_json, -# "original_json" : adapt_json, - }, - "user": { - "created_at" : adapt_date, - }, - - }, - - 'entities' : { - "medias": { - "sizes" : adapt_json, - }, - }, - 'rest': { - "tweet" : { - "place" : adapt_json, - "geo" : adapt_json, - "created_at" : adapt_date, -# "original_json" : adapt_json, - }, - }, -} - -# -# adapt fields, return a copy of the field_dict with adapted fields -# -def adapt_fields(fields_dict, adapter_mapping): - def adapt_one_field(field, value): - if field in adapter_mapping and adapter_mapping[field] is not None: - return adapter_mapping[field](value) - else: - return value - return dict([(str(k),adapt_one_field(k,v)) for k,v in fields_dict.items()]) - - -class ObjectBufferProxy(object): - def __init__(self, klass, args, kwargs, must_flush, instance=None): - self.klass= klass - self.args = args - self.kwargs = kwargs - self.must_flush = must_flush - self.instance = instance - - def persists(self, session): - new_args = [arg() if callable(arg) else arg for arg in self.args] if self.args is not None else [] - new_kwargs = dict([(k,v()) if callable(v) else (k,v) for k,v in self.kwargs.items()]) if self.kwargs is not None else {} - - if self.instance is None: - self.instance = self.klass(*new_args, **new_kwargs) - else: - self.instance = self.klass(*new_args, **new_kwargs) - self.instance = session.merge(self.instance) - - session.add(self.instance) - if self.must_flush: - session.flush() - 
- def __getattr__(self, name): - return lambda : getattr(self.instance, name) if self.instance else None - - - - -class ObjectsBuffer(object): - - def __init__(self): - self.__bufferlist = [] - self.__bufferdict = {} - - def __add_proxy_object(self, proxy): - proxy_list = self.__bufferdict.get(proxy.klass, None) - if proxy_list is None: - proxy_list = [] - self.__bufferdict[proxy.klass] = proxy_list - proxy_list.append(proxy) - self.__bufferlist.append(proxy) - - def persists(self, session): - for object_proxy in self.__bufferlist: - object_proxy.persists(session) - - def add_object(self, klass, args, kwargs, must_flush, instance=None): - new_proxy = ObjectBufferProxy(klass, args, kwargs, must_flush, instance) - self.__add_proxy_object(new_proxy) - return new_proxy - - def get(self, klass, **kwargs): - if klass in self.__bufferdict: - for proxy in self.__bufferdict[klass]: - if proxy.kwargs is None or len(proxy.kwargs) == 0 or proxy.klass != klass: - continue - found = True - for k,v in kwargs.items(): - if (k not in proxy.kwargs) or v != proxy.kwargs[k]: - found = False - break - if found: - return proxy - return None - -class TwitterProcessorException(Exception): - pass - -class TwitterProcessor(object): - - def __init__(self, json_dict, json_txt, source_id, session, access_token=None, token_filename=None, user_query_twitter=False): - - if json_dict is None and json_txt is None: - raise TwitterProcessorException("No json") - - if json_dict is None: - self.json_dict = anyjson.deserialize(json_txt) - else: - self.json_dict = json_dict - - if not json_txt: - self.json_txt = anyjson.serialize(json_dict) - else: - self.json_txt = json_txt - - if "id" not in self.json_dict: - raise TwitterProcessorException("No id in json") - - self.source_id = source_id - self.session = session - self.token_filename = token_filename - self.access_token = access_token - self.obj_buffer = ObjectsBuffer() - self.user_query_twitter = user_query_twitter - - - - def __get_user(self, user_dict, do_merge): - get_logger().debug("Get user : " + repr(user_dict)) #@UndefinedVariable - - user_dict = adapt_fields(user_dict, fields_adapter["stream"]["user"]) - - user_id = user_dict.get("id",None) - user_name = user_dict.get("screen_name", user_dict.get("name", None)) - - if user_id is None and user_name is None: - return None - - user = None - if user_id: - user = self.obj_buffer.get(User, id=user_id) - else: - user = self.obj_buffer.get(User, screen_name=user_name) - - #to do update user id needed - if user is not None: - user_created_at = None - if user.args is not None: - user_created_at = user.args.get('created_at', None) - if user_created_at is None and user_dict.get('created_at', None) is not None and do_merge: - if user.args is None: - user.args = user_dict - else: - user.args.update(user_dict) - return user - - #todo : add methpds to objectbuffer to get buffer user - user_obj = None - if user_id: - user_obj = self.session.query(User).filter(User.id == user_id).first() - else: - user_obj = self.session.query(User).filter(User.screen_name.ilike(user_name)).first() - - #todo update user if needed - if user_obj is not None: - if user_obj.created_at is not None or user_dict.get('created_at', None) is None or not do_merge : - user = ObjectBufferProxy(User, None, None, False, user_obj) - else: - user = self.obj_buffer.add_object(User, None, user_dict, True, user_obj) - return user - - user_created_at = user_dict.get("created_at", None) - - if user_created_at is None and self.user_query_twitter: - - if self.access_token is 
not None: - acess_token_key, access_token_secret = self.access_token - else: - acess_token_key, access_token_secret = get_oauth_token(self.token_filename) - t = twitter.Twitter(auth=twitter.OAuth(acess_token_key, access_token_secret, CONSUMER_KEY, CONSUMER_SECRET)) - try: - if user_id: - user_dict = t.users.show(user_id=user_id) - else: - user_dict = t.users.show(screen_name=user_name) - except Exception as e: - get_logger().info("get_user : TWITTER ERROR : " + repr(e)) #@UndefinedVariable - get_logger().info("get_user : TWITTER ERROR : " + str(e)) #@UndefinedVariable - return None - - if "id" not in user_dict: - return None - - #TODO filter get, wrap in proxy - user_obj = self.session.query(User).filter(User.id == user_dict["id"]).first() - - if user_obj is not None and not do_merge: - return ObjectBufferProxy(User, None, None, False, user_obj) - else: - return self.obj_buffer.add_object(User, None, user_dict, True) - - def __get_or_create_object(self, klass, filter_by_kwargs, filter, creation_kwargs, must_flush, do_merge): - - obj_proxy = self.obj_buffer.get(klass, **filter_by_kwargs) - if obj_proxy is None: - query = self.session.query(klass) - if filter is not None: - query = query.filter(filter) - else: - query = query.filter_by(**filter_by_kwargs) - obj_instance = query.first() - if obj_instance is not None: - if not do_merge: - obj_proxy = ObjectBufferProxy(klass, None, None, False, obj_instance) - else: - obj_proxy = self.obj_buffer.add_object(klass, None, creation_kwargs, must_flush, obj_instance) - if obj_proxy is None: - obj_proxy = self.obj_buffer.add_object(klass, None, creation_kwargs, must_flush) - return obj_proxy - - - def __process_entity(self, ind, ind_type): - get_logger().debug("Process_entity : " + repr(ind) + " : " + repr(ind_type)) #@UndefinedVariable - - ind = clean_keys(ind) - - entity_type = self.__get_or_create_object(EntityType, {'label':ind_type}, None, {'label':ind_type}, True, False) - - entity_dict = { - "indice_start" : ind["indices"][0], - "indice_end" : ind["indices"][1], - "tweet_id" : self.tweet.id, - "entity_type_id" : entity_type.id, - "source" : adapt_json(ind) - } - - def process_medias(): - - media_id = ind.get('id', None) - if media_id is None: - return None, None - - type_str = ind.get("type", "photo") - media_type = self.__get_or_create_object(MediaType, {'label': type_str}, None, {'label':type_str}, True, False) - media_ind = adapt_fields(ind, fields_adapter["entities"]["medias"]) - if "type" in media_ind: - del(media_ind["type"]) - media_ind['type_id'] = media_type.id - media = self.__get_or_create_object(Media, {'id':media_id}, None, media_ind, True, False) - - entity_dict['media_id'] = media.id - return EntityMedia, entity_dict - - def process_hashtags(): - text = ind.get("text", ind.get("hashtag", None)) - if text is None: - return None, None - ind['text'] = text - hashtag = self.__get_or_create_object(Hashtag, {'text':text}, Hashtag.text.ilike(text), ind, True, False) - entity_dict['hashtag_id'] = hashtag.id - return EntityHashtag, entity_dict - - def process_user_mentions(): - user_mention = self.__get_user(ind, False) - if user_mention is None: - entity_dict['user_id'] = None - else: - entity_dict['user_id'] = user_mention.id - return EntityUser, entity_dict - - def process_urls(): - url = self.__get_or_create_object(Url, {'url':ind["url"]}, None, ind, True, False) - entity_dict['url_id'] = url.id - return EntityUrl, entity_dict - - #{'': lambda } - entity_klass, entity_dict = { - 'hashtags': process_hashtags, - 'user_mentions' : 
process_user_mentions, - 'urls' : process_urls, - 'media': process_medias, - }.get(ind_type, lambda: (Entity, entity_dict))() - - get_logger().debug("Process_entity entity_dict: " + repr(entity_dict)) #@UndefinedVariable - if entity_klass: - self.obj_buffer.add_object(entity_klass, None, entity_dict, False) - - - def __process_twitter_stream(self): - - tweet_nb = self.session.query(Tweet).filter(Tweet.id == self.json_dict["id"]).count() - if tweet_nb > 0: - return - - ts_copy = adapt_fields(self.json_dict, fields_adapter["stream"]["tweet"]) - - # get or create user - user = self.__get_user(self.json_dict["user"], True) - if user is None: - get_logger().warning("USER not found " + repr(self.json_dict["user"])) #@UndefinedVariable - ts_copy["user_id"] = None - else: - ts_copy["user_id"] = user.id - - del(ts_copy['user']) - ts_copy["tweet_source_id"] = self.source_id - - self.tweet = self.obj_buffer.add_object(Tweet, None, ts_copy, True) - - self.__process_entities() - - - def __process_entities(self): - if "entities" in self.json_dict: - for ind_type, entity_list in self.json_dict["entities"].items(): - for ind in entity_list: - self.__process_entity(ind, ind_type) - else: - - text = self.tweet.text - extractor = twitter_text.Extractor(text) - for ind in extractor.extract_hashtags_with_indices(): - self.__process_entity(ind, "hashtags") - - for ind in extractor.extract_urls_with_indices(): - self.__process_entity(ind, "urls") - - for ind in extractor.extract_mentioned_screen_names_with_indices(): - self.__process_entity(ind, "user_mentions") - - def __process_twitter_rest(self): - tweet_nb = self.session.query(Tweet).filter(Tweet.id == self.json_dict["id"]).count() - if tweet_nb > 0: - return - - - tweet_fields = { - 'created_at': self.json_dict["created_at"], - 'favorited': False, - 'id': self.json_dict["id"], - 'id_str': self.json_dict["id_str"], - #'in_reply_to_screen_name': ts["to_user"], - 'in_reply_to_user_id': self.json_dict["to_user_id"], - 'in_reply_to_user_id_str': self.json_dict["to_user_id_str"], - #'place': ts["place"], - 'source': self.json_dict["source"], - 'text': self.json_dict["text"], - 'truncated': False, - 'tweet_source_id' : self.source_id, - } - - #user - - user_fields = { - 'lang' : self.json_dict.get('iso_language_code',None), - 'profile_image_url' : self.json_dict["profile_image_url"], - 'screen_name' : self.json_dict["from_user"], - 'id' : self.json_dict["from_user_id"], - 'id_str' : self.json_dict["from_user_id_str"], - 'name' : self.json_dict['from_user_name'], - } - - user = self.__get_user(user_fields, do_merge=False) - if user is None: - get_logger().warning("USER not found " + repr(user_fields)) #@UndefinedVariable - tweet_fields["user_id"] = None - else: - tweet_fields["user_id"] = user.id - - tweet_fields = adapt_fields(tweet_fields, fields_adapter["rest"]["tweet"]) - self.tweet = self.obj_buffer.add_object(Tweet, None, tweet_fields, True) - - self.__process_entities() - - - - def process(self): - - if self.source_id is None: - tweet_source = self.obj_buffer.add_object(TweetSource, None, {'original_json':self.json_txt}, True) - self.source_id = tweet_source.id - - if "metadata" in self.json_dict: - self.__process_twitter_rest() - else: - self.__process_twitter_stream() - - self.obj_buffer.add_object(TweetLog, None, {'tweet_source_id':self.source_id, 'status':TweetLog.TWEET_STATUS['OK']}, True) - - self.obj_buffer.persists(self.session) - - -def set_logging(options, plogger=None, queue=None): - - logging_config = { - "format" : '%(asctime)s 
%(levelname)s:%(name)s:%(message)s', - "level" : max(logging.NOTSET, min(logging.CRITICAL, logging.WARNING - 10 * options.verbose + 10 * options.quiet)), #@UndefinedVariable - } - - if options.logfile == "stdout": - logging_config["stream"] = sys.stdout - elif options.logfile == "stderr": - logging_config["stream"] = sys.stderr - else: - logging_config["filename"] = options.logfile - - logger = plogger - if logger is None: - logger = get_logger() #@UndefinedVariable - - if len(logger.handlers) == 0: - filename = logging_config.get("filename") - if queue is not None: - hdlr = QueueHandler(queue, True) - elif filename: - mode = logging_config.get("filemode", 'a') - hdlr = logging.FileHandler(filename, mode) #@UndefinedVariable - else: - stream = logging_config.get("stream") - hdlr = logging.StreamHandler(stream) #@UndefinedVariable - - fs = logging_config.get("format", logging.BASIC_FORMAT) #@UndefinedVariable - dfs = logging_config.get("datefmt", None) - fmt = logging.Formatter(fs, dfs) #@UndefinedVariable - hdlr.setFormatter(fmt) - logger.addHandler(hdlr) - level = logging_config.get("level") - if level is not None: - logger.setLevel(level) - - options.debug = (options.verbose-options.quiet > 0) - return logger - -def set_logging_options(parser): - parser.add_option("-l", "--log", dest="logfile", - help="log to file", metavar="LOG", default="stderr") - parser.add_option("-v", dest="verbose", action="count", - help="verbose", metavar="VERBOSE", default=0) - parser.add_option("-q", dest="quiet", action="count", - help="quiet", metavar="QUIET", default=0) - -def get_base_query(session, query, start_date, end_date, hashtags, tweet_exclude_table, user_whitelist): - - query = query.join(EntityHashtag).join(Hashtag) - - if tweet_exclude_table is not None: - query = query.filter(~Tweet.id.in_(select([tweet_exclude_table.c.id]))) #@UndefinedVariable - - if start_date: - query = query.filter(Tweet.created_at >= start_date) - if end_date: - query = query.filter(Tweet.created_at <= end_date) - - if user_whitelist: - query = query.join(User).filter(User.screen_name.in_(user_whitelist)) - - - if hashtags : - def merge_hash(l,h): - l.extend(h.split(",")) - return l - htags = reduce(merge_hash, hashtags, []) - - query = query.filter(or_(*map(lambda h: Hashtag.text.contains(h), htags))) #@UndefinedVariable - - return query - - - -def get_filter_query(session, start_date, end_date, hashtags, tweet_exclude_table, user_whitelist): - - query = session.query(Tweet) - query = get_base_query(session, query, start_date, end_date, hashtags, tweet_exclude_table, user_whitelist) - return query.order_by(Tweet.created_at) - - -def get_user_query(session, start_date, end_date, hashtags, tweet_exclude_table): - - query = session.query(User).join(Tweet) - - query = get_base_query(session, query, start_date, end_date, hashtags, tweet_exclude_table, None) - - return query.distinct() - -logger_name = "iri.tweet" - -def get_logger(): - global logger_name - return logging.getLogger(logger_name) #@UndefinedVariable - - -# Next two import lines for this demo only - -class QueueHandler(logging.Handler): #@UndefinedVariable - """ - This is a logging handler which sends events to a multiprocessing queue. - """ - - def __init__(self, queue, ignore_full): - """ - Initialise an instance, using the passed queue. - """ - logging.Handler.__init__(self) #@UndefinedVariable - self.queue = queue - self.ignore_full = True - - def emit(self, record): - """ - Emit a record. - - Writes the LogRecord to the queue. 
- """ - try: - ei = record.exc_info - if ei: - dummy = self.format(record) # just to get traceback text into record.exc_text - record.exc_info = None # not needed any more - if not self.ignore_full or not self.queue.full(): - self.queue.put_nowait(record) - except Queue.Full: - if self.ignore_full: - pass - else: - raise - except (KeyboardInterrupt, SystemExit): - raise - except: - self.handleError(record) - -def show_progress(current_line, total_line, label, width): - - percent = (float(current_line) / float(total_line)) * 100.0 - - marks = math.floor(width * (percent / 100.0)) - spaces = math.floor(width - marks) - - loader = u'[' + (u'=' * int(marks)) + (u' ' * int(spaces)) + u']' - - sys.stdout.write(u"%s %d%% %d/%d - %r\r" % (loader, percent, current_line - 1, total_line - 1, label[:50].rjust(50))) #takes the header into account - if percent >= 100: - sys.stdout.write("\n") - sys.stdout.flush() diff -r 7fb5a7b0d35c -r 99215db3da25 script/lib/tweetstream/tweetstream/__init__.py --- a/script/lib/tweetstream/tweetstream/__init__.py Mon Feb 20 18:52:19 2012 +0100 +++ b/script/lib/tweetstream/tweetstream/__init__.py Tue Feb 21 10:47:06 2012 +0100 @@ -1,6 +1,6 @@ """Simple access to Twitter's streaming API""" -VERSION = (1, 1, 1) +VERSION = (1, 1, 1, 'iri') __version__ = ".".join(map(str, VERSION[0:3])) + "".join(VERSION[3:]) __author__ = "Rune Halvorsen" __contact__ = "runefh@gmail.com" diff -r 7fb5a7b0d35c -r 99215db3da25 script/utils/export_tweet_db.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/script/utils/export_tweet_db.py Tue Feb 21 10:47:06 2012 +0100 @@ -0,0 +1,47 @@ +from models import setup_database +from optparse import OptionParser #@UnresolvedImport +from sqlalchemy.orm import sessionmaker +from utils import set_logging_options, set_logging, TwitterProcessor, logger +import sqlite3 #@UnresolvedImport + + +# 'entities': "tweet_entity", +# 'user': "tweet_user" + +def get_option(): + + parser = OptionParser() + + parser.add_option("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token", + help="Token file name") + + set_logging_options(parser) + + return parser.parse_args() + +if __name__ == "__main__": + + (options, args) = get_option() + + set_logging(options) + + with sqlite3.connect(args[0]) as conn_in: + engine, metadata, Session = setup_database('sqlite:///'+args[1], echo=((options.verbose-options.quiet)>0)) + session = Session() + try: + curs_in = conn_in.cursor() + fields_mapping = {} + for i,res in enumerate(curs_in.execute("select json from tweet_tweet;")): + logger.debug("main loop %d : %s" % (i, res[0])) #@UndefinedVariable + processor = TwitterProcessor(eval(res[0]), res[0], None, session, options.token_filename) + processor.process() + session.commit() + logger.debug("main : %d tweet processed" % (i+1)) #@UndefinedVariable + except Exception, e: + session.rollback() + raise e + finally: + session.close() + + + \ No newline at end of file diff -r 7fb5a7b0d35c -r 99215db3da25 script/utils/tweet_twitter_user.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/script/utils/tweet_twitter_user.py Tue Feb 21 10:47:06 2012 +0100 @@ -0,0 +1,126 @@ +from iri_tweet.models import setup_database, Message, UserMessage, User +from iri_tweet.utils import (get_oauth_token, get_user_query, set_logging_options, + set_logging, parse_date, get_logger) +from optparse import OptionParser #@UnresolvedImport +from sqlalchemy import BigInteger +from sqlalchemy.schema import Table, Column +from sqlalchemy.sql import and_ +import datetime +import re +import sys 
+import twitter + +APPLICATION_NAME = "Tweet recorder user" +CONSUMER_KEY = "Vdr5ZcsjI1G3esTPI8yDg" +CONSUMER_SECRET = "LMhNrY99R6a7E0YbZZkRFpUZpX5EfB1qATbDk1sIVLs" + + +def get_options(): + parser = OptionParser() + parser.add_option("-d", "--database", dest="database", + help="Input database", metavar="DATABASE") + parser.add_option("-s", "--start-date", dest="start_date", + help="start date", metavar="START_DATE", default=None) + parser.add_option("-e", "--end-date", dest="end_date", + help="end date", metavar="END_DATE") + parser.add_option("-H", "--hashtag", dest="hashtag", + help="Hashtag", metavar="HASHTAG", default=[], action="append") + parser.add_option("-x", "--exclude", dest="exclude", + help="file containing the id to exclude", metavar="EXCLUDE") + parser.add_option("-D", "--duration", dest="duration", type="int", + help="Duration", metavar="DURATION", default=None) + parser.add_option("-m", "--message", dest="message", + help="tweet", metavar="MESSAGE", default="") + parser.add_option("-u", "--user", dest="user", + help="user", metavar="USER") + parser.add_option("-w", "--password", dest="password", + help="password", metavar="PASSWORD") + parser.add_option("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token", + help="Token file name") + parser.add_option("-S", dest="simulate", metavar="SIMULATE", default=False, action="store_true", help="Simulate call to twitter. Do not change the database") + parser.add_option("-f", dest="force", metavar="FORCE", default=False, action="store_true", help="force sending message to all user even if it has already been sent") + + + set_logging_options(parser) + + return parser.parse_args() + + +if __name__ == "__main__": + + (options, args) = get_options() + + set_logging(options) + + get_logger().debug("OPTIONS : " + repr(options)) #@UndefinedVariable + + if not options.message or len(options.message) == 0: + get_logger().warning("No message exiting") + sys.exit() + + conn_str = options.database.strip() + if not re.match("^\w+://.+", conn_str): + conn_str = 'sqlite:///' + conn_str + + engine, metadata, Session = setup_database(conn_str, echo=((options.verbose-options.quiet)>0), create_all = False) + + conn = None + try : + conn = engine.connect() + session = None + try: + session = Session(bind=conn, autoflush=True, autocommit=True) + tweet_exclude_table = Table("tweet_exclude", metadata, Column('id', BigInteger, primary_key=True), prefixes=['TEMPORARY']) + metadata.create_all(bind=conn,tables=[tweet_exclude_table]) + + start_date_str = options.start_date + end_date_str = options.end_date + duration = options.duration + hashtags = options.hashtag + + start_date = None + if start_date_str: + start_date = parse_date(start_date_str) + + end_date = None + if end_date_str: + end_date = parse_date(end_date_str) + elif start_date and duration: + end_date = start_date + datetime.timedelta(seconds=duration) + + base_message = options.message.decode(sys.getfilesystemencoding()) + #get or create message + message_obj = session.query(Message).filter(Message.text == base_message).first() + if not message_obj : + message_obj = Message(text=base_message) + session.add(message_obj) + session.flush() + + query = get_user_query(session, start_date, end_date, hashtags, tweet_exclude_table) + + if not options.force: + query = query.outerjoin(UserMessage, and_(User.id == UserMessage.user_id, UserMessage.message_id == message_obj.id)).filter(UserMessage.message_id == None) + + query_res = query.all() + + acess_token_key, access_token_secret 
= get_oauth_token(options.token_filename, application_name=APPLICATION_NAME, consumer_key=CONSUMER_KEY, consumer_secret=CONSUMER_SECRET) + t = twitter.Twitter(auth=twitter.OAuth(acess_token_key, access_token_secret, CONSUMER_KEY, CONSUMER_SECRET)) + + for user in query_res: + screen_name = user.screen_name + + message = u"@%s: %s" % (screen_name, base_message) + get_logger().debug("new status : " + message) #@UndefinedVariable + if not options.simulate: + t.statuses.update(status=message) + user_message = UserMessage(user_id=user.id, message_id=message_obj.id) + session.add(user_message) + session.flush() + finally: + # if message created and simulate, do not + if session: + session.close() + finally: + if conn: + conn.close() +
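For reference, a minimal usage sketch of the recording API packaged by this changeset. It assumes the new iri_tweet package has been installed (e.g. running setup.py from script/lib/iri_tweet) and that TwitterProcessor remains importable from iri_tweet.utils with the signature shown in the removed script/lib/iri_tweet/utils.py; the input file tweets.jsonl (one tweet JSON document per line), the sqlite:///tweets.db URL and the .oauth_token path are illustrative placeholders, not part of the changeset.

    from iri_tweet.models import setup_database
    from iri_tweet.utils import TwitterProcessor

    # Create the schema and open a session on a new SQLite database;
    # setup_database() returns (engine, metadata, Session) and, by default,
    # calls metadata.create_all() and records the model version.
    engine, metadata, Session = setup_database('sqlite:///tweets.db', echo=False)
    session = Session()
    try:
        with open('tweets.jsonl') as f:
            for json_txt in f:
                # json_dict=None lets the processor deserialize json_txt itself;
                # source_id=None makes it insert a new TweetSource row first.
                processor = TwitterProcessor(None, json_txt, None, session,
                                             token_filename='.oauth_token')
                processor.process()
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()

The same pattern is what script/utils/export_tweet_db.py above applies to an existing tweet_tweet table, committing after each processed tweet instead of once at the end.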