correct tweetstream version and clean iri_tweet. can now do a setup
--- a/.hgignore Mon Feb 20 18:52:19 2012 +0100
+++ b/.hgignore Tue Feb 21 10:47:06 2012 +0100
@@ -46,4 +46,12 @@
syntax: regexp
^tweetcast/nodejs/node_modules$
syntax: regexp
-^tweetcast/server-gevent/server_setup\.py$
\ No newline at end of file
+^tweetcast/server-gevent/server_setup\.py$
+syntax: regexp
+^script/lib/iri_tweet/dist$
+syntax: regexp
+^script/lib/iri_tweet/iri_tweet\.egg-info$
+syntax: regexp
+^script/lib/tweetstream/dist$
+syntax: regexp
+^script/lib/tweetstream/tweetstream\.egg-info$
\ No newline at end of file
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/script/lib/iri_tweet/LICENSE Tue Feb 21 10:47:06 2012 +0100
@@ -0,0 +1,29 @@
+Copyright (c) 2012, IRI, Haussonne Yves-Marie
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+
+ * Redistributions of source code must retain the above copyright notice,
+ this list of conditions and the following disclaimer.
+ * Redistributions in binary form must reproduce the above copyright
+ notice, this list of conditions and the following disclaimer in the
+ documentation and/or other materials provided with the distribution.
+
+Neither the name of IRI nor the names of its contributors may be
+used to endorse or promote products derived from this software without
+specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
+THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS
+BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+POSSIBILITY OF SUCH DAMAGE.
+
+CECILL-C
\ No newline at end of file
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/script/lib/iri_tweet/README Tue Feb 21 10:47:06 2012 +0100
@@ -0,0 +1,33 @@
+.. -*- restructuredtext -*-
+
+
+##########################################
+iri_tweet - Simple twitter recording API
+##########################################
+
+Introduction
+------------
+
+
+Examples
+--------
+
+Contact
+-------
+
+The author is Yves-Marie Haussonne <ymh_work__at__gmail.com>. The project resides at
+http://bitbucket.org/ . If you find bugs, or have feature
+requests, please report them in the project site issue tracker. Patches are
+also very welcome.
+
+Contributors
+------------
+
+- Yves-Marie Haussonne
+
+License
+-------
+
+This software is licensed under the ``New BSD License`` and CECILL-C. See the ``LICENSE``
+file in the top distribution directory for the full license text.
+
--- a/script/lib/iri_tweet/__init__.py Mon Feb 20 18:52:19 2012 +0100
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,17 +0,0 @@
-VERSION = (0, 82, 0, "final", 0)
-
-VERSION_STR = unicode(".".join(map(lambda i:"%02d" % (i,), VERSION[:2])))
-
-
-def get_version():
- version = '%s.%s' % (VERSION[0], VERSION[1])
- if VERSION[2]:
- version = '%s.%s' % (version, VERSION[2])
- if VERSION[3:] == ('alpha', 0):
- version = '%s pre-alpha' % version
- else:
- if VERSION[3] != 'final':
- version = '%s %s %s' % (version, VERSION[3], VERSION[4])
- return version
-
-__version__ = get_version()
--- a/script/lib/iri_tweet/export_tweet_db.py Mon Feb 20 18:52:19 2012 +0100
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,47 +0,0 @@
-from models import setup_database
-from optparse import OptionParser #@UnresolvedImport
-from sqlalchemy.orm import sessionmaker
-from utils import set_logging_options, set_logging, TwitterProcessor, logger
-import sqlite3 #@UnresolvedImport
-
-
-# 'entities': "tweet_entity",
-# 'user': "tweet_user"
-
-def get_option():
-
- parser = OptionParser()
-
- parser.add_option("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token",
- help="Token file name")
-
- set_logging_options(parser)
-
- return parser.parse_args()
-
-if __name__ == "__main__":
-
- (options, args) = get_option()
-
- set_logging(options)
-
- with sqlite3.connect(args[0]) as conn_in:
- engine, metadata, Session = setup_database('sqlite:///'+args[1], echo=((options.verbose-options.quiet)>0))
- session = Session()
- try:
- curs_in = conn_in.cursor()
- fields_mapping = {}
- for i,res in enumerate(curs_in.execute("select json from tweet_tweet;")):
- logger.debug("main loop %d : %s" % (i, res[0])) #@UndefinedVariable
- processor = TwitterProcessor(eval(res[0]), res[0], None, session, options.token_filename)
- processor.process()
- session.commit()
- logger.debug("main : %d tweet processed" % (i+1)) #@UndefinedVariable
- except Exception, e:
- session.rollback()
- raise e
- finally:
- session.close()
-
-
-
\ No newline at end of file
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/script/lib/iri_tweet/iri_tweet/__init__.py Tue Feb 21 10:47:06 2012 +0100
@@ -0,0 +1,23 @@
+"""Simple model and tools to record tweets"""
+
+VERSION = (0, 82, 0, "final", 0)
+
+VERSION_STR = unicode(".".join(map(lambda i:"%02d" % (i,), VERSION[:2])))
+
+
+def get_version():
+ version = '%s.%s' % (VERSION[0], VERSION[1])
+ if VERSION[2]:
+ version = '%s.%s' % (version, VERSION[2])
+ if VERSION[3:] == ('alpha', 0):
+ version = '%s pre-alpha' % version
+ else:
+ if VERSION[3] != 'final':
+ version = '%s %s %s' % (version, VERSION[3], VERSION[4])
+ return version
+
+__version__ = get_version()
+__author__ = "Yves-Marie Haussonne"
+__contact__ = "ymh.work@gmail.com"
+__homepage__ = ""
+__docformat__ = "restructuredtext"
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/script/lib/iri_tweet/iri_tweet/models.py Tue Feb 21 10:47:06 2012 +0100
@@ -0,0 +1,290 @@
+from sqlalchemy import (Boolean, Column, Enum, BigInteger, Integer, String,
+ ForeignKey, DateTime, create_engine)
+from sqlalchemy.ext.declarative import declarative_base
+from sqlalchemy.orm import relationship, sessionmaker
+import anyjson
+import datetime
+import email.utils
+import iri_tweet
+
+
+Base = declarative_base()
+
+APPLICATION_NAME = "IRI_TWITTER"
+CONSUMER_KEY = "54ThDZhpEjokcMgHJOMnQA"
+CONSUMER_SECRET = "wUoL9UL2T87tfc97R0Dff2EaqRzpJ5XGdmaN2XK3udA"
+ACCESS_TOKEN_KEY = None
+ACCESS_TOKEN_SECRET = None
+#ACCESS_TOKEN_KEY= "47312923-LiNTtz0I18YXMVIrFeTuhmH7bOvYsK6p3Ln2Dc"
+#ACCESS_TOKEN_SECRET = "r3LoXVcjImNAElUpWqTu2SG2xCdWFHkva7xeQoncA"
+
+def adapt_date(date_str):
+ ts = email.utils.parsedate_tz(date_str) #@UndefinedVariable
+ return datetime.datetime(*ts[0:7])
+
+def adapt_json(obj):
+ if obj is None:
+ return None
+ else:
+ return anyjson.serialize(obj)
+
+class TweetMeta(type(Base)):
+
+ def __init__(cls, name, bases, ns): #@NoSelf
+ def init(self, **kwargs):
+ for key, value in kwargs.items():
+ if hasattr(self, key):
+ setattr(self, key, value)
+ super(cls, self).__init__()
+ setattr(cls, '__init__', init)
+ super(TweetMeta, cls).__init__(name, bases, ns)
+
+
+class ProcessEvent(Base):
+ __metaclass__ = TweetMeta
+ __tablename__ = "tweet_process_event"
+ id = Column(Integer, primary_key=True, autoincrement=True)
+ ts = Column(DateTime, default=datetime.datetime.utcnow, index=True)
+ type = Column(Enum("start","pid","shutdown","error", "start_worker", "stop_worker", "model_version", "application_name", "application_version", name="process_event_type_enum"), nullable=False)
+ args = Column(String)
+
+class EntityType(Base):
+ __metaclass__ = TweetMeta
+ __tablename__ = "tweet_entity_type"
+ id = Column(Integer, primary_key=True, autoincrement=True)
+ label = Column(String)
+
+class Entity(Base):
+ __metaclass__ = TweetMeta
+ __tablename__ = "tweet_entity"
+ id = Column(Integer, primary_key=True)
+ tweet_id = Column(BigInteger, ForeignKey('tweet_tweet.id'))
+ type = Column(String)
+ entity_type_id = Column(Integer, ForeignKey('tweet_entity_type.id'), nullable=False)
+ entity_type = relationship("EntityType", backref="entities")
+ indice_start = Column(Integer)
+ indice_end = Column(Integer)
+ source = Column(String)
+ __mapper_args__ = {'polymorphic_on': type, 'polymorphic_identity': 'entity_entity', 'with_polymorphic':'*'}
+
+
+class TweetSource(Base):
+ __metaclass__ = TweetMeta
+ __tablename__ = 'tweet_tweet_source'
+ id = Column(Integer, primary_key=True, autoincrement=True)
+ original_json = Column(String)
+ received_at = Column(DateTime, default=datetime.datetime.utcnow, index=True)
+
+
+class TweetLog(Base):
+
+ TWEET_STATUS = {
+ 'OK' : 1,
+ 'ERROR' : 2,
+ 'NOT_TWEET': 3,
+ }
+ __metaclass__ = TweetMeta
+
+ __tablename__ = 'tweet_tweet_log'
+ id = Column(Integer, primary_key=True, autoincrement=True)
+ ts = Column(DateTime, default=datetime.datetime.utcnow, index=True)
+ tweet_source_id = Column(Integer, ForeignKey('tweet_tweet_source.id'))
+ tweet_source = relationship("TweetSource", backref="logs")
+ status = Column(Integer)
+ error = Column(String)
+ error_stack = Column(String)
+
+
+class Tweet(Base):
+ __metaclass__ = TweetMeta
+ __tablename__ = 'tweet_tweet'
+
+ id = Column(BigInteger, primary_key=True, autoincrement=False)
+ id_str = Column(String)
+ contributors = Column(String)
+ coordinates = Column(String)
+ created_at = Column(DateTime, index=True)
+ favorited = Column(Boolean)
+ geo = Column(String)
+ in_reply_to_screen_name = Column(String)
+ in_reply_to_status_id = Column(BigInteger)
+ in_reply_to_status_id_str = Column(String)
+ in_reply_to_user_id = Column(BigInteger)
+ in_reply_to_user_id_str = Column(String)
+ place = Column(String)
+ retweet_count = Column(String)
+ retweeted = Column(Boolean)
+ source = Column(String)
+ text = Column(String)
+ truncated = Column(Boolean)
+ user_id = Column(Integer, ForeignKey('tweet_user.id'))
+ user = relationship("User", backref="tweets")
+ tweet_source_id = Column(Integer, ForeignKey('tweet_tweet_source.id'))
+ tweet_source = relationship("TweetSource", backref="tweet")
+ entity_list = relationship(Entity, backref='tweet')
+ received_at = Column(DateTime, default=datetime.datetime.utcnow, index=True)
+
+
+class UserMessage(Base):
+ __metaclass__ = TweetMeta
+ __tablename__ = "tweet_user_message"
+
+ id = Column(Integer, primary_key=True)
+ user_id = Column(Integer, ForeignKey('tweet_user.id'))
+ user = relationship("User", backref="messages")
+ created_at = Column(DateTime, default=datetime.datetime.utcnow)
+ message_id = Column(Integer, ForeignKey('tweet_message.id'))
+
+class Message(Base):
+ __metaclass__ = TweetMeta
+ __tablename__ = "tweet_message"
+
+ id = Column(Integer, primary_key=True)
+ created_at = Column(DateTime, default=datetime.datetime.utcnow)
+ text = Column(String)
+ users = relationship(UserMessage, backref='message')
+
+
+class User(Base):
+ __metaclass__ = TweetMeta
+ __tablename__ = "tweet_user"
+
+ id = Column(BigInteger, primary_key=True, autoincrement=False)
+ id_str = Column(String)
+ contributors_enabled = Column(Boolean)
+ created_at = Column(DateTime, index=True)
+ description = Column(String)
+ favourites_count = Column(Integer)
+ follow_request_sent = Column(Boolean)
+ followers_count = Column(Integer)
+ following = Column(String)
+ friends_count = Column(Integer)
+ geo_enabled = Column(Boolean)
+ is_translator = Column(Boolean)
+ lang = Column(String)
+ listed_count = Column(Integer)
+ location = Column(String)
+ name = Column(String)
+ notifications = Column(String)
+ profile_background_color = Column(String)
+ profile_background_image_url = Column(String)
+ profile_background_tile = Column(Boolean)
+ profile_image_url = Column(String)
+ profile_image_url_https = Column(String)
+ profile_link_color = Column(String)
+ profile_sidebar_border_color = Column(String)
+ profile_sidebar_fill_color = Column(String)
+ profile_text_color = Column(String)
+ default_profile_image = Column(String)
+ profile_use_background_image = Column(Boolean)
+ protected = Column(Boolean)
+ screen_name = Column(String, index=True)
+ show_all_inline_media = Column(Boolean)
+ statuses_count = Column(Integer)
+ time_zone = Column(String)
+ url = Column(String)
+ utc_offset = Column(Integer)
+ verified = Column(Boolean)
+
+
+class Hashtag(Base):
+ __metaclass__ = TweetMeta
+ __tablename__ = "tweet_hashtag"
+ id = Column(Integer, primary_key=True)
+ text = Column(String, unique=True, index=True)
+
+
+class Url(Base):
+ __metaclass__ = TweetMeta
+ __tablename__ = "tweet_url"
+ id = Column(Integer, primary_key=True)
+ url = Column(String, unique=True)
+ expanded_url = Column(String)
+
+
+class MediaType(Base):
+ __metaclass__ = TweetMeta
+ __tablename__ = "tweet_media_type"
+ id = Column(Integer, primary_key=True, autoincrement=True)
+ label = Column(String, unique=True, index=True)
+
+
+
+class Media(Base):
+ __metaclass__ = TweetMeta
+ __tablename__ = "tweet_media"
+ id = Column(BigInteger, primary_key=True, autoincrement=False)
+ id_str = Column(String, unique=True)
+ media_url = Column(String, unique=True)
+ media_url_https = Column(String, unique=True)
+ url = Column(String)
+ display_url = Column(String)
+ expanded_url = Column(String)
+ sizes = Column(String)
+ type_id = Column(Integer, ForeignKey("tweet_media_type.id"))
+ type = relationship(MediaType, primaryjoin=type_id == MediaType.id)
+
+
+
+class EntityHashtag(Entity):
+ __tablename__ = "tweet_entity_hashtag"
+ __mapper_args__ = {'polymorphic_identity': 'entity_hashtag'}
+ id = Column(Integer, ForeignKey('tweet_entity.id'), primary_key=True)
+ hashtag_id = Column(Integer, ForeignKey("tweet_hashtag.id"))
+ hashtag = relationship(Hashtag, primaryjoin=hashtag_id == Hashtag.id)
+
+
+class EntityUrl(Entity):
+ __tablename__ = "tweet_entity_url"
+ __mapper_args__ = {'polymorphic_identity': 'entity_url'}
+ id = Column(Integer, ForeignKey('tweet_entity.id'), primary_key=True)
+ url_id = Column(Integer, ForeignKey("tweet_url.id"))
+ url = relationship(Url, primaryjoin=url_id == Url.id)
+
+class EntityUser(Entity):
+ __tablename__ = "tweet_entity_user"
+ __mapper_args__ = {'polymorphic_identity': 'entity_user'}
+ id = Column(Integer, ForeignKey('tweet_entity.id'), primary_key=True)
+ user_id = Column(BigInteger, ForeignKey('tweet_user.id'))
+ user = relationship(User, primaryjoin=(user_id == User.id))
+
+
+class EntityMedia(Entity):
+ __tablename__ = "tweet_entity_media"
+ __mapper_args__ = {'polymorphic_identity': 'entity_media'}
+ id = Column(Integer, ForeignKey('tweet_entity.id'), primary_key=True)
+ media_id = Column(BigInteger, ForeignKey('tweet_media.id'))
+ media = relationship(Media, primaryjoin=(media_id == Media.id))
+
+def add_model_version(session, must_commit=True):
+ pe = ProcessEvent(args=iri_tweet.get_version(), type="model_version")
+ session.add(pe)
+ if must_commit:
+ session.commit()
+
+def setup_database(*args, **kwargs):
+
+ session_argname = [ 'autoflush','binds', "class_", "_enable_transaction_accounting","expire_on_commit", "extension", "query_cls", "twophase", "weak_identity_map", "autocommit"]
+
+ kwargs_ce = dict((k, v) for k,v in kwargs.items() if (k not in session_argname and k != "create_all"))
+
+ engine = create_engine(*args, **kwargs_ce)
+ metadata = Base.metadata
+
+ kwargs_sm = {'bind': engine}
+
+ kwargs_sm.update([(argname, kwargs[argname]) for argname in session_argname if argname in kwargs])
+
+ Session = sessionmaker(**kwargs_sm)
+ #set model version
+
+ if kwargs.get('create_all', True):
+ metadata.create_all(engine)
+ session = Session()
+ try:
+ add_model_version(session)
+ finally:
+ session.close()
+
+ return (engine, metadata, Session)
+
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/script/lib/iri_tweet/iri_tweet/tests.py Tue Feb 21 10:47:06 2012 +0100
@@ -0,0 +1,197 @@
+from sqlalchemy import Column, Integer, String, ForeignKey, create_engine
+from sqlalchemy.ext.declarative import declarative_base
+from sqlalchemy.orm import relationship, backref
+import unittest #@UnresolvedImport
+from sqlalchemy.orm import sessionmaker
+from iri_tweet.utils import ObjectsBuffer, TwitterProcessor
+from iri_tweet import models
+import tempfile #@UnresolvedImport
+import os
+
+Base = declarative_base()
+
+class User(Base):
+ __tablename__ = 'users'
+
+ id = Column(Integer, primary_key=True)
+ name = Column(String)
+ fullname = Column(String)
+ password = Column(String)
+
+ def __init__(self, name, fullname, password):
+ self.name = name
+ self.fullname = fullname
+ self.password = password
+
+ def __repr__(self):
+ return "<User('%s','%s', '%s')>" % (self.name, self.fullname, self.password)
+
+
+class Address(Base):
+ __tablename__ = 'addresses'
+ id = Column(Integer, primary_key=True)
+ email_address = Column(String, nullable=False)
+ user_id = Column(Integer, ForeignKey('users.id'))
+
+ user = relationship("User", backref=backref('addresses', order_by=id))
+
+ def __init__(self, user_id, email_address):
+ self.email_address = email_address
+ self.user_id = user_id
+
+ def __repr__(self):
+ return "<Address('%s')>" % self.email_address
+
+
+
+class TestObjectBuffer(unittest.TestCase):
+
+ def setUp(self):
+ self.engine = create_engine('sqlite:///:memory:', echo=False)
+ Base.metadata.create_all(self.engine)
+ sessionMaker = sessionmaker(bind=self.engine)
+ self.session = sessionMaker()
+
+ def tearDown(self):
+ self.session.close()
+ self.engine.dispose()
+
+
+ def testCreateUser(self):
+ ed_user = User('ed', 'Ed Jones', 'edspassword')
+ self.session.add(ed_user)
+ self.assertTrue(ed_user.id is None)
+ self.session.commit()
+ self.assertTrue(ed_user.id is not None)
+
+
+ def testSimpleBuffer(self):
+ obj_buffer = ObjectsBuffer()
+ obj_proxy = obj_buffer.add_object(User, ['ed1', 'Ed1 Jones', 'edspassword'], None, False)
+ self.assertTrue(obj_proxy.id() is None)
+ obj_buffer.persists(self.session)
+ self.assertTrue(obj_proxy.id() is None)
+ self.session.commit()
+ self.assertTrue(obj_proxy.id() is not None)
+
+
+ def testSimpleBufferKwargs(self):
+ obj_buffer = ObjectsBuffer()
+ obj_proxy = obj_buffer.add_object(User, None, {'name':'ed1b', 'fullname':'Ed1b Jones', 'password':'edspassword'}, False)
+ self.assertTrue(obj_proxy.id() is None)
+ obj_buffer.persists(self.session)
+ self.assertTrue(obj_proxy.id() is None)
+ self.session.commit()
+ self.assertTrue(obj_proxy.id() is not None)
+
+
+ def testSimpleBufferFlush(self):
+ obj_buffer = ObjectsBuffer()
+ obj_proxy = obj_buffer.add_object(User, ['ed2', 'Ed2 Jones', 'edspassword'], None, True)
+ self.assertTrue(obj_proxy.id() is None)
+ obj_buffer.persists(self.session)
+ self.assertTrue(obj_proxy.id() is not None)
+ self.session.commit()
+ self.assertTrue(obj_proxy.id() is not None)
+
+ def testRelationBuffer(self):
+ obj_buffer = ObjectsBuffer()
+ user1_proxy = obj_buffer.add_object(User, ['ed3', 'Ed3 Jones', 'edspassword'], None, True)
+ obj_buffer.add_object(Address, [user1_proxy.id,'ed3@mail.com'], None, False)
+ obj_buffer.add_object(Address, [user1_proxy.id,'ed3@other.com'], None, False)
+ user2_proxy = obj_buffer.add_object(User, ['ed4', 'Ed3 Jones', 'edspassword'], None, True)
+ obj_buffer.add_object(Address, [user2_proxy.id,'ed4@mail.com'], None, False)
+ obj_buffer.persists(self.session)
+ self.session.commit()
+ ed_user = self.session.query(User).filter_by(name='ed3').first()
+ self.assertEquals(2, len(ed_user.addresses))
+ ed_user = self.session.query(User).filter_by(name='ed4').first()
+ self.assertEquals(1, len(ed_user.addresses))
+
+
+ def testGet(self):
+ obj_buffer = ObjectsBuffer()
+ user1_proxy = obj_buffer.add_object(User, None, {'name':'ed2', 'fullname':'Ed2 Jones', 'password':'edspassword'}, True)
+ adress_proxy = obj_buffer.add_object(Address, None, {'user_id':user1_proxy.id,'email_address':'ed2@other.com'}, False)
+ user2_proxy = obj_buffer.add_object(User, None, {'name':'ed3', 'fullname':'Ed3 Jones', 'password':'edspassword'}, True)
+ obj_buffer.add_object(Address, None, {'user_id':user2_proxy.id,'email_address':'ed3@other.com'}, False)
+ self.assertEquals(user1_proxy, obj_buffer.get(User, name='ed2'))
+ self.assertEquals(adress_proxy, obj_buffer.get(Address, email_address='ed2@other.com'))
+ self.assertEquals(user2_proxy, obj_buffer.get(User, name='ed3'))
+ self.assertTrue(obj_buffer.get(User, name='ed3', fullname='Ed2 Jones') is None)
+
+original_json = u'{"in_reply_to_user_id_str":null,"text":"RT @BieberEagle: \\"I love my haters. They spend so much time thinking about me. Aren\u2019t they sweet?\\" - Justin Bieber","contributors":null,"retweeted":false,"coordinates":null,"retweeted_status":{"in_reply_to_user_id_str":null,"text":"\\"I love my haters. They spend so much time thinking about me. Aren\u2019t they sweet?\\" - Justin Bieber","contributors":null,"retweeted":false,"coordinates":null,"retweet_count":"100+","source":"web","entities":{"user_mentions":[],"hashtags":[],"urls":[]},"truncated":false,"place":null,"id_str":"96638597737889792","in_reply_to_user_id":null,"in_reply_to_status_id":null,"favorited":false,"in_reply_to_status_id_str":null,"user":{"is_translator":false,"profile_background_tile":true,"profile_background_image_url_https":"https:\/\/si0.twimg.com\/profile_background_images\/298443445\/355584171.jpg","listed_count":5040,"friends_count":8477,"profile_link_color":"ff0000","profile_sidebar_border_color":"000000","url":"http:\/\/twitpic.com\/photos\/BieberEagle","profile_image_url_https":"https:\/\/si0.twimg.com\/profile_images\/1465491672\/355584171_normal.jpg","profile_image_url":"http:\/\/a2.twimg.com\/profile_images\/1465491672\/355584171_normal.jpg","description":"1 name, 1 inspiration, 1 hero, 1 smile, 1 singer, 1 boy who changed my life. B.\u0130.E.B.E.R-Believe In Everything Because Everything\'s Reachable. 
#NEVERSAYNEVER","default_profile":false,"notifications":null,"time_zone":"Paris","followers_count":14506,"default_profile_image":false,"lang":"en","profile_use_background_image":true,"screen_name":"BieberEagle","show_all_inline_media":false,"geo_enabled":false,"profile_background_color":"ffffff","location":"\u2665 Albania \u2665 ","id_str":"229067923","profile_background_image_url":"http:\/\/a2.twimg.com\/profile_background_images\/298443445\/355584171.jpg","favourites_count":89,"protected":false,"follow_request_sent":null,"following":null,"name":"truebelieber","statuses_count":24279,"verified":false,"created_at":"Tue Dec 21 12:35:18 +0000 2010","profile_text_color":"000000","id":229067923,"contributors_enabled":false,"utc_offset":3600,"profile_sidebar_fill_color":""},"id":96638597737889792,"created_at":"Thu Jul 28 17:50:11 +0000 2011","geo":null,"in_reply_to_screen_name":null},"retweet_count":"100+","source":"web","entities":{"user_mentions":[{"indices":[3,15],"screen_name":"BieberEagle","id_str":"229067923","name":"truebelieber","id":229067923}],"hashtags":[],"urls":[]},"truncated":false,"place":null,"id_str":"96965037637382145","in_reply_to_user_id":null,"in_reply_to_status_id":null,"favorited":false,"in_reply_to_status_id_str":null,"user":{"is_translator":false,"profile_background_tile":true,"profile_background_image_url_https":"https:\/\/si0.twimg.com\/profile_background_images\/300419382\/ipod.7.14_054.JPG","listed_count":3,"friends_count":1150,"profile_link_color":"00cccc","profile_sidebar_border_color":"c8ff00","url":"http:\/\/www.facebook.com\/blovedbecca180","profile_image_url_https":"https:\/\/si0.twimg.com\/profile_images\/1466752962\/block_party_7.27.11_015_normal.JPG","profile_image_url":"http:\/\/a3.twimg.com\/profile_images\/1466752962\/block_party_7.27.11_015_normal.JPG","description":"if ya wanna know something about me, then get to know me. \\n\\r\\n\\ri promise, you wont regret it. 
(:\\r\\ni love justin bieber with an extreme burning passion!","default_profile":false,"notifications":null,"time_zone":"Central Time (US & Canada)","followers_count":361,"default_profile_image":false,"lang":"en","profile_use_background_image":true,"screen_name":"beccaxannxx","show_all_inline_media":false,"geo_enabled":false,"profile_background_color":"ff0066","location":"","id_str":"65624607","profile_background_image_url":"http:\/\/a3.twimg.com\/profile_background_images\/300419382\/ipod.7.14_054.JPG","favourites_count":266,"protected":false,"follow_request_sent":null,"following":null,"name":"beccaxannxx","statuses_count":2512,"verified":false,"created_at":"Fri Aug 14 12:36:35 +0000 2009","profile_text_color":"6a39d4","id":65624607,"contributors_enabled":false,"utc_offset":-21600,"profile_sidebar_fill_color":"ff00bb"},"id":96965037637382145,"created_at":"Fri Jul 29 15:27:21 +0000 2011","geo":null,"in_reply_to_screen_name":null}'
+original_json_media = u'{"user": {"follow_request_sent": null, "profile_use_background_image": true, "id": 34311537, "verified": false, "profile_image_url_https": "https://si0.twimg.com/profile_images/1452959211/63440_1494505009634_1444332638_31074099_4058882_n_normal.jpg", "profile_sidebar_fill_color": "DAECF4", "is_translator": false, "geo_enabled": false, "profile_text_color": "663B12", "followers_count": 29, "profile_sidebar_border_color": "C6E2EE", "id_str": "34311537", "default_profile_image": false, "location": "", "utc_offset": -25200, "statuses_count": 813, "description": "i like joe jonas, justin bieber, ashley tisdale, selena gomez, megen fox, kim kardashian and demi lovoto and many more.", "friends_count": 101, "profile_link_color": "1F98C7", "profile_image_url": "http://a1.twimg.com/profile_images/1452959211/63440_1494505009634_1444332638_31074099_4058882_n_normal.jpg", "notifications": null, "show_all_inline_media": false, "profile_background_image_url_https": "https://si0.twimg.com/profile_background_images/298244445/tour2011.jpg", "profile_background_color": "C6E2EE", "profile_background_image_url": "http://a1.twimg.com/profile_background_images/298244445/tour2011.jpg", "name": "mikayla", "lang": "en", "profile_background_tile": true, "favourites_count": 231, "screen_name": "bieberfever17ya", "url": null, "created_at": "Wed Apr 22 16:04:28 +0000 2009", "contributors_enabled": false, "time_zone": "Mountain Time (US & Canada)", "protected": false, "default_profile": false, "following": null, "listed_count": 1}, "favorited": false, "entities": {"user_mentions": [], "media": [{"media_url_https": "https://p.twimg.com/AWea5Z-CQAAvfvK.jpg", "expanded_url": "http://twitter.com/bieberfever17ya/status/101219827649232896/photo/1", "sizes": {"small": {"h": 240, "w": 201, "resize": "fit"}, "large": {"h": 240, "w": 201, "resize": "fit"}, "medium": {"h": 240, "w": 201, "resize": "fit"}, "thumb": {"h": 150, "w": 150, "resize": "crop"}}, "url": 
"http://t.co/N7yZ8hS", "display_url": "pic.twitter.com/N7yZ8hS", "id_str": "101219827653427200", "indices": [31, 50], "type": "photo", "id": 101219827653427200, "media_url": "http://p.twimg.com/AWea5Z-CQAAvfvK.jpg"}], "hashtags": [], "urls": []}, "contributors": null, "truncated": false, "text": "i love you justin bieber <3 http://t.co/N7yZ8hS", "created_at": "Wed Aug 10 09:14:22 +0000 2011", "retweeted": false, "in_reply_to_status_id": null, "coordinates": null, "id": 101219827649232896, "source": "web", "in_reply_to_status_id_str": null, "place": null, "in_reply_to_user_id": null, "in_reply_to_screen_name": null, "retweet_count": 0, "geo": null, "in_reply_to_user_id_str": null, "possibly_sensitive": false, "id_str": "101219827649232896"}'
+original_json_media_others = u'{"user": {"utc_offset": -25200, "statuses_count": 813, "default_profile_image": false, "friends_count": 101, "profile_background_image_url_https": "https://si0.twimg.com/profile_background_images/298244445/tour2011.jpg", "profile_use_background_image": true, "profile_sidebar_fill_color": "DAECF4", "profile_link_color": "1F98C7", "profile_image_url": "http://a1.twimg.com/profile_images/1452959211/63440_1494505009634_1444332638_31074099_4058882_n_normal.jpg", "time_zone": "Mountain Time (US & Canada)", "is_translator": false, "screen_name": "bieberfever17ya", "url": null, "show_all_inline_media": false, "geo_enabled": false, "profile_background_color": "C6E2EE", "id": 34311537, "profile_background_image_url": "http://a1.twimg.com/profile_background_images/298244445/tour2011.jpg", "description": "i like joe jonas, justin bieber, ashley tisdale, selena gomez, megen fox, kim kardashian and demi lovoto and many more.", "lang": "en", "profile_background_tile": true, "favourites_count": 231, "name": "mikayla", "notifications": null, "follow_request_sent": null, "created_at": "Wed Apr 22 16:04:28 +0000 2009", "verified": false, "contributors_enabled": false, "location": "", "profile_text_color": "663B12", "followers_count": 29, "profile_sidebar_border_color": "C6E2EE", "id_str": "34311537", "default_profile": false, "following": null, "protected": false, "profile_image_url_https": "https://si0.twimg.com/profile_images/1452959211/63440_1494505009634_1444332638_31074099_4058882_n_normal.jpg", "listed_count": 1}, "favorited": false, "contributors": null, "source": "web", "text": "i love you justin bieber <3 http://t.co/N7yZ8hS", "created_at": "Wed Aug 10 09:14:22 +0000 2011", "truncated": false, "retweeted": false, "in_reply_to_status_id_str": null, "coordinates": null, "in_reply_to_user_id_str": null, "entities": {"user_mentions": [], "media": [], "hashtags": [], "urls": [], "others": [{"url": "http://t.co/N7yZ8hS", "text": "comments", 
"indices": [31, 50]}]}, "in_reply_to_status_id": null, "in_reply_to_screen_name": null, "id_str": "101219827649232896", "place": null, "retweet_count": 0, "geo": null, "id": 101219827649232896, "possibly_sensitive": false, "in_reply_to_user_id": null}'
+
+class TestTwitterProcessor(unittest.TestCase):
+
+ def setUp(self):
+ self.engine, self.metadata, sessionMaker = models.setup_database('sqlite:///:memory:', echo=True)
+ self.session = sessionMaker()
+ file, self.tmpfilepath = tempfile.mkstemp()
+ os.close(file)
+
+
+ def testTwitterProcessor(self):
+ tp = TwitterProcessor(None, original_json, None, self.session, self.tmpfilepath)
+ tp.process()
+ self.session.commit()
+
+ self.assertEquals(1, self.session.query(models.TweetSource).count())
+ self.assertEquals(1, self.session.query(models.Tweet).count())
+ self.assertEquals(2, self.session.query(models.User).count())
+ tweet = self.session.query(models.Tweet).first()
+ self.assertFalse(tweet.user is None)
+ self.assertEqual(u"beccaxannxx",tweet.user.name)
+ self.assertEqual(65624607,tweet.user.id)
+ self.assertEqual(1,len(tweet.entity_list))
+ entity = tweet.entity_list[0]
+ self.assertEqual(u"BieberEagle", entity.user.screen_name)
+ self.assertTrue(entity.user.created_at is None)
+ self.assertEqual("entity_user", entity.type)
+ self.assertEqual("user_mentions", entity.entity_type.label)
+
+
+ def testTwitterProcessorMedia(self):
+ tp = TwitterProcessor(None, original_json_media, None, self.session, self.tmpfilepath)
+ tp.process()
+ self.session.commit()
+
+ self.assertEquals(1, self.session.query(models.TweetSource).count())
+ self.assertEquals(1, self.session.query(models.Tweet).count())
+ self.assertEquals(1, self.session.query(models.User).count())
+ tweet = self.session.query(models.Tweet).first()
+ self.assertFalse(tweet.user is None)
+ self.assertEqual(u"mikayla",tweet.user.name)
+ self.assertEqual(34311537,tweet.user.id)
+ self.assertEqual(1,len(tweet.entity_list))
+ entity = tweet.entity_list[0]
+ self.assertEqual(101219827653427200, entity.media.id)
+ self.assertEqual("photo", entity.media.type.label)
+ self.assertEqual("entity_media", entity.type)
+ self.assertEqual("media", entity.entity_type.label)
+
+
+ def testTwitterProcessorMediaOthers(self):
+ tp = TwitterProcessor(None, original_json_media_others, None, self.session, self.tmpfilepath)
+ tp.process()
+ self.session.commit()
+
+ self.assertEquals(1, self.session.query(models.TweetSource).count())
+ self.assertEquals(1, self.session.query(models.Tweet).count())
+ tweet = self.session.query(models.Tweet).first()
+ self.assertEqual(1,len(tweet.entity_list))
+ entity = tweet.entity_list[0]
+ self.assertEqual("entity_entity", entity.type)
+ self.assertEqual("others", entity.entity_type.label)
+
+
+
+ def tearDown(self):
+ self.session.close()
+ self.engine.dispose()
+ os.remove(self.tmpfilepath)
+
+if __name__ == '__main__':
+ unittest.main()
\ No newline at end of file
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/script/lib/iri_tweet/iri_tweet/utils.py Tue Feb 21 10:47:06 2012 +0100
@@ -0,0 +1,626 @@
+from models import (Tweet, User, Hashtag, EntityHashtag, EntityUser, Url,
+ EntityUrl, CONSUMER_KEY, CONSUMER_SECRET, APPLICATION_NAME, ACCESS_TOKEN_KEY,
+ ACCESS_TOKEN_SECRET, adapt_date, adapt_json, TweetSource, TweetLog, MediaType,
+ Media, EntityMedia, Entity, EntityType)
+from sqlalchemy.sql import select, or_ #@UnresolvedImport
+import Queue #@UnresolvedImport
+import anyjson #@UnresolvedImport
+import codecs
+import datetime
+import email.utils
+import logging
+import math
+import os.path
+import sys
+import twitter.oauth #@UnresolvedImport
+import twitter.oauth_dance #@UnresolvedImport
+import twitter_text #@UnresolvedImport
+
+
+CACHE_ACCESS_TOKEN = {}
+
def get_oauth_token(token_file_path=None, check_access_token=True, application_name=APPLICATION_NAME, consumer_key=CONSUMER_KEY, consumer_secret=CONSUMER_SECRET):
    """Return a twitter oauth (key, secret) pair for *application_name*.

    Resolution order: module-level ACCESS_TOKEN_KEY/ACCESS_TOKEN_SECRET
    constants, the in-memory per-application cache, the token file, and
    finally an interactive oauth dance (which also rewrites the token file).
    When check_access_token is true, a cached/file token is validated against
    the rate-limit endpoint and discarded if invalid or exhausted.
    """

    global CACHE_ACCESS_TOKEN

    # hard-coded module constants short-circuit everything (development aid)
    if 'ACCESS_TOKEN_KEY' in globals() and 'ACCESS_TOKEN_SECRET' in globals() and ACCESS_TOKEN_KEY and ACCESS_TOKEN_SECRET:
        return ACCESS_TOKEN_KEY,ACCESS_TOKEN_SECRET

    res = CACHE_ACCESS_TOKEN.get(application_name, None)

    if res is None and token_file_path and os.path.exists(token_file_path):
        get_logger().debug("get_oauth_token : reading token from file %s" % token_file_path) #@UndefinedVariable
        res = twitter.oauth.read_token_file(token_file_path)

    if res is not None and check_access_token:
        get_logger().debug("get_oauth_token : Check oauth tokens") #@UndefinedVariable
        t = twitter.Twitter(auth=twitter.OAuth(res[0], res[1], CONSUMER_KEY, CONSUMER_SECRET))
        status = None
        try:
            status = t.account.rate_limit_status()
        except Exception as e:
            get_logger().debug("get_oauth_token : error getting rate limit status %s" % repr(e))
            status = None
        get_logger().debug("get_oauth_token : Check oauth tokens : status %s" % repr(status)) #@UndefinedVariable
        # an exhausted quota is treated like an invalid token: force a new dance
        if status is None or status['remaining_hits'] == 0:
            get_logger().debug("get_oauth_token : Problem with status %s" % repr(status))
            res = None

    if res is None:
        get_logger().debug("get_oauth_token : doing the oauth dance")
        res = twitter.oauth_dance.oauth_dance(application_name, consumer_key, consumer_secret, token_file_path)

    CACHE_ACCESS_TOKEN[application_name] = res

    return res
+
def parse_date(date_str):
    """Parse an RFC 2822 date string (Twitter's created_at format) into a
    naive datetime.

    Raises TypeError when *date_str* cannot be parsed (parsedate_tz
    returns None in that case).
    """
    ts = email.utils.parsedate_tz(date_str) #@UndefinedVariable
    # Only indices 0..5 (year..second) are datetime constructor fields;
    # ts[6] is the weekday slot of the parsedate tuple and must not be
    # passed as the microsecond argument.
    return datetime.datetime(*ts[0:6])
+
def clean_keys(dict_val):
    """Return a copy of *dict_val* whose keys are coerced to ``str``.

    Needed because keyword-argument names used to instantiate the models
    must be plain strings, not unicode.
    """
    return dict((str(key), value) for key, value in dict_val.items())
+
# Per-context converters applied to raw json fields before they reach the
# ORM models.  Contexts: "stream" = streaming API payloads, "rest" =
# search/REST payloads, "entities" = entity sub-dicts.  Dates are parsed to
# datetime objects (adapt_date), nested structures are re-serialized to
# json strings (adapt_json).
fields_adapter = {
    'stream': {
        "tweet": {
            "created_at" : adapt_date,
            "coordinates" : adapt_json,
            "place" : adapt_json,
            "geo" : adapt_json,
#            "original_json" : adapt_json,
        },
        "user": {
            "created_at" : adapt_date,
        },

    },

    'entities' : {
        "medias": {
            "sizes" : adapt_json,
        },
    },
    'rest': {
        "tweet" : {
            "place" : adapt_json,
            "geo" : adapt_json,
            "created_at" : adapt_date,
#            "original_json" : adapt_json,
        },
    },
}
+
+#
+# adapt fields, return a copy of the field_dict with adapted fields
+#
def adapt_fields(fields_dict, adapter_mapping):
    """Return a copy of *fields_dict* with ``str`` keys, passing each value
    through the adapter registered for its field (fields with no adapter,
    or a None adapter, are copied unchanged)."""
    adapted = {}
    for field, value in fields_dict.items():
        adapter = adapter_mapping.get(field)
        if adapter is not None:
            value = adapter(value)
        adapted[str(field)] = value
    return adapted
+
+
class ObjectBufferProxy(object):
    """Deferred construction of one ORM object.

    Holds the model class plus the args/kwargs it will be built with; the
    real instance is only created (or merged) when persists() is called.
    Until then, attribute access returns zero-argument callables, so that
    dependent proxies can reference values (e.g. generated primary keys)
    that exist only after the flush.
    """
    def __init__(self, klass, args, kwargs, must_flush, instance=None):
        self.klass= klass
        self.args = args
        self.kwargs = kwargs
        self.must_flush = must_flush
        self.instance = instance

    def persists(self, session):
        """Build the instance and add it to *session*, flushing if requested."""
        # callables among args/kwargs are other proxies' deferred attributes:
        # resolve them now, at persist time
        new_args = [arg() if callable(arg) else arg for arg in self.args] if self.args is not None else []
        new_kwargs = dict([(k,v()) if callable(v) else (k,v) for k,v in self.kwargs.items()]) if self.kwargs is not None else {}

        if self.instance is None:
            self.instance = self.klass(*new_args, **new_kwargs)
        else:
            # NOTE(review): when an instance already exists it is rebuilt from
            # args/kwargs and merged into the session -- presumably to apply
            # updates to a previously-loaded row; confirm the rebuilt object
            # carries the primary key needed for the merge to match.
            self.instance = self.klass(*new_args, **new_kwargs)
            self.instance = session.merge(self.instance)

        session.add(self.instance)
        if self.must_flush:
            session.flush()

    def __getattr__(self, name):
        # deferred attribute access: return a thunk evaluated at persist time
        # (None until the instance has been created)
        return lambda : getattr(self.instance, name) if self.instance else None
+
+
+
+
class ObjectsBuffer(object):
    """Accumulates ObjectBufferProxy objects and persists them, in insertion
    order, in a single pass.  Also provides lookup of a buffered proxy by
    model class and keyword values."""

    def __init__(self):
        self.__bufferlist = []
        self.__bufferdict = {}

    def __add_proxy_object(self, proxy):
        # index by class for get(), keep global order for persists()
        self.__bufferdict.setdefault(proxy.klass, []).append(proxy)
        self.__bufferlist.append(proxy)

    def persists(self, session):
        """Persist every buffered proxy into *session*, in insertion order."""
        for proxy in self.__bufferlist:
            proxy.persists(session)

    def add_object(self, klass, args, kwargs, must_flush, instance=None):
        """Buffer a new ObjectBufferProxy and return it."""
        proxy = ObjectBufferProxy(klass, args, kwargs, must_flush, instance)
        self.__add_proxy_object(proxy)
        return proxy

    def get(self, klass, **kwargs):
        """Return the first buffered proxy of *klass* whose kwargs contain
        every given key/value pair, or None when there is no match."""
        for proxy in self.__bufferdict.get(klass, []):
            if not proxy.kwargs or proxy.klass != klass:
                continue
            matches = all(
                key in proxy.kwargs and proxy.kwargs[key] == value
                for key, value in kwargs.items())
            if matches:
                return proxy
        return None
+
class TwitterProcessorException(Exception):
    """Raised when a tweet payload cannot be processed (no json given, or
    the json has no "id" field)."""
    pass
+
class TwitterProcessor(object):
    """Parses one tweet json payload (streaming-API or REST/search format)
    and stages the corresponding ORM objects (tweet source, tweet, user,
    entities, log) in an ObjectsBuffer, persisting everything in one pass
    at the end of process()."""

    def __init__(self, json_dict, json_txt, source_id, session, access_token=None, token_filename=None, user_query_twitter=False):
        """Keep both parsed and raw forms of the payload; whichever of
        json_dict / json_txt is missing is derived from the other.

        source_id: primary key of an existing TweetSource row, or None to
            create one from json_txt in process().
        access_token: optional (key, secret) oauth pair used when
            user_query_twitter is true.
        token_filename: oauth token file path, used to obtain a token when
            access_token is not given.
        user_query_twitter: when true, missing user details are fetched
            from the twitter REST API.

        Raises TwitterProcessorException when both payloads are None or the
        payload has no "id" field.
        """

        if json_dict is None and json_txt is None:
            raise TwitterProcessorException("No json")

        if json_dict is None:
            self.json_dict = anyjson.deserialize(json_txt)
        else:
            self.json_dict = json_dict

        if not json_txt:
            self.json_txt = anyjson.serialize(json_dict)
        else:
            self.json_txt = json_txt

        if "id" not in self.json_dict:
            raise TwitterProcessorException("No id in json")

        self.source_id = source_id
        self.session = session
        self.token_filename = token_filename
        self.access_token = access_token
        self.obj_buffer = ObjectsBuffer()
        self.user_query_twitter = user_query_twitter



    def __get_user(self, user_dict, do_merge):
        """Return an ObjectBufferProxy for the user described by *user_dict*,
        looking in order at the object buffer, the database, and (optionally)
        the twitter REST API; returns None when the user cannot be resolved.
        When do_merge is true, fresher fields from user_dict are merged into
        an already-known user."""
        get_logger().debug("Get user : " + repr(user_dict)) #@UndefinedVariable

        user_dict = adapt_fields(user_dict, fields_adapter["stream"]["user"])

        user_id = user_dict.get("id",None)
        user_name = user_dict.get("screen_name", user_dict.get("name", None))

        if user_id is None and user_name is None:
            return None

        user = None
        if user_id:
            user = self.obj_buffer.get(User, id=user_id)
        else:
            user = self.obj_buffer.get(User, screen_name=user_name)

        # user already buffered: merge newer fields when requested
        # NOTE(review): buffered proxies created by add_object carry their
        # field dict in `kwargs` (args is None), yet this reads and writes
        # `user.args` -- looks like it should be `user.kwargs`; confirm.
        if user is not None:
            user_created_at = None
            if user.args is not None:
                user_created_at = user.args.get('created_at', None)
            if user_created_at is None and user_dict.get('created_at', None) is not None and do_merge:
                if user.args is None:
                    user.args = user_dict
                else:
                    user.args.update(user_dict)
            return user

        #todo : add methods to objectbuffer to get buffered user
        user_obj = None
        if user_id:
            user_obj = self.session.query(User).filter(User.id == user_id).first()
        else:
            user_obj = self.session.query(User).filter(User.screen_name.ilike(user_name)).first()

        # user found in the database: wrap as-is, or re-buffer it with the
        # new fields when a merge is wanted and created_at is still missing
        if user_obj is not None:
            if user_obj.created_at is not None or user_dict.get('created_at', None) is None or not do_merge :
                user = ObjectBufferProxy(User, None, None, False, user_obj)
            else:
                user = self.obj_buffer.add_object(User, None, user_dict, True, user_obj)
            return user

        user_created_at = user_dict.get("created_at", None)

        # unknown user with incomplete data: optionally query the REST API
        if user_created_at is None and self.user_query_twitter:

            if self.access_token is not None:
                acess_token_key, access_token_secret = self.access_token
            else:
                acess_token_key, access_token_secret = get_oauth_token(self.token_filename)
            t = twitter.Twitter(auth=twitter.OAuth(acess_token_key, access_token_secret, CONSUMER_KEY, CONSUMER_SECRET))
            try:
                if user_id:
                    user_dict = t.users.show(user_id=user_id)
                else:
                    user_dict = t.users.show(screen_name=user_name)
            except Exception as e:
                get_logger().info("get_user : TWITTER ERROR : " + repr(e)) #@UndefinedVariable
                get_logger().info("get_user : TWITTER ERROR : " + str(e)) #@UndefinedVariable
                return None

        if "id" not in user_dict:
            return None

        #TODO filter get, wrap in proxy
        user_obj = self.session.query(User).filter(User.id == user_dict["id"]).first()

        if user_obj is not None and not do_merge:
            return ObjectBufferProxy(User, None, None, False, user_obj)
        else:
            return self.obj_buffer.add_object(User, None, user_dict, True)

    def __get_or_create_object(self, klass, filter_by_kwargs, filter, creation_kwargs, must_flush, do_merge):
        """Return a proxy for a *klass* row, looking first in the object
        buffer (by filter_by_kwargs), then in the database (by *filter* or
        filter_by_kwargs), finally buffering a new object built from
        creation_kwargs; do_merge re-buffers an existing row with the new
        fields."""

        obj_proxy = self.obj_buffer.get(klass, **filter_by_kwargs)
        if obj_proxy is None:
            query = self.session.query(klass)
            if filter is not None:
                query = query.filter(filter)
            else:
                query = query.filter_by(**filter_by_kwargs)
            obj_instance = query.first()
            if obj_instance is not None:
                if not do_merge:
                    obj_proxy = ObjectBufferProxy(klass, None, None, False, obj_instance)
                else:
                    obj_proxy = self.obj_buffer.add_object(klass, None, creation_kwargs, must_flush, obj_instance)
        if obj_proxy is None:
            obj_proxy = self.obj_buffer.add_object(klass, None, creation_kwargs, must_flush)
        return obj_proxy


    def __process_entity(self, ind, ind_type):
        """Buffer one tweet entity of kind *ind_type* ("hashtags",
        "user_mentions", "urls", "media", or anything else as a generic
        Entity) built from the raw entity dict *ind*."""
        get_logger().debug("Process_entity : " + repr(ind) + " : " + repr(ind_type)) #@UndefinedVariable

        ind = clean_keys(ind)

        entity_type = self.__get_or_create_object(EntityType, {'label':ind_type}, None, {'label':ind_type}, True, False)

        # tweet.id and entity_type.id are deferred thunks (ObjectBufferProxy
        # attributes) resolved when the buffer is persisted
        entity_dict = {
           "indice_start" : ind["indices"][0],
           "indice_end" : ind["indices"][1],
           "tweet_id" : self.tweet.id,
           "entity_type_id" : entity_type.id,
           "source" : adapt_json(ind)
        }

        def process_medias():
            # returns (EntityMedia, entity_dict) or (None, None) when the
            # media has no id
            media_id = ind.get('id', None)
            if media_id is None:
                return None, None

            type_str = ind.get("type", "photo")
            media_type = self.__get_or_create_object(MediaType, {'label': type_str}, None, {'label':type_str}, True, False)
            media_ind = adapt_fields(ind, fields_adapter["entities"]["medias"])
            if "type" in media_ind:
                del(media_ind["type"])
            media_ind['type_id'] = media_type.id
            media = self.__get_or_create_object(Media, {'id':media_id}, None, media_ind, True, False)

            entity_dict['media_id'] = media.id
            return EntityMedia, entity_dict

        def process_hashtags():
            text = ind.get("text", ind.get("hashtag", None))
            if text is None:
                return None, None
            ind['text'] = text
            # case-insensitive match on the hashtag text
            hashtag = self.__get_or_create_object(Hashtag, {'text':text}, Hashtag.text.ilike(text), ind, True, False)
            entity_dict['hashtag_id'] = hashtag.id
            return EntityHashtag, entity_dict

        def process_user_mentions():
            user_mention = self.__get_user(ind, False)
            if user_mention is None:
                entity_dict['user_id'] = None
            else:
                entity_dict['user_id'] = user_mention.id
            return EntityUser, entity_dict

        def process_urls():
            url = self.__get_or_create_object(Url, {'url':ind["url"]}, None, ind, True, False)
            entity_dict['url_id'] = url.id
            return EntityUrl, entity_dict

        # dispatch on the entity kind; unknown kinds become a generic Entity
        entity_klass, entity_dict = {
            'hashtags': process_hashtags,
            'user_mentions' : process_user_mentions,
            'urls' : process_urls,
            'media': process_medias,
        }.get(ind_type, lambda: (Entity, entity_dict))()

        get_logger().debug("Process_entity entity_dict: " + repr(entity_dict)) #@UndefinedVariable
        if entity_klass:
            self.obj_buffer.add_object(entity_klass, None, entity_dict, False)


    def __process_twitter_stream(self):
        """Stage a streaming-API payload: skip tweets already in the
        database, resolve the embedded user, buffer the tweet, then its
        entities."""

        tweet_nb = self.session.query(Tweet).filter(Tweet.id == self.json_dict["id"]).count()
        if tweet_nb > 0:
            return

        ts_copy = adapt_fields(self.json_dict, fields_adapter["stream"]["tweet"])

        # get or create user
        user = self.__get_user(self.json_dict["user"], True)
        if user is None:
            get_logger().warning("USER not found " + repr(self.json_dict["user"])) #@UndefinedVariable
            ts_copy["user_id"] = None
        else:
            ts_copy["user_id"] = user.id

        # the nested user dict is replaced by the user_id foreign key
        del(ts_copy['user'])
        ts_copy["tweet_source_id"] = self.source_id

        self.tweet = self.obj_buffer.add_object(Tweet, None, ts_copy, True)

        self.__process_entities()


    def __process_entities(self):
        """Buffer the tweet's entities: taken from the payload's "entities"
        field when present, otherwise re-extracted from the tweet text."""
        if "entities" in self.json_dict:
            for ind_type, entity_list in self.json_dict["entities"].items():
                for ind in entity_list:
                    self.__process_entity(ind, ind_type)
        else:
            # no entities in the payload: extract them from the text
            text = self.tweet.text
            extractor = twitter_text.Extractor(text)
            for ind in extractor.extract_hashtags_with_indices():
                self.__process_entity(ind, "hashtags")

            for ind in extractor.extract_urls_with_indices():
                self.__process_entity(ind, "urls")

            for ind in extractor.extract_mentioned_screen_names_with_indices():
                self.__process_entity(ind, "user_mentions")

    def __process_twitter_rest(self):
        """Stage a REST/search payload: skip known tweets, map the flat
        search-result fields onto the Tweet/User columns, then buffer the
        tweet and its entities."""
        tweet_nb = self.session.query(Tweet).filter(Tweet.id == self.json_dict["id"]).count()
        if tweet_nb > 0:
            return

        # search results use a flat schema; map it onto the Tweet columns
        tweet_fields = {
            'created_at': self.json_dict["created_at"],
            'favorited': False,
            'id': self.json_dict["id"],
            'id_str': self.json_dict["id_str"],
            #'in_reply_to_screen_name': ts["to_user"],
            'in_reply_to_user_id': self.json_dict["to_user_id"],
            'in_reply_to_user_id_str': self.json_dict["to_user_id_str"],
            #'place': ts["place"],
            'source': self.json_dict["source"],
            'text': self.json_dict["text"],
            'truncated': False,
            'tweet_source_id' : self.source_id,
        }

        #user

        user_fields = {
            'lang' : self.json_dict.get('iso_language_code',None),
            'profile_image_url' : self.json_dict["profile_image_url"],
            'screen_name' : self.json_dict["from_user"],
            'id' : self.json_dict["from_user_id"],
            'id_str' : self.json_dict["from_user_id_str"],
            'name' : self.json_dict['from_user_name'],
        }

        user = self.__get_user(user_fields, do_merge=False)
        if user is None:
            get_logger().warning("USER not found " + repr(user_fields)) #@UndefinedVariable
            tweet_fields["user_id"] = None
        else:
            tweet_fields["user_id"] = user.id

        tweet_fields = adapt_fields(tweet_fields, fields_adapter["rest"]["tweet"])
        self.tweet = self.obj_buffer.add_object(Tweet, None, tweet_fields, True)

        self.__process_entities()



    def process(self):
        """Entry point: ensure a TweetSource exists, stage the tweet in the
        format-appropriate way, record an OK TweetLog, and persist the whole
        object buffer into the session."""

        if self.source_id is None:
            tweet_source = self.obj_buffer.add_object(TweetSource, None, {'original_json':self.json_txt}, True)
            # deferred thunk: resolved against the flushed row at persist time
            self.source_id = tweet_source.id

        # REST/search payloads carry a "metadata" field; stream payloads do not
        if "metadata" in self.json_dict:
            self.__process_twitter_rest()
        else:
            self.__process_twitter_stream()

        self.obj_buffer.add_object(TweetLog, None, {'tweet_source_id':self.source_id, 'status':TweetLog.TWEET_STATUS['OK']}, True)

        self.obj_buffer.persists(self.session)
+
+
def set_logging(options, plogger=None, queue=None):
    """Configure and return a logger from command-line *options*.

    Each -v lowers the level by 10 and each -q raises it by 10, starting
    from WARNING and clamped to [NOTSET, CRITICAL].  A handler is attached
    only when the logger has none yet; it targets, in order of preference,
    *queue* (a QueueHandler), the log file named by options.logfile, or
    stdout/stderr.  As a side effect, options.debug is set to whether the
    net verbosity is positive.
    """
    level = logging.WARNING - 10 * options.verbose + 10 * options.quiet
    level = max(logging.NOTSET, min(logging.CRITICAL, level)) #@UndefinedVariable

    stream = None
    filename = None
    if options.logfile == "stdout":
        stream = sys.stdout
    elif options.logfile == "stderr":
        stream = sys.stderr
    else:
        filename = options.logfile

    logger = plogger if plogger is not None else get_logger() #@UndefinedVariable

    if not logger.handlers:
        if queue is not None:
            handler = QueueHandler(queue, True)
        elif filename:
            handler = logging.FileHandler(filename, 'a') #@UndefinedVariable
        else:
            handler = logging.StreamHandler(stream) #@UndefinedVariable

        formatter = logging.Formatter('%(asctime)s %(levelname)s:%(name)s:%(message)s', None) #@UndefinedVariable
        handler.setFormatter(formatter)
        logger.addHandler(handler)
        logger.setLevel(level)

    options.debug = (options.verbose - options.quiet > 0)
    return logger
+
def set_logging_options(parser):
    """Register the standard logging options (-l/--log, -v, -q) on the
    given optparse *parser*."""
    parser.add_option("-l", "--log", dest="logfile", metavar="LOG",
                      default="stderr", help="log to file")
    parser.add_option("-v", dest="verbose", action="count", metavar="VERBOSE",
                      default=0, help="verbose")
    parser.add_option("-q", dest="quiet", action="count", metavar="QUIET",
                      default=0, help="quiet")
+
def get_base_query(session, query, start_date, end_date, hashtags, tweet_exclude_table, user_whitelist):
    """Apply the common tweet filters to *query* and return it.

    Joins the hashtag entities, then filters by exclusion table, date
    range, user whitelist and hashtag list.  Each entry of *hashtags* may
    itself contain several comma-separated hashtags; a tweet matches when
    any of its hashtag texts contains any requested tag.
    """

    query = query.join(EntityHashtag).join(Hashtag)

    if tweet_exclude_table is not None:
        query = query.filter(~Tweet.id.in_(select([tweet_exclude_table.c.id]))) #@UndefinedVariable

    if start_date:
        query = query.filter(Tweet.created_at >= start_date)
    if end_date:
        query = query.filter(Tweet.created_at <= end_date)

    if user_whitelist:
        query = query.join(User).filter(User.screen_name.in_(user_whitelist))

    if hashtags:
        # flatten the comma-separated groups; a comprehension replaces the
        # bare `reduce` builtin, which no longer exists in python 3
        htags = [tag for group in hashtags for tag in group.split(",")]

        query = query.filter(or_(*[Hashtag.text.contains(h) for h in htags])) #@UndefinedVariable

    return query
+
+
+
def get_filter_query(session, start_date, end_date, hashtags, tweet_exclude_table, user_whitelist):
    """Build a Tweet query with the standard filters applied, ordered
    chronologically by creation date."""
    base = get_base_query(session, session.query(Tweet), start_date, end_date,
                          hashtags, tweet_exclude_table, user_whitelist)
    return base.order_by(Tweet.created_at)
+
+
def get_user_query(session, start_date, end_date, hashtags, tweet_exclude_table):
    """Build a distinct User query for the authors of tweets matching the
    standard filters (no user whitelist applies here)."""
    base = get_base_query(session, session.query(User).join(Tweet), start_date,
                          end_date, hashtags, tweet_exclude_table, None)
    return base.distinct()
+
# name of the shared application logger
logger_name = "iri.tweet"

def get_logger():
    """Return the application-wide logger named by ``logger_name``."""
    # no `global` statement needed: the module-level name is only read here
    return logging.getLogger(logger_name) #@UndefinedVariable
+
+
# QueueHandler below is adapted from the python logging multiprocessing example
+
class QueueHandler(logging.Handler): #@UndefinedVariable
    """
    This is a logging handler which sends events to a multiprocessing queue.
    """

    def __init__(self, queue, ignore_full):
        """
        Initialise an instance, using the passed queue.

        ignore_full: when True, records are silently dropped if the queue
        is full instead of raising Queue.Full.
        """
        logging.Handler.__init__(self) #@UndefinedVariable
        self.queue = queue
        # bug fix: honour the parameter instead of hard-coding True
        self.ignore_full = ignore_full

    def emit(self, record):
        """
        Emit a record.

        Writes the LogRecord to the queue.
        """
        try:
            ei = record.exc_info
            if ei:
                # render now so the traceback text lands in record.exc_text;
                # the traceback object itself does not survive the queue
                dummy = self.format(record) # just to get traceback text into record.exc_text
                record.exc_info = None # not needed any more
            if not self.ignore_full or not self.queue.full():
                self.queue.put_nowait(record)
        except Queue.Full:
            if self.ignore_full:
                pass
            else:
                raise
        except (KeyboardInterrupt, SystemExit):
            raise
        except:
            self.handleError(record)
+
def show_progress(current_line, total_line, label, width, writer=None):
    """Render a one-line textual progress bar (bar, percentage, counter and
    truncated *label*) ending with a carriage return, plus a newline once
    100% is reached.

    Returns the writer actually used, so callers can pass it back in on
    the next call.
    """
    if writer is None:
        writer = sys.stdout
        # wrap stdout so unicode labels are encoded with the console encoding
        if sys.stdout.encoding is not None:
            writer = codecs.getwriter(sys.stdout.encoding)(sys.stdout)

    percent = (float(current_line) / float(total_line)) * 100.0

    marks = math.floor(width * (percent / 100.0))
    spaces = math.floor(width - marks)

    loader = u'[' + (u'=' * int(marks)) + (u' ' * int(spaces)) + u']'

    s = u"%s %3d%% %*d/%d - %*s\r" % (loader, percent, len(str(total_line)), current_line, total_line, width, label[:width])

    writer.write(s) #takes the header into account
    if percent >= 100:
        writer.write("\n")
    writer.flush()

    return writer
--- a/script/lib/iri_tweet/models.py Mon Feb 20 18:52:19 2012 +0100
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,290 +0,0 @@
-from sqlalchemy import (Boolean, Column, Enum, BigInteger, Integer, String,
- ForeignKey, DateTime, create_engine)
-from sqlalchemy.ext.declarative import declarative_base
-from sqlalchemy.orm import relationship, sessionmaker
-import anyjson
-import datetime
-import email.utils
-import iri_tweet
-
-
-Base = declarative_base()
-
-APPLICATION_NAME = "IRI_TWITTER"
-CONSUMER_KEY = "54ThDZhpEjokcMgHJOMnQA"
-CONSUMER_SECRET = "wUoL9UL2T87tfc97R0Dff2EaqRzpJ5XGdmaN2XK3udA"
-ACCESS_TOKEN_KEY = None
-ACCESS_TOKEN_SECRET = None
-#ACCESS_TOKEN_KEY= "47312923-LiNTtz0I18YXMVIrFeTuhmH7bOvYsK6p3Ln2Dc"
-#ACCESS_TOKEN_SECRET = "r3LoXVcjImNAElUpWqTu2SG2xCdWFHkva7xeQoncA"
-
-def adapt_date(date_str):
- ts = email.utils.parsedate_tz(date_str) #@UndefinedVariable
- return datetime.datetime(*ts[0:7])
-
-def adapt_json(obj):
- if obj is None:
- return None
- else:
- return anyjson.serialize(obj)
-
-class TweetMeta(type(Base)):
-
- def __init__(cls, name, bases, ns): #@NoSelf
- def init(self, **kwargs):
- for key, value in kwargs.items():
- if hasattr(self, key):
- setattr(self, key, value)
- super(cls, self).__init__()
- setattr(cls, '__init__', init)
- super(TweetMeta, cls).__init__(name, bases, ns)
-
-
-class ProcessEvent(Base):
- __metaclass__ = TweetMeta
- __tablename__ = "tweet_process_event"
- id = Column(Integer, primary_key=True, autoincrement=True)
- ts = Column(DateTime, default=datetime.datetime.utcnow, index=True)
- type = Column(Enum("start","pid","shutdown","error", "start_worker", "stop_worker", "model_version", "application_name", "application_version", name="process_event_type_enum"), nullable=False)
- args = Column(String)
-
-class EntityType(Base):
- __metaclass__ = TweetMeta
- __tablename__ = "tweet_entity_type"
- id = Column(Integer, primary_key=True, autoincrement=True)
- label = Column(String)
-
-class Entity(Base):
- __metaclass__ = TweetMeta
- __tablename__ = "tweet_entity"
- id = Column(Integer, primary_key=True)
- tweet_id = Column(BigInteger, ForeignKey('tweet_tweet.id'))
- type = Column(String)
- entity_type_id = Column(Integer, ForeignKey('tweet_entity_type.id'), nullable=False)
- entity_type = relationship("EntityType", backref="entities")
- indice_start = Column(Integer)
- indice_end = Column(Integer)
- source = Column(String)
- __mapper_args__ = {'polymorphic_on': type, 'polymorphic_identity': 'entity_entity', 'with_polymorphic':'*'}
-
-
-class TweetSource(Base):
- __metaclass__ = TweetMeta
- __tablename__ = 'tweet_tweet_source'
- id = Column(Integer, primary_key=True, autoincrement=True)
- original_json = Column(String)
- received_at = Column(DateTime, default=datetime.datetime.utcnow, index=True)
-
-
-class TweetLog(Base):
-
- TWEET_STATUS = {
- 'OK' : 1,
- 'ERROR' : 2,
- 'NOT_TWEET': 3,
- }
- __metaclass__ = TweetMeta
-
- __tablename__ = 'tweet_tweet_log'
- id = Column(Integer, primary_key=True, autoincrement=True)
- ts = Column(DateTime, default=datetime.datetime.utcnow, index=True)
- tweet_source_id = Column(Integer, ForeignKey('tweet_tweet_source.id'))
- tweet_source = relationship("TweetSource", backref="logs")
- status = Column(Integer)
- error = Column(String)
- error_stack = Column(String)
-
-
-class Tweet(Base):
- __metaclass__ = TweetMeta
- __tablename__ = 'tweet_tweet'
-
- id = Column(BigInteger, primary_key=True, autoincrement=False)
- id_str = Column(String)
- contributors = Column(String)
- coordinates = Column(String)
- created_at = Column(DateTime, index=True)
- favorited = Column(Boolean)
- geo = Column(String)
- in_reply_to_screen_name = Column(String)
- in_reply_to_status_id = Column(BigInteger)
- in_reply_to_status_id_str = Column(String)
- in_reply_to_user_id = Column(BigInteger)
- in_reply_to_user_id_str = Column(String)
- place = Column(String)
- retweet_count = Column(String)
- retweeted = Column(Boolean)
- source = Column(String)
- text = Column(String)
- truncated = Column(Boolean)
- user_id = Column(Integer, ForeignKey('tweet_user.id'))
- user = relationship("User", backref="tweets")
- tweet_source_id = Column(Integer, ForeignKey('tweet_tweet_source.id'))
- tweet_source = relationship("TweetSource", backref="tweet")
- entity_list = relationship(Entity, backref='tweet')
- received_at = Column(DateTime, default=datetime.datetime.utcnow, index=True)
-
-
-class UserMessage(Base):
- __metaclass__ = TweetMeta
- __tablename__ = "tweet_user_message"
-
- id = Column(Integer, primary_key=True)
- user_id = Column(Integer, ForeignKey('tweet_user.id'))
- user = relationship("User", backref="messages")
- created_at = Column(DateTime, default=datetime.datetime.utcnow)
- message_id = Column(Integer, ForeignKey('tweet_message.id'))
-
-class Message(Base):
- __metaclass__ = TweetMeta
- __tablename__ = "tweet_message"
-
- id = Column(Integer, primary_key=True)
- created_at = Column(DateTime, default=datetime.datetime.utcnow)
- text = Column(String)
- users = relationship(UserMessage, backref='message')
-
-
-class User(Base):
- __metaclass__ = TweetMeta
- __tablename__ = "tweet_user"
-
- id = Column(BigInteger, primary_key=True, autoincrement=False)
- id_str = Column(String)
- contributors_enabled = Column(Boolean)
- created_at = Column(DateTime, index=True)
- description = Column(String)
- favourites_count = Column(Integer)
- follow_request_sent = Column(Boolean)
- followers_count = Column(Integer)
- following = Column(String)
- friends_count = Column(Integer)
- geo_enabled = Column(Boolean)
- is_translator = Column(Boolean)
- lang = Column(String)
- listed_count = Column(Integer)
- location = Column(String)
- name = Column(String)
- notifications = Column(String)
- profile_background_color = Column(String)
- profile_background_image_url = Column(String)
- profile_background_tile = Column(Boolean)
- profile_image_url = Column(String)
- profile_image_url_https = Column(String)
- profile_link_color = Column(String)
- profile_sidebar_border_color = Column(String)
- profile_sidebar_fill_color = Column(String)
- profile_text_color = Column(String)
- default_profile_image = Column(String)
- profile_use_background_image = Column(Boolean)
- protected = Column(Boolean)
- screen_name = Column(String, index=True)
- show_all_inline_media = Column(Boolean)
- statuses_count = Column(Integer)
- time_zone = Column(String)
- url = Column(String)
- utc_offset = Column(Integer)
- verified = Column(Boolean)
-
-
-class Hashtag(Base):
- __metaclass__ = TweetMeta
- __tablename__ = "tweet_hashtag"
- id = Column(Integer, primary_key=True)
- text = Column(String, unique=True, index=True)
-
-
-class Url(Base):
- __metaclass__ = TweetMeta
- __tablename__ = "tweet_url"
- id = Column(Integer, primary_key=True)
- url = Column(String, unique=True)
- expanded_url = Column(String)
-
-
-class MediaType(Base):
- __metaclass__ = TweetMeta
- __tablename__ = "tweet_media_type"
- id = Column(Integer, primary_key=True, autoincrement=True)
- label = Column(String, unique=True, index=True)
-
-
-
-class Media(Base):
- __metaclass__ = TweetMeta
- __tablename__ = "tweet_media"
- id = Column(BigInteger, primary_key=True, autoincrement=False)
- id_str = Column(String, unique=True)
- media_url = Column(String, unique=True)
- media_url_https = Column(String, unique=True)
- url = Column(String)
- display_url = Column(String)
- expanded_url = Column(String)
- sizes = Column(String)
- type_id = Column(Integer, ForeignKey("tweet_media_type.id"))
- type = relationship(MediaType, primaryjoin=type_id == MediaType.id)
-
-
-
-class EntityHashtag(Entity):
- __tablename__ = "tweet_entity_hashtag"
- __mapper_args__ = {'polymorphic_identity': 'entity_hashtag'}
- id = Column(Integer, ForeignKey('tweet_entity.id'), primary_key=True)
- hashtag_id = Column(Integer, ForeignKey("tweet_hashtag.id"))
- hashtag = relationship(Hashtag, primaryjoin=hashtag_id == Hashtag.id)
-
-
-class EntityUrl(Entity):
- __tablename__ = "tweet_entity_url"
- __mapper_args__ = {'polymorphic_identity': 'entity_url'}
- id = Column(Integer, ForeignKey('tweet_entity.id'), primary_key=True)
- url_id = Column(Integer, ForeignKey("tweet_url.id"))
- url = relationship(Url, primaryjoin=url_id == Url.id)
-
-class EntityUser(Entity):
- __tablename__ = "tweet_entity_user"
- __mapper_args__ = {'polymorphic_identity': 'entity_user'}
- id = Column(Integer, ForeignKey('tweet_entity.id'), primary_key=True)
- user_id = Column(BigInteger, ForeignKey('tweet_user.id'))
- user = relationship(User, primaryjoin=(user_id == User.id))
-
-
-class EntityMedia(Entity):
- __tablename__ = "tweet_entity_media"
- __mapper_args__ = {'polymorphic_identity': 'entity_media'}
- id = Column(Integer, ForeignKey('tweet_entity.id'), primary_key=True)
- media_id = Column(BigInteger, ForeignKey('tweet_media.id'))
- media = relationship(Media, primaryjoin=(media_id == Media.id))
-
-def add_model_version(session, must_commit=True):
- pe = ProcessEvent(args=iri_tweet.get_version(), type="model_version")
- session.add(pe)
- if must_commit:
- session.commit()
-
-def setup_database(*args, **kwargs):
-
- session_argname = [ 'autoflush','binds', "class_", "_enable_transaction_accounting","expire_on_commit", "extension", "query_cls", "twophase", "weak_identity_map", "autocommit"]
-
- kwargs_ce = dict((k, v) for k,v in kwargs.items() if (k not in session_argname and k != "create_all"))
-
- engine = create_engine(*args, **kwargs_ce)
- metadata = Base.metadata
-
- kwargs_sm = {'bind': engine}
-
- kwargs_sm.update([(argname, kwargs[argname]) for argname in session_argname if argname in kwargs])
-
- Session = sessionmaker(**kwargs_sm)
- #set model version
-
- if kwargs.get('create_all', True):
- metadata.create_all(engine)
- session = Session()
- try:
- add_model_version(session)
- finally:
- session.close()
-
- return (engine, metadata, Session)
-
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/script/lib/iri_tweet/setup.py Tue Feb 21 10:47:06 2012 +0100
@@ -0,0 +1,80 @@
+#@PydevCodeAnalysisIgnore
+import sys
+import os
+
# extra setup() keywords, filled conditionally below
extra = {}
# have setuptools/distribute run 2to3 over the sources when installing
# under python 3 (the code base itself is python 2)
if sys.version_info >= (3, 0):
    extra.update(use_2to3=True)
+
+
# prefer setuptools when available
try:
    from setuptools import setup, find_packages
except ImportError:
    # NOTE(review): distutils.core provides setup but NOT find_packages, so
    # this fallback will itself raise ImportError on a setuptools-less
    # system -- confirm whether a local find_packages substitute is needed.
    from distutils.core import setup, find_packages
+
+
+# -*- Distribution Meta -*-
+import re
# patterns matched against iri_tweet/__init__.py to extract the
# distribution metadata
re_meta = re.compile(r'__(\w+?)__\s*=\s*(.*)')
re_vers = re.compile(r'VERSION\s*=\s*\((.*?)\)')
re_doc = re.compile(r'^"""(.+?)"""', re.M|re.S)


def rq(s):
    """Strip one level of surrounding quotes from a metadata value."""
    return s.strip("\"'")


def add_default(m):
    """Turn a ``__name__ = value`` match into a ((name, value),) pair."""
    attr_name, attr_value = m.groups()
    return ((attr_name, rq(attr_value)),)


def add_version(m):
    """Turn a ``VERSION = (a, b, c[, suffix...])`` match into
    (("VERSION", "a.b.c<suffix...>"),)."""
    parts = [rq(piece) for piece in m.groups()[0].split(", ")]
    return (("VERSION", ".".join(parts[0:3]) + "".join(parts[3:])),)


def add_doc(m):
    """Collapse the matched module docstring onto a single line."""
    return (("doc", m.groups()[0].replace("\n", " ")),)
+
# handlers keyed by the pattern that triggers them; the module docstring is
# handled separately below, over the accumulated header lines
pats = {re_meta: add_default,
        re_vers: add_version}
here = os.path.abspath(os.path.dirname(__file__))
meta_fh = open(os.path.join(here, "iri_tweet/__init__.py"))
try:
    # scan iri_tweet/__init__.py up to the '# -eof meta-' sentinel,
    # collecting __name__ = value pairs and the VERSION tuple into `meta`
    meta = {}
    acc = []
    for line in meta_fh:
        if line.strip() == '# -eof meta-':
            break
        acc.append(line)
        for pattern, handler in pats.items():
            m = pattern.match(line.strip())
            if m:
                meta.update(handler(m))
    # the leading module docstring (first """...""" block) becomes meta["doc"]
    m = re_doc.match("".join(acc).strip())
    if m:
        meta.update(add_doc(m))
finally:
    meta_fh.close()
+
+
# VERSION, doc, author, contact and homepage all come from the metadata
# parsed out of iri_tweet/__init__.py above
setup(name='iri_tweet',
      version=meta["VERSION"],
      description=meta["doc"],
      long_description=open("README").read(),
      classifiers=[
          'License :: OSI Approved :: BSD License',
          'Intended Audience :: Developers',
          'Programming Language :: Python :: 2.6',
          'Programming Language :: Python :: 2.7',
      ],
      keywords='twitter',
      author=meta["author"],
      author_email=meta["contact"],
      url=meta["homepage"],
      license='BSD, CECCIL-C',
      packages=find_packages(exclude=['ez_setup', 'examples', 'tests']),
      include_package_data=True,
      zip_safe=False,
      platforms=["any"],
      install_requires=[],
      # extra may add use_2to3=True under python 3 (see top of file)
      **extra
)
--- a/script/lib/iri_tweet/tests.py Mon Feb 20 18:52:19 2012 +0100
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,197 +0,0 @@
-from sqlalchemy import Column, Integer, String, ForeignKey, create_engine
-from sqlalchemy.ext.declarative import declarative_base
-from sqlalchemy.orm import relationship, backref
-import unittest #@UnresolvedImport
-from sqlalchemy.orm import sessionmaker
-from iri_tweet.utils import ObjectsBuffer, TwitterProcessor
-from iri_tweet import models
-import tempfile #@UnresolvedImport
-import os
-
-Base = declarative_base()
-
-class User(Base):
- __tablename__ = 'users'
-
- id = Column(Integer, primary_key=True)
- name = Column(String)
- fullname = Column(String)
- password = Column(String)
-
- def __init__(self, name, fullname, password):
- self.name = name
- self.fullname = fullname
- self.password = password
-
- def __repr__(self):
- return "<User('%s','%s', '%s')>" % (self.name, self.fullname, self.password)
-
-
-class Address(Base):
- __tablename__ = 'addresses'
- id = Column(Integer, primary_key=True)
- email_address = Column(String, nullable=False)
- user_id = Column(Integer, ForeignKey('users.id'))
-
- user = relationship("User", backref=backref('addresses', order_by=id))
-
- def __init__(self, user_id, email_address):
- self.email_address = email_address
- self.user_id = user_id
-
- def __repr__(self):
- return "<Address('%s')>" % self.email_address
-
-
-
-class TestObjectBuffer(unittest.TestCase):
-
- def setUp(self):
- self.engine = create_engine('sqlite:///:memory:', echo=False)
- Base.metadata.create_all(self.engine)
- sessionMaker = sessionmaker(bind=self.engine)
- self.session = sessionMaker()
-
- def tearDown(self):
- self.session.close()
- self.engine.dispose()
-
-
- def testCreateUser(self):
- ed_user = User('ed', 'Ed Jones', 'edspassword')
- self.session.add(ed_user)
- self.assertTrue(ed_user.id is None)
- self.session.commit()
- self.assertTrue(ed_user.id is not None)
-
-
- def testSimpleBuffer(self):
- obj_buffer = ObjectsBuffer()
- obj_proxy = obj_buffer.add_object(User, ['ed1', 'Ed1 Jones', 'edspassword'], None, False)
- self.assertTrue(obj_proxy.id() is None)
- obj_buffer.persists(self.session)
- self.assertTrue(obj_proxy.id() is None)
- self.session.commit()
- self.assertTrue(obj_proxy.id() is not None)
-
-
- def testSimpleBufferKwargs(self):
- obj_buffer = ObjectsBuffer()
- obj_proxy = obj_buffer.add_object(User, None, {'name':'ed1b', 'fullname':'Ed1b Jones', 'password':'edspassword'}, False)
- self.assertTrue(obj_proxy.id() is None)
- obj_buffer.persists(self.session)
- self.assertTrue(obj_proxy.id() is None)
- self.session.commit()
- self.assertTrue(obj_proxy.id() is not None)
-
-
- def testSimpleBufferFlush(self):
- obj_buffer = ObjectsBuffer()
- obj_proxy = obj_buffer.add_object(User, ['ed2', 'Ed2 Jones', 'edspassword'], None, True)
- self.assertTrue(obj_proxy.id() is None)
- obj_buffer.persists(self.session)
- self.assertTrue(obj_proxy.id() is not None)
- self.session.commit()
- self.assertTrue(obj_proxy.id() is not None)
-
- def testRelationBuffer(self):
- obj_buffer = ObjectsBuffer()
- user1_proxy = obj_buffer.add_object(User, ['ed3', 'Ed3 Jones', 'edspassword'], None, True)
- obj_buffer.add_object(Address, [user1_proxy.id,'ed3@mail.com'], None, False)
- obj_buffer.add_object(Address, [user1_proxy.id,'ed3@other.com'], None, False)
- user2_proxy = obj_buffer.add_object(User, ['ed4', 'Ed3 Jones', 'edspassword'], None, True)
- obj_buffer.add_object(Address, [user2_proxy.id,'ed4@mail.com'], None, False)
- obj_buffer.persists(self.session)
- self.session.commit()
- ed_user = self.session.query(User).filter_by(name='ed3').first()
- self.assertEquals(2, len(ed_user.addresses))
- ed_user = self.session.query(User).filter_by(name='ed4').first()
- self.assertEquals(1, len(ed_user.addresses))
-
-
- def testGet(self):
- obj_buffer = ObjectsBuffer()
- user1_proxy = obj_buffer.add_object(User, None, {'name':'ed2', 'fullname':'Ed2 Jones', 'password':'edspassword'}, True)
- adress_proxy = obj_buffer.add_object(Address, None, {'user_id':user1_proxy.id,'email_address':'ed2@other.com'}, False)
- user2_proxy = obj_buffer.add_object(User, None, {'name':'ed3', 'fullname':'Ed3 Jones', 'password':'edspassword'}, True)
- obj_buffer.add_object(Address, None, {'user_id':user2_proxy.id,'email_address':'ed3@other.com'}, False)
- self.assertEquals(user1_proxy, obj_buffer.get(User, name='ed2'))
- self.assertEquals(adress_proxy, obj_buffer.get(Address, email_address='ed2@other.com'))
- self.assertEquals(user2_proxy, obj_buffer.get(User, name='ed3'))
- self.assertTrue(obj_buffer.get(User, name='ed3', fullname='Ed2 Jones') is None)
-
-original_json = u'{"in_reply_to_user_id_str":null,"text":"RT @BieberEagle: \\"I love my haters. They spend so much time thinking about me. Aren\u2019t they sweet?\\" - Justin Bieber","contributors":null,"retweeted":false,"coordinates":null,"retweeted_status":{"in_reply_to_user_id_str":null,"text":"\\"I love my haters. They spend so much time thinking about me. Aren\u2019t they sweet?\\" - Justin Bieber","contributors":null,"retweeted":false,"coordinates":null,"retweet_count":"100+","source":"web","entities":{"user_mentions":[],"hashtags":[],"urls":[]},"truncated":false,"place":null,"id_str":"96638597737889792","in_reply_to_user_id":null,"in_reply_to_status_id":null,"favorited":false,"in_reply_to_status_id_str":null,"user":{"is_translator":false,"profile_background_tile":true,"profile_background_image_url_https":"https:\/\/si0.twimg.com\/profile_background_images\/298443445\/355584171.jpg","listed_count":5040,"friends_count":8477,"profile_link_color":"ff0000","profile_sidebar_border_color":"000000","url":"http:\/\/twitpic.com\/photos\/BieberEagle","profile_image_url_https":"https:\/\/si0.twimg.com\/profile_images\/1465491672\/355584171_normal.jpg","profile_image_url":"http:\/\/a2.twimg.com\/profile_images\/1465491672\/355584171_normal.jpg","description":"1 name, 1 inspiration, 1 hero, 1 smile, 1 singer, 1 boy who changed my life. B.\u0130.E.B.E.R-Believe In Everything Because Everything\'s Reachable. 
#NEVERSAYNEVER","default_profile":false,"notifications":null,"time_zone":"Paris","followers_count":14506,"default_profile_image":false,"lang":"en","profile_use_background_image":true,"screen_name":"BieberEagle","show_all_inline_media":false,"geo_enabled":false,"profile_background_color":"ffffff","location":"\u2665 Albania \u2665 ","id_str":"229067923","profile_background_image_url":"http:\/\/a2.twimg.com\/profile_background_images\/298443445\/355584171.jpg","favourites_count":89,"protected":false,"follow_request_sent":null,"following":null,"name":"truebelieber","statuses_count":24279,"verified":false,"created_at":"Tue Dec 21 12:35:18 +0000 2010","profile_text_color":"000000","id":229067923,"contributors_enabled":false,"utc_offset":3600,"profile_sidebar_fill_color":""},"id":96638597737889792,"created_at":"Thu Jul 28 17:50:11 +0000 2011","geo":null,"in_reply_to_screen_name":null},"retweet_count":"100+","source":"web","entities":{"user_mentions":[{"indices":[3,15],"screen_name":"BieberEagle","id_str":"229067923","name":"truebelieber","id":229067923}],"hashtags":[],"urls":[]},"truncated":false,"place":null,"id_str":"96965037637382145","in_reply_to_user_id":null,"in_reply_to_status_id":null,"favorited":false,"in_reply_to_status_id_str":null,"user":{"is_translator":false,"profile_background_tile":true,"profile_background_image_url_https":"https:\/\/si0.twimg.com\/profile_background_images\/300419382\/ipod.7.14_054.JPG","listed_count":3,"friends_count":1150,"profile_link_color":"00cccc","profile_sidebar_border_color":"c8ff00","url":"http:\/\/www.facebook.com\/blovedbecca180","profile_image_url_https":"https:\/\/si0.twimg.com\/profile_images\/1466752962\/block_party_7.27.11_015_normal.JPG","profile_image_url":"http:\/\/a3.twimg.com\/profile_images\/1466752962\/block_party_7.27.11_015_normal.JPG","description":"if ya wanna know something about me, then get to know me. \\n\\r\\n\\ri promise, you wont regret it. 
(:\\r\\ni love justin bieber with an extreme burning passion!","default_profile":false,"notifications":null,"time_zone":"Central Time (US & Canada)","followers_count":361,"default_profile_image":false,"lang":"en","profile_use_background_image":true,"screen_name":"beccaxannxx","show_all_inline_media":false,"geo_enabled":false,"profile_background_color":"ff0066","location":"","id_str":"65624607","profile_background_image_url":"http:\/\/a3.twimg.com\/profile_background_images\/300419382\/ipod.7.14_054.JPG","favourites_count":266,"protected":false,"follow_request_sent":null,"following":null,"name":"beccaxannxx","statuses_count":2512,"verified":false,"created_at":"Fri Aug 14 12:36:35 +0000 2009","profile_text_color":"6a39d4","id":65624607,"contributors_enabled":false,"utc_offset":-21600,"profile_sidebar_fill_color":"ff00bb"},"id":96965037637382145,"created_at":"Fri Jul 29 15:27:21 +0000 2011","geo":null,"in_reply_to_screen_name":null}'
-original_json_media = u'{"user": {"follow_request_sent": null, "profile_use_background_image": true, "id": 34311537, "verified": false, "profile_image_url_https": "https://si0.twimg.com/profile_images/1452959211/63440_1494505009634_1444332638_31074099_4058882_n_normal.jpg", "profile_sidebar_fill_color": "DAECF4", "is_translator": false, "geo_enabled": false, "profile_text_color": "663B12", "followers_count": 29, "profile_sidebar_border_color": "C6E2EE", "id_str": "34311537", "default_profile_image": false, "location": "", "utc_offset": -25200, "statuses_count": 813, "description": "i like joe jonas, justin bieber, ashley tisdale, selena gomez, megen fox, kim kardashian and demi lovoto and many more.", "friends_count": 101, "profile_link_color": "1F98C7", "profile_image_url": "http://a1.twimg.com/profile_images/1452959211/63440_1494505009634_1444332638_31074099_4058882_n_normal.jpg", "notifications": null, "show_all_inline_media": false, "profile_background_image_url_https": "https://si0.twimg.com/profile_background_images/298244445/tour2011.jpg", "profile_background_color": "C6E2EE", "profile_background_image_url": "http://a1.twimg.com/profile_background_images/298244445/tour2011.jpg", "name": "mikayla", "lang": "en", "profile_background_tile": true, "favourites_count": 231, "screen_name": "bieberfever17ya", "url": null, "created_at": "Wed Apr 22 16:04:28 +0000 2009", "contributors_enabled": false, "time_zone": "Mountain Time (US & Canada)", "protected": false, "default_profile": false, "following": null, "listed_count": 1}, "favorited": false, "entities": {"user_mentions": [], "media": [{"media_url_https": "https://p.twimg.com/AWea5Z-CQAAvfvK.jpg", "expanded_url": "http://twitter.com/bieberfever17ya/status/101219827649232896/photo/1", "sizes": {"small": {"h": 240, "w": 201, "resize": "fit"}, "large": {"h": 240, "w": 201, "resize": "fit"}, "medium": {"h": 240, "w": 201, "resize": "fit"}, "thumb": {"h": 150, "w": 150, "resize": "crop"}}, "url": 
"http://t.co/N7yZ8hS", "display_url": "pic.twitter.com/N7yZ8hS", "id_str": "101219827653427200", "indices": [31, 50], "type": "photo", "id": 101219827653427200, "media_url": "http://p.twimg.com/AWea5Z-CQAAvfvK.jpg"}], "hashtags": [], "urls": []}, "contributors": null, "truncated": false, "text": "i love you justin bieber <3 http://t.co/N7yZ8hS", "created_at": "Wed Aug 10 09:14:22 +0000 2011", "retweeted": false, "in_reply_to_status_id": null, "coordinates": null, "id": 101219827649232896, "source": "web", "in_reply_to_status_id_str": null, "place": null, "in_reply_to_user_id": null, "in_reply_to_screen_name": null, "retweet_count": 0, "geo": null, "in_reply_to_user_id_str": null, "possibly_sensitive": false, "id_str": "101219827649232896"}'
-original_json_media_others = u'{"user": {"utc_offset": -25200, "statuses_count": 813, "default_profile_image": false, "friends_count": 101, "profile_background_image_url_https": "https://si0.twimg.com/profile_background_images/298244445/tour2011.jpg", "profile_use_background_image": true, "profile_sidebar_fill_color": "DAECF4", "profile_link_color": "1F98C7", "profile_image_url": "http://a1.twimg.com/profile_images/1452959211/63440_1494505009634_1444332638_31074099_4058882_n_normal.jpg", "time_zone": "Mountain Time (US & Canada)", "is_translator": false, "screen_name": "bieberfever17ya", "url": null, "show_all_inline_media": false, "geo_enabled": false, "profile_background_color": "C6E2EE", "id": 34311537, "profile_background_image_url": "http://a1.twimg.com/profile_background_images/298244445/tour2011.jpg", "description": "i like joe jonas, justin bieber, ashley tisdale, selena gomez, megen fox, kim kardashian and demi lovoto and many more.", "lang": "en", "profile_background_tile": true, "favourites_count": 231, "name": "mikayla", "notifications": null, "follow_request_sent": null, "created_at": "Wed Apr 22 16:04:28 +0000 2009", "verified": false, "contributors_enabled": false, "location": "", "profile_text_color": "663B12", "followers_count": 29, "profile_sidebar_border_color": "C6E2EE", "id_str": "34311537", "default_profile": false, "following": null, "protected": false, "profile_image_url_https": "https://si0.twimg.com/profile_images/1452959211/63440_1494505009634_1444332638_31074099_4058882_n_normal.jpg", "listed_count": 1}, "favorited": false, "contributors": null, "source": "web", "text": "i love you justin bieber <3 http://t.co/N7yZ8hS", "created_at": "Wed Aug 10 09:14:22 +0000 2011", "truncated": false, "retweeted": false, "in_reply_to_status_id_str": null, "coordinates": null, "in_reply_to_user_id_str": null, "entities": {"user_mentions": [], "media": [], "hashtags": [], "urls": [], "others": [{"url": "http://t.co/N7yZ8hS", "text": "comments", 
"indices": [31, 50]}]}, "in_reply_to_status_id": null, "in_reply_to_screen_name": null, "id_str": "101219827649232896", "place": null, "retweet_count": 0, "geo": null, "id": 101219827649232896, "possibly_sensitive": false, "in_reply_to_user_id": null}'
-
-class TestTwitterProcessor(unittest.TestCase):
-
- def setUp(self):
- self.engine, self.metadata, sessionMaker = models.setup_database('sqlite:///:memory:', echo=True)
- self.session = sessionMaker()
- file, self.tmpfilepath = tempfile.mkstemp()
- os.close(file)
-
-
- def testTwitterProcessor(self):
- tp = TwitterProcessor(None, original_json, None, self.session, self.tmpfilepath)
- tp.process()
- self.session.commit()
-
- self.assertEquals(1, self.session.query(models.TweetSource).count())
- self.assertEquals(1, self.session.query(models.Tweet).count())
- self.assertEquals(2, self.session.query(models.User).count())
- tweet = self.session.query(models.Tweet).first()
- self.assertFalse(tweet.user is None)
- self.assertEqual(u"beccaxannxx",tweet.user.name)
- self.assertEqual(65624607,tweet.user.id)
- self.assertEqual(1,len(tweet.entity_list))
- entity = tweet.entity_list[0]
- self.assertEqual(u"BieberEagle", entity.user.screen_name)
- self.assertTrue(entity.user.created_at is None)
- self.assertEqual("entity_user", entity.type)
- self.assertEqual("user_mentions", entity.entity_type.label)
-
-
- def testTwitterProcessorMedia(self):
- tp = TwitterProcessor(None, original_json_media, None, self.session, self.tmpfilepath)
- tp.process()
- self.session.commit()
-
- self.assertEquals(1, self.session.query(models.TweetSource).count())
- self.assertEquals(1, self.session.query(models.Tweet).count())
- self.assertEquals(1, self.session.query(models.User).count())
- tweet = self.session.query(models.Tweet).first()
- self.assertFalse(tweet.user is None)
- self.assertEqual(u"mikayla",tweet.user.name)
- self.assertEqual(34311537,tweet.user.id)
- self.assertEqual(1,len(tweet.entity_list))
- entity = tweet.entity_list[0]
- self.assertEqual(101219827653427200, entity.media.id)
- self.assertEqual("photo", entity.media.type.label)
- self.assertEqual("entity_media", entity.type)
- self.assertEqual("media", entity.entity_type.label)
-
-
- def testTwitterProcessorMediaOthers(self):
- tp = TwitterProcessor(None, original_json_media_others, None, self.session, self.tmpfilepath)
- tp.process()
- self.session.commit()
-
- self.assertEquals(1, self.session.query(models.TweetSource).count())
- self.assertEquals(1, self.session.query(models.Tweet).count())
- tweet = self.session.query(models.Tweet).first()
- self.assertEqual(1,len(tweet.entity_list))
- entity = tweet.entity_list[0]
- self.assertEqual("entity_entity", entity.type)
- self.assertEqual("others", entity.entity_type.label)
-
-
-
- def tearDown(self):
- self.session.close()
- self.engine.dispose()
- os.remove(self.tmpfilepath)
-
-if __name__ == '__main__':
- unittest.main()
\ No newline at end of file
--- a/script/lib/iri_tweet/tweet_twitter_user.py Mon Feb 20 18:52:19 2012 +0100
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,126 +0,0 @@
-from iri_tweet.models import setup_database, Message, UserMessage, User
-from iri_tweet.utils import (get_oauth_token, get_user_query, set_logging_options,
- set_logging, parse_date, get_logger)
-from optparse import OptionParser #@UnresolvedImport
-from sqlalchemy import BigInteger
-from sqlalchemy.schema import Table, Column
-from sqlalchemy.sql import and_
-import datetime
-import re
-import sys
-import twitter
-
-APPLICATION_NAME = "Tweet recorder user"
-CONSUMER_KEY = "Vdr5ZcsjI1G3esTPI8yDg"
-CONSUMER_SECRET = "LMhNrY99R6a7E0YbZZkRFpUZpX5EfB1qATbDk1sIVLs"
-
-
-def get_options():
- parser = OptionParser()
- parser.add_option("-d", "--database", dest="database",
- help="Input database", metavar="DATABASE")
- parser.add_option("-s", "--start-date", dest="start_date",
- help="start date", metavar="START_DATE", default=None)
- parser.add_option("-e", "--end-date", dest="end_date",
- help="end date", metavar="END_DATE")
- parser.add_option("-H", "--hashtag", dest="hashtag",
- help="Hashtag", metavar="HASHTAG", default=[], action="append")
- parser.add_option("-x", "--exclude", dest="exclude",
- help="file containing the id to exclude", metavar="EXCLUDE")
- parser.add_option("-D", "--duration", dest="duration", type="int",
- help="Duration", metavar="DURATION", default=None)
- parser.add_option("-m", "--message", dest="message",
- help="tweet", metavar="MESSAGE", default="")
- parser.add_option("-u", "--user", dest="user",
- help="user", metavar="USER")
- parser.add_option("-w", "--password", dest="password",
- help="password", metavar="PASSWORD")
- parser.add_option("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token",
- help="Token file name")
- parser.add_option("-S", dest="simulate", metavar="SIMULATE", default=False, action="store_true", help="Simulate call to twitter. Do not change the database")
- parser.add_option("-f", dest="force", metavar="FORCE", default=False, action="store_true", help="force sending message to all user even if it has already been sent")
-
-
- set_logging_options(parser)
-
- return parser.parse_args()
-
-
-if __name__ == "__main__":
-
- (options, args) = get_options()
-
- set_logging(options)
-
- get_logger().debug("OPTIONS : " + repr(options)) #@UndefinedVariable
-
- if not options.message or len(options.message) == 0:
- get_logger().warning("No message exiting")
- sys.exit()
-
- conn_str = options.database.strip()
- if not re.match("^\w+://.+", conn_str):
- conn_str = 'sqlite:///' + conn_str
-
- engine, metadata, Session = setup_database(conn_str, echo=((options.verbose-options.quiet)>0), create_all = False)
-
- conn = None
- try :
- conn = engine.connect()
- session = None
- try:
- session = Session(bind=conn, autoflush=True, autocommit=True)
- tweet_exclude_table = Table("tweet_exclude", metadata, Column('id', BigInteger, primary_key=True), prefixes=['TEMPORARY'])
- metadata.create_all(bind=conn,tables=[tweet_exclude_table])
-
- start_date_str = options.start_date
- end_date_str = options.end_date
- duration = options.duration
- hashtags = options.hashtag
-
- start_date = None
- if start_date_str:
- start_date = parse_date(start_date_str)
-
- end_date = None
- if end_date_str:
- end_date = parse_date(end_date_str)
- elif start_date and duration:
- end_date = start_date + datetime.timedelta(seconds=duration)
-
- base_message = options.message.decode(sys.getfilesystemencoding())
- #get or create message
- message_obj = session.query(Message).filter(Message.text == base_message).first()
- if not message_obj :
- message_obj = Message(text=base_message)
- session.add(message_obj)
- session.flush()
-
- query = get_user_query(session, start_date, end_date, hashtags, tweet_exclude_table)
-
- if not options.force:
- query = query.outerjoin(UserMessage, and_(User.id == UserMessage.user_id, UserMessage.message_id == message_obj.id)).filter(UserMessage.message_id == None)
-
- query_res = query.all()
-
- acess_token_key, access_token_secret = get_oauth_token(options.token_filename, application_name=APPLICATION_NAME, consumer_key=CONSUMER_KEY, consumer_secret=CONSUMER_SECRET)
- t = twitter.Twitter(auth=twitter.OAuth(acess_token_key, access_token_secret, CONSUMER_KEY, CONSUMER_SECRET))
-
- for user in query_res:
- screen_name = user.screen_name
-
- message = u"@%s: %s" % (screen_name, base_message)
- get_logger().debug("new status : " + message) #@UndefinedVariable
- if not options.simulate:
- t.statuses.update(status=message)
- user_message = UserMessage(user_id=user.id, message_id=message_obj.id)
- session.add(user_message)
- session.flush()
- finally:
- # if message created and simulate, do not
- if session:
- session.close()
- finally:
- if conn:
- conn.close()
-
--- a/script/lib/iri_tweet/utils.py Mon Feb 20 18:52:19 2012 +0100
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,616 +0,0 @@
-from models import (Tweet, User, Hashtag, EntityHashtag, EntityUser, Url,
- EntityUrl, CONSUMER_KEY, CONSUMER_SECRET, APPLICATION_NAME, ACCESS_TOKEN_KEY,
- ACCESS_TOKEN_SECRET, adapt_date, adapt_json, TweetSource, TweetLog, MediaType,
- Media, EntityMedia, Entity, EntityType)
-from sqlalchemy.sql import select, or_ #@UnresolvedImport
-import Queue #@UnresolvedImport
-import anyjson #@UnresolvedImport
-import datetime
-import email.utils
-import logging
-import os.path
-import sys
-import math
-import twitter.oauth #@UnresolvedImport
-import twitter.oauth_dance #@UnresolvedImport
-import twitter_text #@UnresolvedImport
-
-
-CACHE_ACCESS_TOKEN = {}
-
-def get_oauth_token(token_file_path=None, check_access_token=True, application_name=APPLICATION_NAME, consumer_key=CONSUMER_KEY, consumer_secret=CONSUMER_SECRET):
-
- global CACHE_ACCESS_TOKEN
-
- if 'ACCESS_TOKEN_KEY' in globals() and 'ACCESS_TOKEN_SECRET' in globals() and ACCESS_TOKEN_KEY and ACCESS_TOKEN_SECRET:
- return ACCESS_TOKEN_KEY,ACCESS_TOKEN_SECRET
-
- res = CACHE_ACCESS_TOKEN.get(application_name, None)
-
- if res is None and token_file_path and os.path.exists(token_file_path):
- get_logger().debug("get_oauth_token : reading token from file %s" % token_file_path) #@UndefinedVariable
- res = twitter.oauth.read_token_file(token_file_path)
-
- if res is not None and check_access_token:
- get_logger().debug("get_oauth_token : Check oauth tokens") #@UndefinedVariable
- t = twitter.Twitter(auth=twitter.OAuth(res[0], res[1], CONSUMER_KEY, CONSUMER_SECRET))
- status = None
- try:
- status = t.account.rate_limit_status()
- except Exception as e:
- get_logger().debug("get_oauth_token : error getting rate limit status %s" % repr(e))
- status = None
- get_logger().debug("get_oauth_token : Check oauth tokens : status %s" % repr(status)) #@UndefinedVariable
- if status is None or status['remaining_hits'] == 0:
- get_logger().debug("get_oauth_token : Problem with status %s" % repr(status))
- res = None
-
- if res is None:
- get_logger().debug("get_oauth_token : doing the oauth dance")
- res = twitter.oauth_dance.oauth_dance(application_name, consumer_key, consumer_secret, token_file_path)
-
- CACHE_ACCESS_TOKEN[application_name] = res
-
- return res
-
-def parse_date(date_str):
- ts = email.utils.parsedate_tz(date_str) #@UndefinedVariable
- return datetime.datetime(*ts[0:7])
-
-def clean_keys(dict_val):
- return dict([(str(key),value) for key,value in dict_val.items()])
-
-fields_adapter = {
- 'stream': {
- "tweet": {
- "created_at" : adapt_date,
- "coordinates" : adapt_json,
- "place" : adapt_json,
- "geo" : adapt_json,
-# "original_json" : adapt_json,
- },
- "user": {
- "created_at" : adapt_date,
- },
-
- },
-
- 'entities' : {
- "medias": {
- "sizes" : adapt_json,
- },
- },
- 'rest': {
- "tweet" : {
- "place" : adapt_json,
- "geo" : adapt_json,
- "created_at" : adapt_date,
-# "original_json" : adapt_json,
- },
- },
-}
-
-#
-# adapt fields, return a copy of the field_dict with adapted fields
-#
-def adapt_fields(fields_dict, adapter_mapping):
- def adapt_one_field(field, value):
- if field in adapter_mapping and adapter_mapping[field] is not None:
- return adapter_mapping[field](value)
- else:
- return value
- return dict([(str(k),adapt_one_field(k,v)) for k,v in fields_dict.items()])
-
-
-class ObjectBufferProxy(object):
- def __init__(self, klass, args, kwargs, must_flush, instance=None):
- self.klass= klass
- self.args = args
- self.kwargs = kwargs
- self.must_flush = must_flush
- self.instance = instance
-
- def persists(self, session):
- new_args = [arg() if callable(arg) else arg for arg in self.args] if self.args is not None else []
- new_kwargs = dict([(k,v()) if callable(v) else (k,v) for k,v in self.kwargs.items()]) if self.kwargs is not None else {}
-
- if self.instance is None:
- self.instance = self.klass(*new_args, **new_kwargs)
- else:
- self.instance = self.klass(*new_args, **new_kwargs)
- self.instance = session.merge(self.instance)
-
- session.add(self.instance)
- if self.must_flush:
- session.flush()
-
- def __getattr__(self, name):
- return lambda : getattr(self.instance, name) if self.instance else None
-
-
-
-
-class ObjectsBuffer(object):
-
- def __init__(self):
- self.__bufferlist = []
- self.__bufferdict = {}
-
- def __add_proxy_object(self, proxy):
- proxy_list = self.__bufferdict.get(proxy.klass, None)
- if proxy_list is None:
- proxy_list = []
- self.__bufferdict[proxy.klass] = proxy_list
- proxy_list.append(proxy)
- self.__bufferlist.append(proxy)
-
- def persists(self, session):
- for object_proxy in self.__bufferlist:
- object_proxy.persists(session)
-
- def add_object(self, klass, args, kwargs, must_flush, instance=None):
- new_proxy = ObjectBufferProxy(klass, args, kwargs, must_flush, instance)
- self.__add_proxy_object(new_proxy)
- return new_proxy
-
- def get(self, klass, **kwargs):
- if klass in self.__bufferdict:
- for proxy in self.__bufferdict[klass]:
- if proxy.kwargs is None or len(proxy.kwargs) == 0 or proxy.klass != klass:
- continue
- found = True
- for k,v in kwargs.items():
- if (k not in proxy.kwargs) or v != proxy.kwargs[k]:
- found = False
- break
- if found:
- return proxy
- return None
-
-class TwitterProcessorException(Exception):
- pass
-
-class TwitterProcessor(object):
-
- def __init__(self, json_dict, json_txt, source_id, session, access_token=None, token_filename=None, user_query_twitter=False):
-
- if json_dict is None and json_txt is None:
- raise TwitterProcessorException("No json")
-
- if json_dict is None:
- self.json_dict = anyjson.deserialize(json_txt)
- else:
- self.json_dict = json_dict
-
- if not json_txt:
- self.json_txt = anyjson.serialize(json_dict)
- else:
- self.json_txt = json_txt
-
- if "id" not in self.json_dict:
- raise TwitterProcessorException("No id in json")
-
- self.source_id = source_id
- self.session = session
- self.token_filename = token_filename
- self.access_token = access_token
- self.obj_buffer = ObjectsBuffer()
- self.user_query_twitter = user_query_twitter
-
-
-
- def __get_user(self, user_dict, do_merge):
- get_logger().debug("Get user : " + repr(user_dict)) #@UndefinedVariable
-
- user_dict = adapt_fields(user_dict, fields_adapter["stream"]["user"])
-
- user_id = user_dict.get("id",None)
- user_name = user_dict.get("screen_name", user_dict.get("name", None))
-
- if user_id is None and user_name is None:
- return None
-
- user = None
- if user_id:
- user = self.obj_buffer.get(User, id=user_id)
- else:
- user = self.obj_buffer.get(User, screen_name=user_name)
-
- #to do update user id needed
- if user is not None:
- user_created_at = None
- if user.args is not None:
- user_created_at = user.args.get('created_at', None)
- if user_created_at is None and user_dict.get('created_at', None) is not None and do_merge:
- if user.args is None:
- user.args = user_dict
- else:
- user.args.update(user_dict)
- return user
-
- #todo : add methpds to objectbuffer to get buffer user
- user_obj = None
- if user_id:
- user_obj = self.session.query(User).filter(User.id == user_id).first()
- else:
- user_obj = self.session.query(User).filter(User.screen_name.ilike(user_name)).first()
-
- #todo update user if needed
- if user_obj is not None:
- if user_obj.created_at is not None or user_dict.get('created_at', None) is None or not do_merge :
- user = ObjectBufferProxy(User, None, None, False, user_obj)
- else:
- user = self.obj_buffer.add_object(User, None, user_dict, True, user_obj)
- return user
-
- user_created_at = user_dict.get("created_at", None)
-
- if user_created_at is None and self.user_query_twitter:
-
- if self.access_token is not None:
- acess_token_key, access_token_secret = self.access_token
- else:
- acess_token_key, access_token_secret = get_oauth_token(self.token_filename)
- t = twitter.Twitter(auth=twitter.OAuth(acess_token_key, access_token_secret, CONSUMER_KEY, CONSUMER_SECRET))
- try:
- if user_id:
- user_dict = t.users.show(user_id=user_id)
- else:
- user_dict = t.users.show(screen_name=user_name)
- except Exception as e:
- get_logger().info("get_user : TWITTER ERROR : " + repr(e)) #@UndefinedVariable
- get_logger().info("get_user : TWITTER ERROR : " + str(e)) #@UndefinedVariable
- return None
-
- if "id" not in user_dict:
- return None
-
- #TODO filter get, wrap in proxy
- user_obj = self.session.query(User).filter(User.id == user_dict["id"]).first()
-
- if user_obj is not None and not do_merge:
- return ObjectBufferProxy(User, None, None, False, user_obj)
- else:
- return self.obj_buffer.add_object(User, None, user_dict, True)
-
- def __get_or_create_object(self, klass, filter_by_kwargs, filter, creation_kwargs, must_flush, do_merge):
-
- obj_proxy = self.obj_buffer.get(klass, **filter_by_kwargs)
- if obj_proxy is None:
- query = self.session.query(klass)
- if filter is not None:
- query = query.filter(filter)
- else:
- query = query.filter_by(**filter_by_kwargs)
- obj_instance = query.first()
- if obj_instance is not None:
- if not do_merge:
- obj_proxy = ObjectBufferProxy(klass, None, None, False, obj_instance)
- else:
- obj_proxy = self.obj_buffer.add_object(klass, None, creation_kwargs, must_flush, obj_instance)
- if obj_proxy is None:
- obj_proxy = self.obj_buffer.add_object(klass, None, creation_kwargs, must_flush)
- return obj_proxy
-
-
- def __process_entity(self, ind, ind_type):
- get_logger().debug("Process_entity : " + repr(ind) + " : " + repr(ind_type)) #@UndefinedVariable
-
- ind = clean_keys(ind)
-
- entity_type = self.__get_or_create_object(EntityType, {'label':ind_type}, None, {'label':ind_type}, True, False)
-
- entity_dict = {
- "indice_start" : ind["indices"][0],
- "indice_end" : ind["indices"][1],
- "tweet_id" : self.tweet.id,
- "entity_type_id" : entity_type.id,
- "source" : adapt_json(ind)
- }
-
- def process_medias():
-
- media_id = ind.get('id', None)
- if media_id is None:
- return None, None
-
- type_str = ind.get("type", "photo")
- media_type = self.__get_or_create_object(MediaType, {'label': type_str}, None, {'label':type_str}, True, False)
- media_ind = adapt_fields(ind, fields_adapter["entities"]["medias"])
- if "type" in media_ind:
- del(media_ind["type"])
- media_ind['type_id'] = media_type.id
- media = self.__get_or_create_object(Media, {'id':media_id}, None, media_ind, True, False)
-
- entity_dict['media_id'] = media.id
- return EntityMedia, entity_dict
-
- def process_hashtags():
- text = ind.get("text", ind.get("hashtag", None))
- if text is None:
- return None, None
- ind['text'] = text
- hashtag = self.__get_or_create_object(Hashtag, {'text':text}, Hashtag.text.ilike(text), ind, True, False)
- entity_dict['hashtag_id'] = hashtag.id
- return EntityHashtag, entity_dict
-
- def process_user_mentions():
- user_mention = self.__get_user(ind, False)
- if user_mention is None:
- entity_dict['user_id'] = None
- else:
- entity_dict['user_id'] = user_mention.id
- return EntityUser, entity_dict
-
- def process_urls():
- url = self.__get_or_create_object(Url, {'url':ind["url"]}, None, ind, True, False)
- entity_dict['url_id'] = url.id
- return EntityUrl, entity_dict
-
- #{'': lambda }
- entity_klass, entity_dict = {
- 'hashtags': process_hashtags,
- 'user_mentions' : process_user_mentions,
- 'urls' : process_urls,
- 'media': process_medias,
- }.get(ind_type, lambda: (Entity, entity_dict))()
-
- get_logger().debug("Process_entity entity_dict: " + repr(entity_dict)) #@UndefinedVariable
- if entity_klass:
- self.obj_buffer.add_object(entity_klass, None, entity_dict, False)
-
-
- def __process_twitter_stream(self):
-
- tweet_nb = self.session.query(Tweet).filter(Tweet.id == self.json_dict["id"]).count()
- if tweet_nb > 0:
- return
-
- ts_copy = adapt_fields(self.json_dict, fields_adapter["stream"]["tweet"])
-
- # get or create user
- user = self.__get_user(self.json_dict["user"], True)
- if user is None:
- get_logger().warning("USER not found " + repr(self.json_dict["user"])) #@UndefinedVariable
- ts_copy["user_id"] = None
- else:
- ts_copy["user_id"] = user.id
-
- del(ts_copy['user'])
- ts_copy["tweet_source_id"] = self.source_id
-
- self.tweet = self.obj_buffer.add_object(Tweet, None, ts_copy, True)
-
- self.__process_entities()
-
-
- def __process_entities(self):
- if "entities" in self.json_dict:
- for ind_type, entity_list in self.json_dict["entities"].items():
- for ind in entity_list:
- self.__process_entity(ind, ind_type)
- else:
-
- text = self.tweet.text
- extractor = twitter_text.Extractor(text)
- for ind in extractor.extract_hashtags_with_indices():
- self.__process_entity(ind, "hashtags")
-
- for ind in extractor.extract_urls_with_indices():
- self.__process_entity(ind, "urls")
-
- for ind in extractor.extract_mentioned_screen_names_with_indices():
- self.__process_entity(ind, "user_mentions")
-
- def __process_twitter_rest(self):
- tweet_nb = self.session.query(Tweet).filter(Tweet.id == self.json_dict["id"]).count()
- if tweet_nb > 0:
- return
-
-
- tweet_fields = {
- 'created_at': self.json_dict["created_at"],
- 'favorited': False,
- 'id': self.json_dict["id"],
- 'id_str': self.json_dict["id_str"],
- #'in_reply_to_screen_name': ts["to_user"],
- 'in_reply_to_user_id': self.json_dict["to_user_id"],
- 'in_reply_to_user_id_str': self.json_dict["to_user_id_str"],
- #'place': ts["place"],
- 'source': self.json_dict["source"],
- 'text': self.json_dict["text"],
- 'truncated': False,
- 'tweet_source_id' : self.source_id,
- }
-
- #user
-
- user_fields = {
- 'lang' : self.json_dict.get('iso_language_code',None),
- 'profile_image_url' : self.json_dict["profile_image_url"],
- 'screen_name' : self.json_dict["from_user"],
- 'id' : self.json_dict["from_user_id"],
- 'id_str' : self.json_dict["from_user_id_str"],
- 'name' : self.json_dict['from_user_name'],
- }
-
- user = self.__get_user(user_fields, do_merge=False)
- if user is None:
- get_logger().warning("USER not found " + repr(user_fields)) #@UndefinedVariable
- tweet_fields["user_id"] = None
- else:
- tweet_fields["user_id"] = user.id
-
- tweet_fields = adapt_fields(tweet_fields, fields_adapter["rest"]["tweet"])
- self.tweet = self.obj_buffer.add_object(Tweet, None, tweet_fields, True)
-
- self.__process_entities()
-
-
-
- def process(self):
-
- if self.source_id is None:
- tweet_source = self.obj_buffer.add_object(TweetSource, None, {'original_json':self.json_txt}, True)
- self.source_id = tweet_source.id
-
- if "metadata" in self.json_dict:
- self.__process_twitter_rest()
- else:
- self.__process_twitter_stream()
-
- self.obj_buffer.add_object(TweetLog, None, {'tweet_source_id':self.source_id, 'status':TweetLog.TWEET_STATUS['OK']}, True)
-
- self.obj_buffer.persists(self.session)
-
-
-def set_logging(options, plogger=None, queue=None):
-
- logging_config = {
- "format" : '%(asctime)s %(levelname)s:%(name)s:%(message)s',
- "level" : max(logging.NOTSET, min(logging.CRITICAL, logging.WARNING - 10 * options.verbose + 10 * options.quiet)), #@UndefinedVariable
- }
-
- if options.logfile == "stdout":
- logging_config["stream"] = sys.stdout
- elif options.logfile == "stderr":
- logging_config["stream"] = sys.stderr
- else:
- logging_config["filename"] = options.logfile
-
- logger = plogger
- if logger is None:
- logger = get_logger() #@UndefinedVariable
-
- if len(logger.handlers) == 0:
- filename = logging_config.get("filename")
- if queue is not None:
- hdlr = QueueHandler(queue, True)
- elif filename:
- mode = logging_config.get("filemode", 'a')
- hdlr = logging.FileHandler(filename, mode) #@UndefinedVariable
- else:
- stream = logging_config.get("stream")
- hdlr = logging.StreamHandler(stream) #@UndefinedVariable
-
- fs = logging_config.get("format", logging.BASIC_FORMAT) #@UndefinedVariable
- dfs = logging_config.get("datefmt", None)
- fmt = logging.Formatter(fs, dfs) #@UndefinedVariable
- hdlr.setFormatter(fmt)
- logger.addHandler(hdlr)
- level = logging_config.get("level")
- if level is not None:
- logger.setLevel(level)
-
- options.debug = (options.verbose-options.quiet > 0)
- return logger
-
-def set_logging_options(parser):
- parser.add_option("-l", "--log", dest="logfile",
- help="log to file", metavar="LOG", default="stderr")
- parser.add_option("-v", dest="verbose", action="count",
- help="verbose", metavar="VERBOSE", default=0)
- parser.add_option("-q", dest="quiet", action="count",
- help="quiet", metavar="QUIET", default=0)
-
-def get_base_query(session, query, start_date, end_date, hashtags, tweet_exclude_table, user_whitelist):
-
- query = query.join(EntityHashtag).join(Hashtag)
-
- if tweet_exclude_table is not None:
- query = query.filter(~Tweet.id.in_(select([tweet_exclude_table.c.id]))) #@UndefinedVariable
-
- if start_date:
- query = query.filter(Tweet.created_at >= start_date)
- if end_date:
- query = query.filter(Tweet.created_at <= end_date)
-
- if user_whitelist:
- query = query.join(User).filter(User.screen_name.in_(user_whitelist))
-
-
- if hashtags :
- def merge_hash(l,h):
- l.extend(h.split(","))
- return l
- htags = reduce(merge_hash, hashtags, [])
-
- query = query.filter(or_(*map(lambda h: Hashtag.text.contains(h), htags))) #@UndefinedVariable
-
- return query
-
-
-
-def get_filter_query(session, start_date, end_date, hashtags, tweet_exclude_table, user_whitelist):
-
- query = session.query(Tweet)
- query = get_base_query(session, query, start_date, end_date, hashtags, tweet_exclude_table, user_whitelist)
- return query.order_by(Tweet.created_at)
-
-
-def get_user_query(session, start_date, end_date, hashtags, tweet_exclude_table):
-
- query = session.query(User).join(Tweet)
-
- query = get_base_query(session, query, start_date, end_date, hashtags, tweet_exclude_table, None)
-
- return query.distinct()
-
-logger_name = "iri.tweet"
-
-def get_logger():
- global logger_name
- return logging.getLogger(logger_name) #@UndefinedVariable
-
-
-# Next two import lines for this demo only
-
-class QueueHandler(logging.Handler): #@UndefinedVariable
- """
- This is a logging handler which sends events to a multiprocessing queue.
- """
-
- def __init__(self, queue, ignore_full):
- """
- Initialise an instance, using the passed queue.
- """
- logging.Handler.__init__(self) #@UndefinedVariable
- self.queue = queue
- self.ignore_full = True
-
- def emit(self, record):
- """
- Emit a record.
-
- Writes the LogRecord to the queue.
- """
- try:
- ei = record.exc_info
- if ei:
- dummy = self.format(record) # just to get traceback text into record.exc_text
- record.exc_info = None # not needed any more
- if not self.ignore_full or not self.queue.full():
- self.queue.put_nowait(record)
- except Queue.Full:
- if self.ignore_full:
- pass
- else:
- raise
- except (KeyboardInterrupt, SystemExit):
- raise
- except:
- self.handleError(record)
-
-def show_progress(current_line, total_line, label, width):
-
- percent = (float(current_line) / float(total_line)) * 100.0
-
- marks = math.floor(width * (percent / 100.0))
- spaces = math.floor(width - marks)
-
- loader = u'[' + (u'=' * int(marks)) + (u' ' * int(spaces)) + u']'
-
- sys.stdout.write(u"%s %d%% %d/%d - %r\r" % (loader, percent, current_line - 1, total_line - 1, label[:50].rjust(50))) #takes the header into account
- if percent >= 100:
- sys.stdout.write("\n")
- sys.stdout.flush()
--- a/script/lib/tweetstream/tweetstream/__init__.py Mon Feb 20 18:52:19 2012 +0100
+++ b/script/lib/tweetstream/tweetstream/__init__.py Tue Feb 21 10:47:06 2012 +0100
@@ -1,6 +1,6 @@
"""Simple access to Twitter's streaming API"""
-VERSION = (1, 1, 1)
+VERSION = (1, 1, 1, 'iri')
__version__ = ".".join(map(str, VERSION[0:3])) + "".join(VERSION[3:])
__author__ = "Rune Halvorsen"
__contact__ = "runefh@gmail.com"
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/script/utils/export_tweet_db.py Tue Feb 21 10:47:06 2012 +0100
@@ -0,0 +1,47 @@
+from models import setup_database
+from optparse import OptionParser #@UnresolvedImport
+from sqlalchemy.orm import sessionmaker
+from utils import set_logging_options, set_logging, TwitterProcessor, logger
+import sqlite3 #@UnresolvedImport
+
+
+# 'entities': "tweet_entity",
+# 'user': "tweet_user"
+
+def get_option():
+
+ parser = OptionParser()
+
+ parser.add_option("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token",
+ help="Token file name")
+
+ set_logging_options(parser)
+
+ return parser.parse_args()
+
+if __name__ == "__main__":
+
+ (options, args) = get_option()
+
+ set_logging(options)
+
+ with sqlite3.connect(args[0]) as conn_in:
+ engine, metadata, Session = setup_database('sqlite:///'+args[1], echo=((options.verbose-options.quiet)>0))
+ session = Session()
+ try:
+ curs_in = conn_in.cursor()
+ fields_mapping = {}
+ for i,res in enumerate(curs_in.execute("select json from tweet_tweet;")):
+ logger.debug("main loop %d : %s" % (i, res[0])) #@UndefinedVariable
+ processor = TwitterProcessor(eval(res[0]), res[0], None, session, options.token_filename)
+ processor.process()
+ session.commit()
+ logger.debug("main : %d tweet processed" % (i+1)) #@UndefinedVariable
+ except Exception, e:
+ session.rollback()
+ raise e
+ finally:
+ session.close()
+
+
+
\ No newline at end of file
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/script/utils/tweet_twitter_user.py Tue Feb 21 10:47:06 2012 +0100
@@ -0,0 +1,126 @@
+from iri_tweet.models import setup_database, Message, UserMessage, User
+from iri_tweet.utils import (get_oauth_token, get_user_query, set_logging_options,
+ set_logging, parse_date, get_logger)
+from optparse import OptionParser #@UnresolvedImport
+from sqlalchemy import BigInteger
+from sqlalchemy.schema import Table, Column
+from sqlalchemy.sql import and_
+import datetime
+import re
+import sys
+import twitter
+
+APPLICATION_NAME = "Tweet recorder user"
+CONSUMER_KEY = "Vdr5ZcsjI1G3esTPI8yDg"
+CONSUMER_SECRET = "LMhNrY99R6a7E0YbZZkRFpUZpX5EfB1qATbDk1sIVLs"
+
+
+def get_options():
+ parser = OptionParser()
+ parser.add_option("-d", "--database", dest="database",
+ help="Input database", metavar="DATABASE")
+ parser.add_option("-s", "--start-date", dest="start_date",
+ help="start date", metavar="START_DATE", default=None)
+ parser.add_option("-e", "--end-date", dest="end_date",
+ help="end date", metavar="END_DATE")
+ parser.add_option("-H", "--hashtag", dest="hashtag",
+ help="Hashtag", metavar="HASHTAG", default=[], action="append")
+ parser.add_option("-x", "--exclude", dest="exclude",
+ help="file containing the id to exclude", metavar="EXCLUDE")
+ parser.add_option("-D", "--duration", dest="duration", type="int",
+ help="Duration", metavar="DURATION", default=None)
+ parser.add_option("-m", "--message", dest="message",
+ help="tweet", metavar="MESSAGE", default="")
+ parser.add_option("-u", "--user", dest="user",
+ help="user", metavar="USER")
+ parser.add_option("-w", "--password", dest="password",
+ help="password", metavar="PASSWORD")
+ parser.add_option("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token",
+ help="Token file name")
+ parser.add_option("-S", dest="simulate", metavar="SIMULATE", default=False, action="store_true", help="Simulate call to twitter. Do not change the database")
+ parser.add_option("-f", dest="force", metavar="FORCE", default=False, action="store_true", help="force sending message to all user even if it has already been sent")
+
+
+ set_logging_options(parser)
+
+ return parser.parse_args()
+
+
+if __name__ == "__main__":
+
+ (options, args) = get_options()
+
+ set_logging(options)
+
+ get_logger().debug("OPTIONS : " + repr(options)) #@UndefinedVariable
+
+ if not options.message or len(options.message) == 0:
+ get_logger().warning("No message exiting")
+ sys.exit()
+
+ conn_str = options.database.strip()
+ if not re.match("^\w+://.+", conn_str):
+ conn_str = 'sqlite:///' + conn_str
+
+ engine, metadata, Session = setup_database(conn_str, echo=((options.verbose-options.quiet)>0), create_all = False)
+
+ conn = None
+ try :
+ conn = engine.connect()
+ session = None
+ try:
+ session = Session(bind=conn, autoflush=True, autocommit=True)
+ tweet_exclude_table = Table("tweet_exclude", metadata, Column('id', BigInteger, primary_key=True), prefixes=['TEMPORARY'])
+ metadata.create_all(bind=conn,tables=[tweet_exclude_table])
+
+ start_date_str = options.start_date
+ end_date_str = options.end_date
+ duration = options.duration
+ hashtags = options.hashtag
+
+ start_date = None
+ if start_date_str:
+ start_date = parse_date(start_date_str)
+
+ end_date = None
+ if end_date_str:
+ end_date = parse_date(end_date_str)
+ elif start_date and duration:
+ end_date = start_date + datetime.timedelta(seconds=duration)
+
+ base_message = options.message.decode(sys.getfilesystemencoding())
+ #get or create message
+ message_obj = session.query(Message).filter(Message.text == base_message).first()
+ if not message_obj :
+ message_obj = Message(text=base_message)
+ session.add(message_obj)
+ session.flush()
+
+ query = get_user_query(session, start_date, end_date, hashtags, tweet_exclude_table)
+
+ if not options.force:
+ query = query.outerjoin(UserMessage, and_(User.id == UserMessage.user_id, UserMessage.message_id == message_obj.id)).filter(UserMessage.message_id == None)
+
+ query_res = query.all()
+
+            access_token_key, access_token_secret = get_oauth_token(options.token_filename, application_name=APPLICATION_NAME, consumer_key=CONSUMER_KEY, consumer_secret=CONSUMER_SECRET)
+            t = twitter.Twitter(auth=twitter.OAuth(access_token_key, access_token_secret, CONSUMER_KEY, CONSUMER_SECRET))
+
+ for user in query_res:
+ screen_name = user.screen_name
+
+ message = u"@%s: %s" % (screen_name, base_message)
+ get_logger().debug("new status : " + message) #@UndefinedVariable
+ if not options.simulate:
+ t.statuses.update(status=message)
+ user_message = UserMessage(user_id=user.id, message_id=message_obj.id)
+ session.add(user_message)
+ session.flush()
+ finally:
+ # if message created and simulate, do not
+ if session:
+ session.close()
+ finally:
+ if conn:
+ conn.close()
+