--- a/.project Wed Jul 27 18:32:56 2011 +0200
+++ b/.project Tue Aug 09 13:07:23 2011 +0200
@@ -25,4 +25,15 @@
<nature>org.python.pydev.pythonNature</nature>
<nature>org.eclipse.wst.jsdt.core.jsNature</nature>
</natures>
+ <filteredResources>
+ <filter>
+ <id>1312812919641</id>
+ <name></name>
+ <type>6</type>
+ <matcher>
+ <id>org.eclipse.ui.ide.multiFilter</id>
+ <arguments>1.0-name-matches-false-false-.DS_Store</arguments>
+ </matcher>
+ </filter>
+ </filteredResources>
</projectDescription>
--- a/script/lib/iri_tweet/export_tweet_db.py Wed Jul 27 18:32:56 2011 +0200
+++ b/script/lib/iri_tweet/export_tweet_db.py Tue Aug 09 13:07:23 2011 +0200
@@ -1,8 +1,7 @@
from models import setup_database
from optparse import OptionParser #@UnresolvedImport
from sqlalchemy.orm import sessionmaker
-from utils import set_logging_options, set_logging, TwitterProcessor
-import logging
+from utils import set_logging_options, set_logging, TwitterProcessor, logger
import sqlite3 #@UnresolvedImport
@@ -34,11 +33,11 @@
curs_in = conn_in.cursor()
fields_mapping = {}
for i,res in enumerate(curs_in.execute("select json from tweet_tweet;")):
- logging.debug("main loop %d : %s" % (i, res[0])) #@UndefinedVariable
- processor = TwitterProcessor(eval(res[0]), res[0], session, options.token_filename)
+ logger.debug("main loop %d : %s" % (i, res[0])) #@UndefinedVariable
+ processor = TwitterProcessor(eval(res[0]), res[0], None, session, options.token_filename)
processor.process()
session.commit()
- logging.debug("main : %d tweet processed" % (i+1)) #@UndefinedVariable
+ logger.debug("main : %d tweet processed" % (i+1)) #@UndefinedVariable
except Exception, e:
session.rollback()
raise e
--- a/script/lib/iri_tweet/export_twitter_alchemy.py Wed Jul 27 18:32:56 2011 +0200
+++ b/script/lib/iri_tweet/export_twitter_alchemy.py Tue Aug 09 13:07:23 2011 +0200
@@ -5,10 +5,9 @@
from optparse import OptionParser #@UnresolvedImport
from sqlalchemy import Table, Column, BigInteger, MetaData
from sqlalchemy.orm import sessionmaker
-from utils import parse_date, set_logging_options, set_logging, get_filter_query
+from utils import parse_date, set_logging_options, set_logging, get_filter_query, logger
from models import setup_database
import datetime
-import logging
import os.path
import re
import sys
@@ -101,7 +100,7 @@
set_logging(options)
- logging.debug("OPTIONS : " + repr(options)) #@UndefinedVariable
+ logger.debug("OPTIONS : " + repr(options)) #@UndefinedVariable
if len(sys.argv) == 1 or options.database is None:
parser.print_help()
@@ -159,7 +158,7 @@
for params in parameters:
- logging.debug("PARAMETERS " + repr(params)) #@UndefinedVariable
+ logger.debug("PARAMETERS " + repr(params)) #@UndefinedVariable
start_date_str = params.get("start_date",None)
end_date_str = params.get("end_date", None)
@@ -192,12 +191,12 @@
if content_file and content_file.find("http") == 0:
- logging.debug("url : " + content_file) #@UndefinedVariable
+ logger.debug("url : " + content_file) #@UndefinedVariable
h = httplib2.Http()
resp, content = h.request(content_file)
- logging.debug("url response " + repr(resp) + " content " + repr(content)) #@UndefinedVariable
+ logger.debug("url response " + repr(resp) + " content " + repr(content)) #@UndefinedVariable
project = anyjson.deserialize(content)
root = etree.fromstring(project["ldt"])
@@ -254,7 +253,7 @@
if ensemble_parent is None:
- logging.error("Can not process file") #@UndefinedVariable
+ logger.error("Can not process file") #@UndefinedVariable
sys.exit()
if options.replace:
@@ -309,18 +308,18 @@
project["ldt"] = output_data
body = anyjson.serialize(project)
- logging.debug("write http " + content_file) #@UndefinedVariable
- logging.debug("write http " + repr(body)) #@UndefinedVariable
+ logger.debug("write http " + content_file) #@UndefinedVariable
+ logger.debug("write http " + repr(body)) #@UndefinedVariable
h = httplib2.Http()
resp, content = h.request(content_file, "PUT", headers={'content-type':'application/json'}, body=body)
- logging.debug("write http " + repr(resp) + " content " + content) #@UndefinedVariable
+ logger.debug("write http " + repr(resp) + " content " + content) #@UndefinedVariable
else:
if content_file and os.path.exists(content_file):
dest_file_name = content_file
else:
dest_file_name = options.filename
- logging.debug("WRITE : " + dest_file_name) #@UndefinedVariable
+ logger.debug("WRITE : " + dest_file_name) #@UndefinedVariable
output = open(dest_file_name, "w")
output.write(output_data)
output.flush()
--- a/script/lib/iri_tweet/models.py Wed Jul 27 18:32:56 2011 +0200
+++ b/script/lib/iri_tweet/models.py Tue Aug 09 13:07:23 2011 +0200
@@ -42,7 +42,34 @@
if hasattr(self,key):
setattr(self,key,value)
+class TweetSource(Base):
+ __tablename__ = 'tweet_tweet_source'
+ id = Column(Integer, primary_key = True, autoincrement=True)
+ original_json = Column(String)
+ received_at = Column(DateTime, default=datetime.datetime.now())
+
+ def __init__(self, **kwargs):
+ for key, value in kwargs.items():
+ if hasattr(self,key):
+ setattr(self,key,value)
+
+class TweetLog(Base):
+
+ TWEET_STATUS = {
+ 'OK' : 1,
+ 'ERROR' : 2,
+ }
+
+ __tablename__ = 'tweet_tweet_log'
+ id = Column(Integer, primary_key = True, autoincrement=True)
+ tweet_source_id = Column(Integer, ForeignKey('tweet_tweet_source.id'))
+ tweet_source = relationship("TweetSource", backref="logs")
+ status = Column(Integer)
+ error = Column(String)
+ error_stack = Column(String)
+
+
class Tweet(Base):
__tablename__ = 'tweet_tweet'
@@ -65,12 +92,12 @@
text = Column(String)
truncated = Column(Boolean)
user_id = Column(Integer, ForeignKey('tweet_user.id'))
- original_json = Column(String)
+ user = relationship("User", backref="tweets")
+ tweet_source_id = Column(Integer, ForeignKey('tweet_tweet_source.id'))
+ tweet_source = relationship("TweetSource", backref="tweet")
entity_list = relationship(Entity, backref='tweet')
received_at = Column(DateTime, default=datetime.datetime.now())
-
- #user = relationship(User, primaryjoin=user_id == User.id)
-
+
def __init__(self, **kwargs):
for key, value in kwargs.items():
if hasattr(self,key):
@@ -81,11 +108,11 @@
id = Column(Integer, primary_key = True)
user_id = Column(Integer, ForeignKey('tweet_user.id'))
+ user = relationship("User", backref="messages")
created_at = Column(DateTime, default=datetime.datetime.now())
message_id = Column(Integer, ForeignKey('tweet_message.id'))
class Message(Base):
-
__tablename__ = "tweet_message"
id = Column(Integer, primary_key = True)
@@ -131,8 +158,6 @@
url= Column(String)
utc_offset = Column(Integer)
verified= Column(Boolean)
- tweets = relationship(Tweet, backref='user')
- messages = relationship(UserMessage, backref='user')
def __init__(self, **kwargs):
for key, value in kwargs.items():
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/script/lib/iri_tweet/tests.py Tue Aug 09 13:07:23 2011 +0200
@@ -0,0 +1,148 @@
+from sqlalchemy import Column, Integer, String, ForeignKey, create_engine
+from sqlalchemy.ext.declarative import declarative_base
+from sqlalchemy.orm import relationship, backref
+import unittest #@UnresolvedImport
+from sqlalchemy.orm import sessionmaker
+from iri_tweet.utils import ObjectsBuffer, TwitterProcessor
+from iri_tweet import models
+import tempfile #@UnresolvedImport
+import os
+
+Base = declarative_base()
+
+class User(Base):
+ __tablename__ = 'users'
+
+ id = Column(Integer, primary_key=True)
+ name = Column(String)
+ fullname = Column(String)
+ password = Column(String)
+
+ def __init__(self, name, fullname, password):
+ self.name = name
+ self.fullname = fullname
+ self.password = password
+
+ def __repr__(self):
+ return "<User('%s','%s', '%s')>" % (self.name, self.fullname, self.password)
+
+
+class Address(Base):
+ __tablename__ = 'addresses'
+ id = Column(Integer, primary_key=True)
+ email_address = Column(String, nullable=False)
+ user_id = Column(Integer, ForeignKey('users.id'))
+
+ user = relationship("User", backref=backref('addresses', order_by=id))
+
+ def __init__(self, user_id, email_address):
+ self.email_address = email_address
+ self.user_id = user_id
+
+ def __repr__(self):
+ return "<Address('%s')>" % self.email_address
+
+
+
+class TestObjectBuffer(unittest.TestCase):
+
+ def setUp(self):
+ self.engine = create_engine('sqlite:///:memory:', echo=False)
+ Base.metadata.create_all(self.engine)
+ sessionMaker = sessionmaker(bind=self.engine)
+ self.session = sessionMaker()
+
+ def tearDown(self):
+ self.session.close()
+ self.engine.dispose()
+
+
+ def testCreateUser(self):
+ ed_user = User('ed', 'Ed Jones', 'edspassword')
+ self.session.add(ed_user)
+ self.assertTrue(ed_user.id is None)
+ self.session.commit()
+ self.assertTrue(ed_user.id is not None)
+
+
+ def testSimpleBuffer(self):
+ obj_buffer = ObjectsBuffer()
+ obj_proxy = obj_buffer.add_object(User, ['ed1', 'Ed1 Jones', 'edspassword'], None, False)
+ self.assertTrue(obj_proxy.id() is None)
+ obj_buffer.persists(self.session)
+ self.assertTrue(obj_proxy.id() is None)
+ self.session.commit()
+ self.assertTrue(obj_proxy.id() is not None)
+
+
+ def testSimpleBufferKwargs(self):
+ obj_buffer = ObjectsBuffer()
+ obj_proxy = obj_buffer.add_object(User, None, {'name':'ed1b', 'fullname':'Ed1b Jones', 'password':'edspassword'}, False)
+ self.assertTrue(obj_proxy.id() is None)
+ obj_buffer.persists(self.session)
+ self.assertTrue(obj_proxy.id() is None)
+ self.session.commit()
+ self.assertTrue(obj_proxy.id() is not None)
+
+
+ def testSimpleBufferFlush(self):
+ obj_buffer = ObjectsBuffer()
+ obj_proxy = obj_buffer.add_object(User, ['ed2', 'Ed2 Jones', 'edspassword'], None, True)
+ self.assertTrue(obj_proxy.id() is None)
+ obj_buffer.persists(self.session)
+ self.assertTrue(obj_proxy.id() is not None)
+ self.session.commit()
+ self.assertTrue(obj_proxy.id() is not None)
+
+ def testRelationBuffer(self):
+ obj_buffer = ObjectsBuffer()
+ user1_proxy = obj_buffer.add_object(User, ['ed3', 'Ed3 Jones', 'edspassword'], None, True)
+ obj_buffer.add_object(Address, [user1_proxy.id,'ed3@mail.com'], None, False)
+ obj_buffer.add_object(Address, [user1_proxy.id,'ed3@other.com'], None, False)
+ user2_proxy = obj_buffer.add_object(User, ['ed4', 'Ed3 Jones', 'edspassword'], None, True)
+ obj_buffer.add_object(Address, [user2_proxy.id,'ed4@mail.com'], None, False)
+ obj_buffer.persists(self.session)
+ self.session.commit()
+ ed_user = self.session.query(User).filter_by(name='ed3').first()
+ self.assertEquals(2, len(ed_user.addresses))
+ ed_user = self.session.query(User).filter_by(name='ed4').first()
+ self.assertEquals(1, len(ed_user.addresses))
+
+
+
+original_json = u'{"in_reply_to_user_id_str":null,"text":"RT @BieberEagle: \\"I love my haters. They spend so much time thinking about me. Aren\u2019t they sweet?\\" - Justin Bieber","contributors":null,"retweeted":false,"coordinates":null,"retweeted_status":{"in_reply_to_user_id_str":null,"text":"\\"I love my haters. They spend so much time thinking about me. Aren\u2019t they sweet?\\" - Justin Bieber","contributors":null,"retweeted":false,"coordinates":null,"retweet_count":"100+","source":"web","entities":{"user_mentions":[],"hashtags":[],"urls":[]},"truncated":false,"place":null,"id_str":"96638597737889792","in_reply_to_user_id":null,"in_reply_to_status_id":null,"favorited":false,"in_reply_to_status_id_str":null,"user":{"is_translator":false,"profile_background_tile":true,"profile_background_image_url_https":"https:\/\/si0.twimg.com\/profile_background_images\/298443445\/355584171.jpg","listed_count":5040,"friends_count":8477,"profile_link_color":"ff0000","profile_sidebar_border_color":"000000","url":"http:\/\/twitpic.com\/photos\/BieberEagle","profile_image_url_https":"https:\/\/si0.twimg.com\/profile_images\/1465491672\/355584171_normal.jpg","profile_image_url":"http:\/\/a2.twimg.com\/profile_images\/1465491672\/355584171_normal.jpg","description":"1 name, 1 inspiration, 1 hero, 1 smile, 1 singer, 1 boy who changed my life. B.\u0130.E.B.E.R-Believe In Everything Because Everything\'s Reachable. #NEVERSAYNEVER","default_profile":false,"notifications":null,"time_zone":"Paris","followers_count":14506,"default_profile_image":false,"lang":"en","profile_use_background_image":true,"screen_name":"BieberEagle","show_all_inline_media":false,"geo_enabled":false,"profile_background_color":"ffffff","location":"\u2665 Albania \u2665 ","id_str":"229067923","profile_background_image_url":"http:\/\/a2.twimg.com\/profile_background_images\/298443445\/355584171.jpg","favourites_count":89,"protected":false,"follow_request_sent":null,"following":null,"name":"truebelieber","statuses_count":24279,"verified":false,"created_at":"Tue Dec 21 12:35:18 +0000 2010","profile_text_color":"000000","id":229067923,"contributors_enabled":false,"utc_offset":3600,"profile_sidebar_fill_color":""},"id":96638597737889792,"created_at":"Thu Jul 28 17:50:11 +0000 2011","geo":null,"in_reply_to_screen_name":null},"retweet_count":"100+","source":"web","entities":{"user_mentions":[{"indices":[3,15],"screen_name":"BieberEagle","id_str":"229067923","name":"truebelieber","id":229067923}],"hashtags":[],"urls":[]},"truncated":false,"place":null,"id_str":"96965037637382145","in_reply_to_user_id":null,"in_reply_to_status_id":null,"favorited":false,"in_reply_to_status_id_str":null,"user":{"is_translator":false,"profile_background_tile":true,"profile_background_image_url_https":"https:\/\/si0.twimg.com\/profile_background_images\/300419382\/ipod.7.14_054.JPG","listed_count":3,"friends_count":1150,"profile_link_color":"00cccc","profile_sidebar_border_color":"c8ff00","url":"http:\/\/www.facebook.com\/blovedbecca180","profile_image_url_https":"https:\/\/si0.twimg.com\/profile_images\/1466752962\/block_party_7.27.11_015_normal.JPG","profile_image_url":"http:\/\/a3.twimg.com\/profile_images\/1466752962\/block_party_7.27.11_015_normal.JPG","description":"if ya wanna know something about me, then get to know me. \\n\\r\\n\\ri promise, you wont regret it. (:\\r\\ni love justin bieber with an extreme burning passion!","default_profile":false,"notifications":null,"time_zone":"Central Time (US & Canada)","followers_count":361,"default_profile_image":false,"lang":"en","profile_use_background_image":true,"screen_name":"beccaxannxx","show_all_inline_media":false,"geo_enabled":false,"profile_background_color":"ff0066","location":"","id_str":"65624607","profile_background_image_url":"http:\/\/a3.twimg.com\/profile_background_images\/300419382\/ipod.7.14_054.JPG","favourites_count":266,"protected":false,"follow_request_sent":null,"following":null,"name":"beccaxannxx","statuses_count":2512,"verified":false,"created_at":"Fri Aug 14 12:36:35 +0000 2009","profile_text_color":"6a39d4","id":65624607,"contributors_enabled":false,"utc_offset":-21600,"profile_sidebar_fill_color":"ff00bb"},"id":96965037637382145,"created_at":"Fri Jul 29 15:27:21 +0000 2011","geo":null,"in_reply_to_screen_name":null}'
+
+
+class TestTwitterProcessor(unittest.TestCase):
+
+ def setUp(self):
+ self.engine, self.metadata = models.setup_database('sqlite:///:memory:', echo=True)
+ sessionMaker = sessionmaker(bind=self.engine)
+ self.session = sessionMaker()
+ file, self.tmpfilepath = tempfile.mkstemp()
+ os.close(file)
+
+
+ def testTwitterProcessor(self):
+ tp = TwitterProcessor(None, original_json, None, self.session, self.tmpfilepath)
+ tp.process()
+ self.session.commit()
+
+ self.assertEquals(1, self.session.query(models.TweetSource).count())
+ self.assertEquals(1, self.session.query(models.Tweet).count())
+ self.assertEquals(2, self.session.query(models.User).count())
+ tweet = self.session.query(models.Tweet).first()
+ self.assertFalse(tweet.user is None)
+ self.assertEqual(u"beccaxannxx",tweet.user.name)
+ self.assertEqual(65624607,tweet.user.id)
+ self.assertEqual(1,len(tweet.entity_list))
+ self.assertEqual(u"BieberEagle", tweet.entity_list[0].user.screen_name)
+
+
+ def tearDown(self):
+ self.session.close()
+ self.engine.dispose()
+ os.remove(self.tmpfilepath)
+
+if __name__ == '__main__':
+ unittest.main()
\ No newline at end of file
--- a/script/lib/iri_tweet/tweet_twitter_user.py Wed Jul 27 18:32:56 2011 +0200
+++ b/script/lib/iri_tweet/tweet_twitter_user.py Tue Aug 09 13:07:23 2011 +0200
@@ -1,13 +1,12 @@
from iri_tweet.models import setup_database, Message, UserMessage, User
from iri_tweet.utils import (get_oauth_token, get_user_query, set_logging_options,
- set_logging, parse_date)
+ set_logging, parse_date, logger)
from optparse import OptionParser #@UnresolvedImport
from sqlalchemy import BigInteger
from sqlalchemy.orm import sessionmaker
from sqlalchemy.schema import MetaData, Table, Column
from sqlalchemy.sql import and_
import datetime
-import logging #@UnresolvedImport
import sys
import time
import twitter
@@ -54,7 +53,7 @@
set_logging(options)
- logging.debug("OPTIONS : " + repr(options)) #@UndefinedVariable
+ logger.debug("OPTIONS : " + repr(options)) #@UndefinedVariable
if not options.message or len(options.message) == 0:
sys.exit()
@@ -108,7 +107,7 @@
screen_name = user.screen_name
message = u"@%s: %s" % (screen_name, base_message)
- logging.debug("new status : " + message) #@UndefinedVariable
+ logger.debug("new status : " + message) #@UndefinedVariable
if not options.simulate:
t.statuses.update(status=message)
user_message = UserMessage(user_id=user.id, message_id=message_obj.id)
--- a/script/lib/iri_tweet/utils.py Wed Jul 27 18:32:56 2011 +0200
+++ b/script/lib/iri_tweet/utils.py Tue Aug 09 13:07:23 2011 +0200
@@ -1,6 +1,6 @@
-from models import Tweet, User, Hashtag, EntityHashtag, EntityUser, Url, \
- EntityUrl, CONSUMER_KEY, CONSUMER_SECRET, APPLICATION_NAME, ACCESS_TOKEN_KEY, \
- ACCESS_TOKEN_SECRET, adapt_date, adapt_json
+from models import (Tweet, User, Hashtag, EntityHashtag, EntityUser, Url,
+ EntityUrl, CONSUMER_KEY, CONSUMER_SECRET, APPLICATION_NAME, ACCESS_TOKEN_KEY,
+ ACCESS_TOKEN_SECRET, adapt_date, adapt_json, TweetSource, TweetLog)
from sqlalchemy.sql import select, or_ #@UnresolvedImport
import anyjson #@UnresolvedImport
import datetime
@@ -77,13 +77,67 @@
return dict([(str(k),adapt_one_field(k,v)) for k,v in fields_dict.items()])
+class ObjectBufferProxy(object):
+ def __init__(self, klass, args, kwargs, must_flush, instance=None):
+ self.klass= klass
+ self.args = args
+ self.kwargs = kwargs
+ self.must_flush = must_flush
+ self.instance = instance
+
+ def persists(self, session):
+ new_args = [arg() if callable(arg) else arg for arg in self.args] if self.args is not None else []
+ new_kwargs = dict([(k,v()) if callable(v) else (k,v) for k,v in self.kwargs.items()]) if self.kwargs is not None else {}
+
+ self.instance = self.klass(*new_args, **new_kwargs)
+ session.add(self.instance)
+ if self.must_flush:
+ session.flush()
+
+ def __getattr__(self, name):
+ return lambda : getattr(self.instance, name) if self.instance else None
+
+
+
+
+class ObjectsBuffer(object):
+
+ def __init__(self):
+ self.__bufferlist = []
+
+ def persists(self, session):
+ for object_proxy in self.__bufferlist:
+ object_proxy.persists(session)
+
+ def add_object(self, klass, args, kwargs, must_flush):
+ new_proxy = ObjectBufferProxy(klass, args, kwargs, must_flush)
+ self.__bufferlist.append(new_proxy)
+ return new_proxy
+
+ def get(self, klass, **kwargs):
+ for proxy in self.__bufferlist:
+ if proxy.kwargs is None or len(proxy.kwargs) == 0 or proxy.klass != klass:
+ continue
+ found = True
+ for k,v in kwargs.items():
+ if (k not in proxy.kwargs) or v != proxy.kwargs[k]:
+ found = False
+ break
+ if found:
+ return proxy
+
+ return None
+
+
+
+
class TwitterProcessorException(Exception):
pass
class TwitterProcessor(object):
- def __init__(self, json_dict, json_txt, session, token_filename=None):
+ def __init__(self, json_dict, json_txt, source_id, session, token_filename=None):
if json_dict is None and json_txt is None:
raise TwitterProcessorException("No json")
@@ -101,24 +155,39 @@
if "id" not in self.json_dict:
raise TwitterProcessorException("No id in json")
+ self.source_id = source_id
self.session = session
self.token_filename = token_filename
+ self.obj_buffer = ObjectsBuffer()
+
def __get_user(self, user_dict):
- logging.debug("Get user : " + repr(user_dict)) #@UndefinedVariable
+ logger.debug("Get user : " + repr(user_dict)) #@UndefinedVariable
user_id = user_dict.get("id",None)
user_name = user_dict.get("screen_name", user_dict.get("name", None))
if user_id is None and user_name is None:
return None
-
+
+ user = None
if user_id:
- user = self.session.query(User).filter(User.id == user_id).first()
+ user = self.obj_buffer.get(User, id=user_id)
else:
- user = self.session.query(User).filter(User.screen_name.ilike(user_name)).first()
+ user = self.obj_buffer.get(User, screen_name=user_name)
+
+ if user is not None:
+ return user
+
+        #todo : add methods to ObjectsBuffer to get buffered user
+ user_obj = None
+ if user_id:
+ user_obj = self.session.query(User).filter(User.id == user_id).first()
+ else:
+ user_obj = self.session.query(User).filter(User.screen_name.ilike(user_name)).first()
- if user is not None:
+ if user_obj is not None:
+ user = ObjectBufferProxy(User, None, None, False, user_obj)
return user
user_created_at = user_dict.get("created_at", None)
@@ -132,28 +201,27 @@
else:
user_dict = t.users.show(screen_name=user_name)
except Exception as e:
- logging.info("get_user : TWITTER ERROR : " + repr(e)) #@UndefinedVariable
- logging.info("get_user : TWITTER ERROR : " + str(e)) #@UndefinedVariable
+ logger.info("get_user : TWITTER ERROR : " + repr(e)) #@UndefinedVariable
+ logger.info("get_user : TWITTER ERROR : " + str(e)) #@UndefinedVariable
return None
user_dict = adapt_fields(user_dict, fields_adapter["stream"]["user"])
if "id" not in user_dict:
return None
+ #TODO filter get, wrap in proxy
user = self.session.query(User).filter(User.id == user_dict["id"]).first()
if user is not None:
return user
- user = User(**user_dict)
+ user = self.obj_buffer.add_object(User, None, user_dict, True)
- self.session.add(user)
- self.session.flush()
-
- return user
+ return user
+
def __process_entity(self, ind, ind_type):
- logging.debug("Process_entity : " + repr(ind) + " : " + repr(ind_type)) #@UndefinedVariable
+ logger.debug("Process_entity : " + repr(ind) + " : " + repr(ind_type)) #@UndefinedVariable
ind = clean_keys(ind)
@@ -161,57 +229,53 @@
"indice_start": ind["indices"][0],
"indice_end" : ind["indices"][1],
"tweet_id" : self.tweet.id,
- "tweet" : self.tweet
}
def process_hashtags():
text = ind.get("text", ind.get("hashtag", None))
if text is None:
- return None
- hashtag = self.session.query(Hashtag).filter(Hashtag.text.ilike(text)).first()
+ return None
+ hashtag = self.obj_buffer.get(Hashtag, text=text)
+ if hashtag is None:
+ hashtag_obj = self.session.query(Hashtag).filter(Hashtag.text.ilike(text)).first()
+ if hashtag_obj is not None:
+ hashtag = ObjectBufferProxy(Hashtag, None, None, False, hashtag_obj)
+
if hashtag is None:
ind["text"] = text
- hashtag = Hashtag(**ind)
- self.session.add(hashtag)
- self.session.flush()
- entity_dict['hashtag'] = hashtag
+ hashtag = self.obj_buffer.add_object(Hashtag, None, ind, True)
entity_dict['hashtag_id'] = hashtag.id
- entity = EntityHashtag(**entity_dict)
- return entity
+ return EntityHashtag, entity_dict
def process_user_mentions():
user_mention = self.__get_user(ind)
if user_mention is None:
- entity_dict['user'] = None
entity_dict['user_id'] = None
else:
- entity_dict['user'] = user_mention
entity_dict['user_id'] = user_mention.id
- entity = EntityUser(**entity_dict)
- return entity
+ return EntityUser, entity_dict
def process_urls():
- url = self.session.query(Url).filter(Url.url == ind["url"]).first()
+ url = self.obj_buffer.get(Url, url=ind["url"])
if url is None:
- url = Url(**ind)
- self.session.add(url)
- self.session.flush()
- entity_dict['url'] = url
+ url_obj = self.session.query(Url).filter(Url.url == ind["url"]).first()
+ if url_obj is not None:
+ url = ObjectBufferProxy(Url, None, None, False, url_obj)
+ if url is None:
+ url = self.obj_buffer.add_object(Url, None, ind, True)
entity_dict['url_id'] = url.id
- entity = EntityUrl(**entity_dict)
- return entity
+ return EntityUrl, entity_dict
#{'': lambda }
- entity = {
+ entity_klass, entity_dict = {
'hashtags': process_hashtags,
'user_mentions' : process_user_mentions,
'urls' : process_urls
}[ind_type]()
- logging.debug("Process_entity entity_dict: " + repr(entity_dict)) #@UndefinedVariable
- if entity:
- self.session.add(entity)
- self.session.flush()
+ logger.debug("Process_entity entity_dict: " + repr(entity_dict)) #@UndefinedVariable
+ if entity_klass:
+ self.obj_buffer.add_object(entity_klass, None, entity_dict, False)
def __process_twitter_stream(self):
@@ -225,16 +289,15 @@
# get or create user
user = self.__get_user(self.json_dict["user"])
if user is None:
- logging.warning("USER not found " + repr(self.json_dict["user"])) #@UndefinedVariable
- ts_copy["user"] = None
+ logger.warning("USER not found " + repr(self.json_dict["user"])) #@UndefinedVariable
ts_copy["user_id"] = None
else:
- ts_copy["user"] = user
- ts_copy["user_id"] = ts_copy["user"].id
- ts_copy["original_json"] = self.json_txt
+ ts_copy["user_id"] = user.id
+
+ del(ts_copy['user'])
+ ts_copy["tweet_source_id"] = self.source_id
- self.tweet = Tweet(**ts_copy)
- self.session.add(self.tweet)
+ self.tweet = self.obj_buffer.add_object(Tweet, None, ts_copy, True)
# get entities
if "entities" in self.json_dict:
@@ -260,7 +323,8 @@
tweet_nb = self.session.query(Tweet).filter(Tweet.id == self.json_dict["id"]).count()
if tweet_nb > 0:
return
-
+
+
tweet_fields = {
'created_at': self.json_dict["created_at"],
'favorited': False,
@@ -272,8 +336,8 @@
#'place': ts["place"],
'source': self.json_dict["source"],
'text': self.json_dict["text"],
- 'truncated': False,
- 'original_json' : self.json_txt,
+ 'truncated': False,
+ 'tweet_source_id' : self.source_id,
}
#user
@@ -286,16 +350,13 @@
user = self.__get_user(user_fields)
if user is None:
- logging.warning("USER not found " + repr(user_fields)) #@UndefinedVariable
- tweet_fields["user"] = None
+ logger.warning("USER not found " + repr(user_fields)) #@UndefinedVariable
tweet_fields["user_id"] = None
else:
- tweet_fields["user"] = user
tweet_fields["user_id"] = user.id
tweet_fields = adapt_fields(tweet_fields, fields_adapter["rest"]["tweet"])
- self.tweet = Tweet(**tweet_fields)
- self.session.add(self.tweet)
+ self.tweet = self.obj_buffer.add_object(Tweet, None, tweet_fields, True)
text = self.tweet.text
@@ -303,26 +364,37 @@
for ind in extractor.extract_hashtags_with_indices():
self.__process_entity(ind, "hashtags")
-
- for ind in extractor.extract_mentioned_screen_names_with_indices():
- self.__process_entity(ind, "user_mentions")
-
+
for ind in extractor.extract_urls_with_indices():
self.__process_entity(ind, "urls")
- self.session.flush()
+ for ind in extractor.extract_mentioned_screen_names_with_indices():
+ self.__process_entity(ind, "user_mentions")
+
def process(self):
+
+ if self.source_id is None:
+ tweet_source = self.obj_buffer.add_object(TweetSource, None, {'original_json':self.json_txt}, True)
+ self.source_id = tweet_source.id
+
if "metadata" in self.json_dict:
self.__process_twitter_rest()
else:
self.__process_twitter_stream()
+
+ self.obj_buffer.add_object(TweetLog, None, {'tweet_source_id':self.source_id, 'status':TweetLog.TWEET_STATUS['OK']}, False)
+ self.obj_buffer.persists(self.session)
+
-def set_logging(options):
+def set_logging(options, plogger=None):
- logging_config = {}
+ logging_config = {
+ "format" : '%(asctime)s %(levelname)s:%(name)s:%(message)s',
+ "level" : max(logging.NOTSET, min(logging.CRITICAL, logging.WARNING - 10 * options.verbose + 10 * options.quiet)), #@UndefinedVariable
+ }
if options.logfile == "stdout":
logging_config["stream"] = sys.stdout
@@ -330,9 +402,27 @@
logging_config["stream"] = sys.stderr
else:
logging_config["filename"] = options.logfile
-
- logging_config["level"] = max(logging.NOTSET, min(logging.CRITICAL, logging.WARNING - 10 * options.verbose + 10 * options.quiet)) #@UndefinedVariable
- logging.basicConfig(**logging_config) #@UndefinedVariable
+
+ logger = plogger
+ if logger is None:
+ logger = logging.getLogger() #@UndefinedVariable
+
+ if len(logger.handlers) == 0:
+ filename = logging_config.get("filename")
+ if filename:
+ mode = logging_config.get("filemode", 'a')
+ hdlr = logging.FileHandler(filename, mode) #@UndefinedVariable
+ else:
+ stream = logging_config.get("stream")
+ hdlr = logging.StreamHandler(stream) #@UndefinedVariable
+ fs = logging_config.get("format", logging.BASIC_FORMAT) #@UndefinedVariable
+ dfs = logging_config.get("datefmt", None)
+ fmt = logging.Formatter(fs, dfs) #@UndefinedVariable
+ hdlr.setFormatter(fmt)
+ logger.addHandler(hdlr)
+ level = logging_config.get("level")
+ if level is not None:
+ logger.setLevel(level)
options.debug = (options.verbose-options.quiet > 0)
@@ -387,4 +477,4 @@
return query.distinct()
-
+logger = logging.getLogger() #@UndefinedVariable
--- a/script/lib/tweetstream/setup.py Wed Jul 27 18:32:56 2011 +0200
+++ b/script/lib/tweetstream/setup.py Tue Aug 09 13:07:23 2011 +0200
@@ -1,3 +1,4 @@
+#@PydevCodeAnalysisIgnore
from setuptools import setup, find_packages
import sys, os
--- a/script/lib/tweetstream/tweetstream/streamclasses.py Wed Jul 27 18:32:56 2011 +0200
+++ b/script/lib/tweetstream/tweetstream/streamclasses.py Tue Aug 09 13:07:23 2011 +0200
@@ -54,7 +54,7 @@
:attr: `USER_AGENT`.
"""
- def __init__(self, auth, catchup=None, url=None):
+ def __init__(self, auth, catchup=None, url=None, as_text=False):
self._conn = None
self._rate_ts = None
self._rate_cnt = 0
@@ -68,6 +68,7 @@
self.rate = 0
self.user_agent = USER_AGENT
if url: self.url = url
+ self._as_text = as_text
self.muststop = False
@@ -119,12 +120,18 @@
this method and return post data. The data should be in the format
returned by urllib.urlencode."""
return None
+
+ def __muststop(self):
+ if callable(self.muststop):
+ return self.muststop()
+ else:
+ return self.muststop
def next(self):
"""Return the next available tweet. This call is blocking!"""
while True:
try:
- if self.muststop:
+ if self.__muststop():
raise StopIteration()
if not self.connected:
@@ -143,10 +150,15 @@
elif data.isspace():
continue
- data = anyjson.deserialize(data)
- if 'text' in data:
+ if not self._as_text:
+ data = anyjson.deserialize(data)
+ if 'text' in data:
+ self.count += 1
+ self._rate_cnt += 1
+ else: # count and rate may be off, but we count everything
self.count += 1
self._rate_cnt += 1
+
return data
except ValueError, e:
@@ -175,12 +187,12 @@
url = "http://stream.twitter.com/1/statuses/filter.json"
def __init__(self, auth, follow=None, locations=None,
- track=None, catchup=None, url=None):
+ track=None, catchup=None, url=None, as_text=False):
self._follow = follow
self._locations = locations
self._track = track
# remove follow, locations, track
- BaseStream.__init__(self, auth, url=url)
+ BaseStream.__init__(self, auth, url=url, as_text=as_text)
def _get_post_data(self):
postdata = {}
--- a/script/rest/search_twitter.py Wed Jul 27 18:32:56 2011 +0200
+++ b/script/rest/search_twitter.py Tue Aug 09 13:07:23 2011 +0200
@@ -49,12 +49,12 @@
page = 1
while page <= int(1500/int(options.rpp)) and ( results is None or len(results) > 0):
- results = twitter. search(q=options.query, rpp=options.rpp, page=page)
+ results = twitter.search(q=options.query, rpp=options.rpp, page=page)
for tweet in results["results"]:
print tweet
tweet_str = anyjson.serialize(tweet)
#invalidate user id
- processor = utils.TwitterProcessor(tweet, tweet_str, session, options.token_filename)
+ processor = utils.TwitterProcessor(tweet, tweet_str, None, session, options.token_filename)
processor.process()
session.flush()
session.commit()
--- a/script/stream/recorder_tweetstream.py Wed Jul 27 18:32:56 2011 +0200
+++ b/script/stream/recorder_tweetstream.py Tue Aug 09 13:07:23 2011 +0200
@@ -1,16 +1,24 @@
from getpass import getpass
from iri_tweet import models, utils
+from iri_tweet.models import TweetSource, TweetLog
+from multiprocessing import Queue, JoinableQueue, Process, Event, get_logger
from optparse import OptionParser
from sqlalchemy.orm import sessionmaker
-from sqlite3 import *
+import StringIO
+import logging
+import anyjson
import datetime
-import logging
import os
+import shutil
+import signal
import socket
import sys
import time
+import traceback
+import tweepy.auth
import tweetstream
-import tweepy.auth
+from iri_tweet.utils import logger
+from sqlalchemy.exc import OperationalError
socket._fileobject.default_bufsize = 0
@@ -44,12 +52,12 @@
"""
- def __init__(self, auth, keywords, reconnects=3, error_cb=None, retry_wait=5, **kwargs):
+ def __init__(self, auth, keywords, reconnects=3, error_cb=None, retry_wait=5, as_text=False, **kwargs):
self.max_reconnects = reconnects
self.retry_wait = retry_wait
self._reconnects = 0
self._error_cb = error_cb
- super(ReconnectingTweetStream, self).__init__(auth=auth, track=keywords, **kwargs)
+ super(ReconnectingTweetStream, self).__init__(auth=auth, track=keywords, as_text=as_text, **kwargs)
def next(self):
while True:
@@ -72,45 +80,138 @@
-def process_tweet(tweet, session, debug, token_filename):
- screen_name = ""
- if 'user' in tweet and 'screen_name' in tweet['user']:
- screen_name = tweet['user']['screen_name']
- logging.info("Process_tweet from %s : %s" % (screen_name, tweet['text']))
- logging.debug("Process_tweet :" + repr(tweet))
- processor = utils.TwitterProcessor(tweet, None, session, token_filename)
- processor.process()
-
-def main(username, password, track, session, debug, reconnects, token_filename, duration):
-
- #username = username or raw_input('Twitter username: ')
- #password = password or getpass('Twitter password: ')
-
- track_list = track or raw_input('Keywords to track (comma seperated): ').strip()
- track_list = [k for k in track_list.split(',')]
+class SourceProcess(Process):
+
+ def __init__(self, session_maker, queue, auth, track, debug, reconnects, token_filename, stop_event):
+ self.session_maker = session_maker
+ self.queue = queue
+ self.auth = auth
+ self.track = track
+ self.debug = debug
+ self.reconnects = reconnects
+ self.token_filename = token_filename
+ self.stop_event = stop_event
+ super(SourceProcess, self).__init__()
+# self.stop_event =
- if username and password:
- auth = tweepy.auth.BasicAuthHandler(username, password)
- else:
- consumer_key = models.CONSUMER_KEY
- consumer_secret = models.CONSUMER_SECRET
- auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=False)
- auth.set_access_token(*(utils.get_oauth_token(token_filename)))
-
- if duration >= 0:
- end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=duration)
+ def run(self):
+
+ get_logger().debug("SourceProcess : run")
+ track_list = self.track # or raw_input('Keywords to track (comma seperated): ').strip()
+ track_list = [k for k in track_list.split(',')]
+
+ get_logger().debug("SourceProcess : before connecting to stream " + repr(track_list))
+ stream = ReconnectingTweetStream(self.auth, track_list, reconnects=self.reconnects, as_text=True)
+ get_logger().debug("SourceProcess : after connecting to stream")
+ stream.muststop = lambda: self.stop_event.is_set()
+
+ session = self.session_maker()
+
+ try:
+ for tweet in stream:
+ get_logger().debug("tweet " + repr(tweet))
+ source = TweetSource(original_json=tweet)
+ get_logger().debug("source created")
+ add_retries = 0
+ while add_retries < 10:
+ try:
+ add_retries += 1
+ session.add(source)
+ session.flush()
+ break
+ except OperationalError as e:
+ session.rollback()
+ get_logger().debug("Operational Error %s nb %d" % (repr(e), add_retries))
+ if add_retries==10:
+ raise e
+
+ source_id = source.id
+ get_logger().debug("before queue + source id " + repr(source_id))
+ self.queue.put((source_id, tweet), False)
+ #process_tweet(tweet, source_id, self.session, self.debug, self.token_filename)
+ get_logger().info("Tweet count: %d - current rate : %.2f - running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime)))
+ session.commit()
+# if duration >= 0 and datetime.datetime.utcnow() >= end_ts:
+# print "Stop recording after %d seconds." % (duration)
+# break
+ except Exception as e:
+ get_logger().error("Error when processing tweet " + repr(e))
+ finally:
+ session.rollback()
+ stream.close()
+ session.close()
+ self.queue.close()
+ self.stop_event.set()
+
+
+def process_tweet(tweet, source_id, session, token_filename):
+ try:
+ tweet_obj = anyjson.deserialize(tweet)
+ screen_name = ""
+ if 'user' in tweet_obj and 'screen_name' in tweet_obj['user']:
+ screen_name = tweet_obj['user']['screen_name']
+ get_logger().info(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text']))
+ get_logger().debug(u"Process_tweet :" + repr(tweet))
+ processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, token_filename)
+ processor.process()
+ except Exception as e:
+ message = u"Error %s processing tweet %s" % (repr(e), tweet)
+ get_logger().error(message)
+ output = StringIO.StringIO()
+ traceback.print_exception(Exception, e, None, None, output)
+ error_stack = output.getvalue()
+ output.close()
+ session.rollback()
+ tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['ERROR'], error=message, error_stack=error_stack)
+ session.add(tweet_log)
+ session.commit()
+
- stream = ReconnectingTweetStream(auth, track_list, reconnects=reconnects)
- try:
- for tweet in stream:
- if duration >= 0 and datetime.datetime.utcnow() >= end_ts:
- print "Stop recording after %d seconds." % (duration)
- break
- process_tweet(tweet, session, debug, token_filename)
- logging.info("Tweet count: %d - current rate : %.2f - running : %s"%(stream.count, stream.rate, int(time.time()-stream.starttime)))
- session.commit()
- finally:
- stream.close()
+
+class TweetProcess(Process):
+
+ def __init__(self, session_maker, queue, debug, token_filename, stop_event):
+ self.session_maker = session_maker
+ self.queue = queue
+ self.debug = debug
+ self.stop_event = stop_event
+ self.token_filename = token_filename
+ super(TweetProcess, self).__init__()
+
+
+ def run(self):
+
+ session = self.session_maker()
+ try:
+ while not self.stop_event.is_set():
+ try:
+ source_id, tweet_txt = self.queue.get(True, 10)
+ get_logger().debug("Processing source id " + repr(source_id))
+ except Exception as e:
+ get_logger().debug('Process tweet exception in loop : ' + repr(e))
+ continue
+ process_tweet(tweet_txt, source_id, session, self.token_filename)
+ session.commit()
+ except:
+ raise
+ finally:
+ session.rollback()
+ self.stop_event.set()
+ session.close()
+
+def process_leftovers(session, token_filename):
+
+ sources = session.query(TweetSource).outerjoin(TweetLog).filter(TweetLog.id == None)
+
+ for src in sources:
+ tweet_txt = src.original_json
+ process_tweet(tweet_txt, src.id, session, token_filename)
+
+
+
+ #get tweet source that do not match any message
+ #select * from tweet_tweet_source ts left join tweet_tweet_log tl on ts.id = tl.tweet_source_id where tl.id isnull;
+
def get_options():
parser = OptionParser()
@@ -130,6 +231,9 @@
help="Token file name")
parser.add_option("-d", "--duration", dest="duration",
help="Duration of recording in seconds", metavar="DURATION", default= -1, type='int')
+ parser.add_option("-N", "--consumer", dest="consumer_nb",
+ help="number of consumer", metavar="CONSUMER", default=1, type='int')
+
utils.set_logging_options(parser)
@@ -139,27 +243,91 @@
if __name__ == '__main__':
-
(options, args) = get_options()
- utils.set_logging(options)
+ utils.set_logging(options, get_logger())
if options.debug:
print "OPTIONS : "
print repr(options)
if options.new and os.path.exists(options.filename):
- os.remove(options.filename)
+ i = 1
+ basename, extension = os.path.splitext(options.filename)
+ new_path = '%s.%d%s' % (basename, i, extension)
+ while i < 1000000 and os.path.exists(new_path):
+ i += 1
+ new_path = '%s.%d%s' % (basename, i, extension)
+ if i >= 1000000:
+ raise Exception("Unable to find new filename for " + options.filename)
+ else:
+ shutil.move(options.filename, new_path)
+
- engine, metadata = models.setup_database('sqlite:///' + options.filename, echo=(options.debug>=2))
- Session = sessionmaker(bind=engine)
- session = Session()
+ queue = JoinableQueue()
+ stop_event = Event()
+
+ if options.username and options.password:
+ auth = tweepy.auth.BasicAuthHandler(options.username, options.password)
+ else:
+ consumer_key = models.CONSUMER_KEY
+ consumer_secret = models.CONSUMER_SECRET
+ auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=False)
+ auth.set_access_token(*(utils.get_oauth_token(options.token_filename)))
+
- try:
- try:
- main(options.username, options.password, options.track, session, options.debug, options.reconnects, options.token_filename, options.duration)
- except KeyboardInterrupt:
- print '\nGoodbye!'
- session.commit()
- finally:
- session.close()
+ engine, metadata = models.setup_database('sqlite:///' + options.filename, echo=(options.debug >= 2))
+ Session = sessionmaker(bind=engine)
+
+ session = Session()
+ process_leftovers(session, options.token_filename)
+ session.commit()
+ session.close()
+
+ sprocess = SourceProcess(Session, queue, auth, options.track, options.debug, options.reconnects, options.token_filename, stop_event)
+
+ tweet_processes = []
+
+ for i in range(options.consumer_nb):
+ engine, metadata = models.setup_database('sqlite:///' + options.filename, echo=(options.debug >= 2))
+ Session = sessionmaker(bind=engine)
+ cprocess = TweetProcess(Session, queue, options.debug, options.token_filename, stop_event)
+ tweet_processes.append(cprocess)
+
+ def interrupt_handler(signum, frame):
+ stop_event.set()
+
+ signal.signal(signal.SIGINT, interrupt_handler)
+
+ sprocess.start()
+ for cprocess in tweet_processes:
+ cprocess.start()
+
+ if options.duration >= 0:
+ end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration)
+
+ while not stop_event.is_set():
+ if options.duration >= 0 and datetime.datetime.utcnow() >= end_ts:
+ stop_event.set()
+ break
+ if sprocess.is_alive():
+ time.sleep(0.1)
+ else:
+ break
+
+ get_logger().debug("Joining Source Process")
+ sprocess.join()
+ get_logger().debug("Joining Queue")
+ #queue.join()
+ for i,cprocess in enumerate(tweet_processes):
+ get_logger().debug("Joining consumer process Nb %d" % (i+1))
+ cprocess.join()
+
+ get_logger().debug("Processing leftovers")
+ session = Session()
+ process_leftovers(session, options.token_filename)
+ session.commit()
+ session.close()
+
+ get_logger().debug("Done. Exiting.")
+
\ No newline at end of file