# HG changeset patch # User Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com> # Date 1312888043 -7200 # Node ID e9335ee3cf7177bbc11f0685e4833a5fad227905 # Parent 2ebf22c651683751abe7760e39ff4183a06dfd5a# Parent 4c953ca2aa1dd885462e4b2e2bd70ea14e817f01 Merge with 9f24acbe66fb87308778a41a18d19bb918bdb50f diff -r 2ebf22c65168 -r e9335ee3cf71 .project --- a/.project Wed Jul 27 18:32:56 2011 +0200 +++ b/.project Tue Aug 09 13:07:23 2011 +0200 @@ -25,4 +25,15 @@ org.python.pydev.pythonNature org.eclipse.wst.jsdt.core.jsNature + + + 1312812919641 + + 6 + + org.eclipse.ui.ide.multiFilter + 1.0-name-matches-false-false-.DS_Store + + + diff -r 2ebf22c65168 -r e9335ee3cf71 script/lib/iri_tweet/export_tweet_db.py --- a/script/lib/iri_tweet/export_tweet_db.py Wed Jul 27 18:32:56 2011 +0200 +++ b/script/lib/iri_tweet/export_tweet_db.py Tue Aug 09 13:07:23 2011 +0200 @@ -1,8 +1,7 @@ from models import setup_database from optparse import OptionParser #@UnresolvedImport from sqlalchemy.orm import sessionmaker -from utils import set_logging_options, set_logging, TwitterProcessor -import logging +from utils import set_logging_options, set_logging, TwitterProcessor, logger import sqlite3 #@UnresolvedImport @@ -34,11 +33,11 @@ curs_in = conn_in.cursor() fields_mapping = {} for i,res in enumerate(curs_in.execute("select json from tweet_tweet;")): - logging.debug("main loop %d : %s" % (i, res[0])) #@UndefinedVariable - processor = TwitterProcessor(eval(res[0]), res[0], session, options.token_filename) + logger.debug("main loop %d : %s" % (i, res[0])) #@UndefinedVariable + processor = TwitterProcessor(eval(res[0]), res[0], None, session, options.token_filename) processor.process() session.commit() - logging.debug("main : %d tweet processed" % (i+1)) #@UndefinedVariable + logger.debug("main : %d tweet processed" % (i+1)) #@UndefinedVariable except Exception, e: session.rollback() raise e diff -r 2ebf22c65168 -r e9335ee3cf71 script/lib/iri_tweet/export_twitter_alchemy.py --- a/script/lib/iri_tweet/export_twitter_alchemy.py Wed Jul 27 18:32:56 2011 +0200 +++ b/script/lib/iri_tweet/export_twitter_alchemy.py Tue Aug 09 13:07:23 2011 +0200 @@ -5,10 +5,9 @@ from optparse import OptionParser #@UnresolvedImport from sqlalchemy import Table, Column, BigInteger, MetaData from sqlalchemy.orm import sessionmaker -from utils import parse_date, set_logging_options, set_logging, get_filter_query +from utils import parse_date, set_logging_options, set_logging, get_filter_query, logger from models import setup_database import datetime -import logging import os.path import re import sys @@ -101,7 +100,7 @@ set_logging(options) - logging.debug("OPTIONS : " + repr(options)) #@UndefinedVariable + logger.debug("OPTIONS : " + repr(options)) #@UndefinedVariable if len(sys.argv) == 1 or options.database is None: parser.print_help() @@ -159,7 +158,7 @@ for params in parameters: - logging.debug("PARAMETERS " + repr(params)) #@UndefinedVariable + logger.debug("PARAMETERS " + repr(params)) #@UndefinedVariable start_date_str = params.get("start_date",None) end_date_str = params.get("end_date", None) @@ -192,12 +191,12 @@ if content_file and content_file.find("http") == 0: - logging.debug("url : " + content_file) #@UndefinedVariable + logger.debug("url : " + content_file) #@UndefinedVariable h = httplib2.Http() resp, content = h.request(content_file) - logging.debug("url response " + repr(resp) + " content " + repr(content)) #@UndefinedVariable + logger.debug("url response " + repr(resp) + " content " + repr(content)) #@UndefinedVariable project = anyjson.deserialize(content) root = etree.fromstring(project["ldt"]) @@ -254,7 +253,7 @@ if ensemble_parent is None: - logging.error("Can not process file") #@UndefinedVariable + logger.error("Can not process file") #@UndefinedVariable sys.exit() if options.replace: @@ -309,18 +308,18 @@ project["ldt"] = output_data body = anyjson.serialize(project) - logging.debug("write http " + content_file) #@UndefinedVariable - logging.debug("write http " + repr(body)) #@UndefinedVariable + logger.debug("write http " + content_file) #@UndefinedVariable + logger.debug("write http " + repr(body)) #@UndefinedVariable h = httplib2.Http() resp, content = h.request(content_file, "PUT", headers={'content-type':'application/json'}, body=body) - logging.debug("write http " + repr(resp) + " content " + content) #@UndefinedVariable + logger.debug("write http " + repr(resp) + " content " + content) #@UndefinedVariable else: if content_file and os.path.exists(content_file): dest_file_name = content_file else: dest_file_name = options.filename - logging.debug("WRITE : " + dest_file_name) #@UndefinedVariable + logger.debug("WRITE : " + dest_file_name) #@UndefinedVariable output = open(dest_file_name, "w") output.write(output_data) output.flush() diff -r 2ebf22c65168 -r e9335ee3cf71 script/lib/iri_tweet/models.py --- a/script/lib/iri_tweet/models.py Wed Jul 27 18:32:56 2011 +0200 +++ b/script/lib/iri_tweet/models.py Tue Aug 09 13:07:23 2011 +0200 @@ -42,7 +42,34 @@ if hasattr(self,key): setattr(self,key,value) +class TweetSource(Base): + __tablename__ = 'tweet_tweet_source' + id = Column(Integer, primary_key = True, autoincrement=True) + original_json = Column(String) + received_at = Column(DateTime, default=datetime.datetime.now()) + + def __init__(self, **kwargs): + for key, value in kwargs.items(): + if hasattr(self,key): + setattr(self,key,value) + +class TweetLog(Base): + + TWEET_STATUS = { + 'OK' : 1, + 'ERROR' : 2, + } + + __tablename__ = 'tweet_tweet_log' + id = Column(Integer, primary_key = True, autoincrement=True) + tweet_source_id = Column(Integer, ForeignKey('tweet_tweet_source.id')) + tweet_source = relationship("TweetSource", backref="logs") + status = Column(Integer) + error = Column(String) + error_stack = Column(String) + + class Tweet(Base): __tablename__ = 'tweet_tweet' @@ -65,12 +92,12 @@ text = Column(String) truncated = Column(Boolean) user_id = Column(Integer, ForeignKey('tweet_user.id')) - original_json = Column(String) + user = relationship("User", backref="tweets") + tweet_source_id = Column(Integer, ForeignKey('tweet_tweet_source.id')) + tweet_source = relationship("TweetSource", backref="tweet") entity_list = relationship(Entity, backref='tweet') received_at = Column(DateTime, default=datetime.datetime.now()) - - #user = relationship(User, primaryjoin=user_id == User.id) - + def __init__(self, **kwargs): for key, value in kwargs.items(): if hasattr(self,key): @@ -81,11 +108,11 @@ id = Column(Integer, primary_key = True) user_id = Column(Integer, ForeignKey('tweet_user.id')) + user = relationship("User", backref="messages") created_at = Column(DateTime, default=datetime.datetime.now()) message_id = Column(Integer, ForeignKey('tweet_message.id')) class Message(Base): - __tablename__ = "tweet_message" id = Column(Integer, primary_key = True) @@ -131,8 +158,6 @@ url= Column(String) utc_offset = Column(Integer) verified= Column(Boolean) - tweets = relationship(Tweet, backref='user') - messages = relationship(UserMessage, backref='user') def __init__(self, **kwargs): for key, value in kwargs.items(): diff -r 2ebf22c65168 -r e9335ee3cf71 script/lib/iri_tweet/tests.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/script/lib/iri_tweet/tests.py Tue Aug 09 13:07:23 2011 +0200 @@ -0,0 +1,148 @@ +from sqlalchemy import Column, Integer, String, ForeignKey, create_engine +from sqlalchemy.ext.declarative import declarative_base +from sqlalchemy.orm import relationship, backref +import unittest #@UnresolvedImport +from sqlalchemy.orm import sessionmaker +from iri_tweet.utils import ObjectsBuffer, TwitterProcessor +from iri_tweet import models +import tempfile #@UnresolvedImport +import os + +Base = declarative_base() + +class User(Base): + __tablename__ = 'users' + + id = Column(Integer, primary_key=True) + name = Column(String) + fullname = Column(String) + password = Column(String) + + def __init__(self, name, fullname, password): + self.name = name + self.fullname = fullname + self.password = password + + def __repr__(self): + return "" % (self.name, self.fullname, self.password) + + +class Address(Base): + __tablename__ = 'addresses' + id = Column(Integer, primary_key=True) + email_address = Column(String, nullable=False) + user_id = Column(Integer, ForeignKey('users.id')) + + user = relationship("User", backref=backref('addresses', order_by=id)) + + def __init__(self, user_id, email_address): + self.email_address = email_address + self.user_id = user_id + + def __repr__(self): + return "" % self.email_address + + + +class TestObjectBuffer(unittest.TestCase): + + def setUp(self): + self.engine = create_engine('sqlite:///:memory:', echo=False) + Base.metadata.create_all(self.engine) + sessionMaker = sessionmaker(bind=self.engine) + self.session = sessionMaker() + + def tearDown(self): + self.session.close() + self.engine.dispose() + + + def testCreateUser(self): + ed_user = User('ed', 'Ed Jones', 'edspassword') + self.session.add(ed_user) + self.assertTrue(ed_user.id is None) + self.session.commit() + self.assertTrue(ed_user.id is not None) + + + def testSimpleBuffer(self): + obj_buffer = ObjectsBuffer() + obj_proxy = obj_buffer.add_object(User, ['ed1', 'Ed1 Jones', 'edspassword'], None, False) + self.assertTrue(obj_proxy.id() is None) + obj_buffer.persists(self.session) + self.assertTrue(obj_proxy.id() is None) + self.session.commit() + self.assertTrue(obj_proxy.id() is not None) + + + def testSimpleBufferKwargs(self): + obj_buffer = ObjectsBuffer() + obj_proxy = obj_buffer.add_object(User, None, {'name':'ed1b', 'fullname':'Ed1b Jones', 'password':'edspassword'}, False) + self.assertTrue(obj_proxy.id() is None) + obj_buffer.persists(self.session) + self.assertTrue(obj_proxy.id() is None) + self.session.commit() + self.assertTrue(obj_proxy.id() is not None) + + + def testSimpleBufferFlush(self): + obj_buffer = ObjectsBuffer() + obj_proxy = obj_buffer.add_object(User, ['ed2', 'Ed2 Jones', 'edspassword'], None, True) + self.assertTrue(obj_proxy.id() is None) + obj_buffer.persists(self.session) + self.assertTrue(obj_proxy.id() is not None) + self.session.commit() + self.assertTrue(obj_proxy.id() is not None) + + def testRelationBuffer(self): + obj_buffer = ObjectsBuffer() + user1_proxy = obj_buffer.add_object(User, ['ed3', 'Ed3 Jones', 'edspassword'], None, True) + obj_buffer.add_object(Address, [user1_proxy.id,'ed3@mail.com'], None, False) + obj_buffer.add_object(Address, [user1_proxy.id,'ed3@other.com'], None, False) + user2_proxy = obj_buffer.add_object(User, ['ed4', 'Ed3 Jones', 'edspassword'], None, True) + obj_buffer.add_object(Address, [user2_proxy.id,'ed4@mail.com'], None, False) + obj_buffer.persists(self.session) + self.session.commit() + ed_user = self.session.query(User).filter_by(name='ed3').first() + self.assertEquals(2, len(ed_user.addresses)) + ed_user = self.session.query(User).filter_by(name='ed4').first() + self.assertEquals(1, len(ed_user.addresses)) + + + +original_json = u'{"in_reply_to_user_id_str":null,"text":"RT @BieberEagle: \\"I love my haters. They spend so much time thinking about me. Aren\u2019t they sweet?\\" - Justin Bieber","contributors":null,"retweeted":false,"coordinates":null,"retweeted_status":{"in_reply_to_user_id_str":null,"text":"\\"I love my haters. They spend so much time thinking about me. Aren\u2019t they sweet?\\" - Justin Bieber","contributors":null,"retweeted":false,"coordinates":null,"retweet_count":"100+","source":"web","entities":{"user_mentions":[],"hashtags":[],"urls":[]},"truncated":false,"place":null,"id_str":"96638597737889792","in_reply_to_user_id":null,"in_reply_to_status_id":null,"favorited":false,"in_reply_to_status_id_str":null,"user":{"is_translator":false,"profile_background_tile":true,"profile_background_image_url_https":"https:\/\/si0.twimg.com\/profile_background_images\/298443445\/355584171.jpg","listed_count":5040,"friends_count":8477,"profile_link_color":"ff0000","profile_sidebar_border_color":"000000","url":"http:\/\/twitpic.com\/photos\/BieberEagle","profile_image_url_https":"https:\/\/si0.twimg.com\/profile_images\/1465491672\/355584171_normal.jpg","profile_image_url":"http:\/\/a2.twimg.com\/profile_images\/1465491672\/355584171_normal.jpg","description":"1 name, 1 inspiration, 1 hero, 1 smile, 1 singer, 1 boy who changed my life. B.\u0130.E.B.E.R-Believe In Everything Because Everything\'s Reachable. #NEVERSAYNEVER","default_profile":false,"notifications":null,"time_zone":"Paris","followers_count":14506,"default_profile_image":false,"lang":"en","profile_use_background_image":true,"screen_name":"BieberEagle","show_all_inline_media":false,"geo_enabled":false,"profile_background_color":"ffffff","location":"\u2665 Albania \u2665 ","id_str":"229067923","profile_background_image_url":"http:\/\/a2.twimg.com\/profile_background_images\/298443445\/355584171.jpg","favourites_count":89,"protected":false,"follow_request_sent":null,"following":null,"name":"truebelieber","statuses_count":24279,"verified":false,"created_at":"Tue Dec 21 12:35:18 +0000 2010","profile_text_color":"000000","id":229067923,"contributors_enabled":false,"utc_offset":3600,"profile_sidebar_fill_color":""},"id":96638597737889792,"created_at":"Thu Jul 28 17:50:11 +0000 2011","geo":null,"in_reply_to_screen_name":null},"retweet_count":"100+","source":"web","entities":{"user_mentions":[{"indices":[3,15],"screen_name":"BieberEagle","id_str":"229067923","name":"truebelieber","id":229067923}],"hashtags":[],"urls":[]},"truncated":false,"place":null,"id_str":"96965037637382145","in_reply_to_user_id":null,"in_reply_to_status_id":null,"favorited":false,"in_reply_to_status_id_str":null,"user":{"is_translator":false,"profile_background_tile":true,"profile_background_image_url_https":"https:\/\/si0.twimg.com\/profile_background_images\/300419382\/ipod.7.14_054.JPG","listed_count":3,"friends_count":1150,"profile_link_color":"00cccc","profile_sidebar_border_color":"c8ff00","url":"http:\/\/www.facebook.com\/blovedbecca180","profile_image_url_https":"https:\/\/si0.twimg.com\/profile_images\/1466752962\/block_party_7.27.11_015_normal.JPG","profile_image_url":"http:\/\/a3.twimg.com\/profile_images\/1466752962\/block_party_7.27.11_015_normal.JPG","description":"if ya wanna know something about me, then get to know me. \\n\\r\\n\\ri promise, you wont regret it. (:\\r\\ni love justin bieber with an extreme burning passion!","default_profile":false,"notifications":null,"time_zone":"Central Time (US & Canada)","followers_count":361,"default_profile_image":false,"lang":"en","profile_use_background_image":true,"screen_name":"beccaxannxx","show_all_inline_media":false,"geo_enabled":false,"profile_background_color":"ff0066","location":"","id_str":"65624607","profile_background_image_url":"http:\/\/a3.twimg.com\/profile_background_images\/300419382\/ipod.7.14_054.JPG","favourites_count":266,"protected":false,"follow_request_sent":null,"following":null,"name":"beccaxannxx","statuses_count":2512,"verified":false,"created_at":"Fri Aug 14 12:36:35 +0000 2009","profile_text_color":"6a39d4","id":65624607,"contributors_enabled":false,"utc_offset":-21600,"profile_sidebar_fill_color":"ff00bb"},"id":96965037637382145,"created_at":"Fri Jul 29 15:27:21 +0000 2011","geo":null,"in_reply_to_screen_name":null}' + + +class TestTwitterProcessor(unittest.TestCase): + + def setUp(self): + self.engine, self.metadata = models.setup_database('sqlite:///:memory:', echo=True) + sessionMaker = sessionmaker(bind=self.engine) + self.session = sessionMaker() + file, self.tmpfilepath = tempfile.mkstemp() + os.close(file) + + + def testTwitterProcessor(self): + tp = TwitterProcessor(None, original_json, None, self.session, self.tmpfilepath) + tp.process() + self.session.commit() + + self.assertEquals(1, self.session.query(models.TweetSource).count()) + self.assertEquals(1, self.session.query(models.Tweet).count()) + self.assertEquals(2, self.session.query(models.User).count()) + tweet = self.session.query(models.Tweet).first() + self.assertFalse(tweet.user is None) + self.assertEqual(u"beccaxannxx",tweet.user.name) + self.assertEqual(65624607,tweet.user.id) + self.assertEqual(1,len(tweet.entity_list)) + self.assertEqual(u"BieberEagle", tweet.entity_list[0].user.screen_name) + + + def tearDown(self): + self.session.close() + self.engine.dispose() + os.remove(self.tmpfilepath) + +if __name__ == '__main__': + unittest.main() \ No newline at end of file diff -r 2ebf22c65168 -r e9335ee3cf71 script/lib/iri_tweet/tweet_twitter_user.py --- a/script/lib/iri_tweet/tweet_twitter_user.py Wed Jul 27 18:32:56 2011 +0200 +++ b/script/lib/iri_tweet/tweet_twitter_user.py Tue Aug 09 13:07:23 2011 +0200 @@ -1,13 +1,12 @@ from iri_tweet.models import setup_database, Message, UserMessage, User from iri_tweet.utils import (get_oauth_token, get_user_query, set_logging_options, - set_logging, parse_date) + set_logging, parse_date, logger) from optparse import OptionParser #@UnresolvedImport from sqlalchemy import BigInteger from sqlalchemy.orm import sessionmaker from sqlalchemy.schema import MetaData, Table, Column from sqlalchemy.sql import and_ import datetime -import logging #@UnresolvedImport import sys import time import twitter @@ -54,7 +53,7 @@ set_logging(options) - logging.debug("OPTIONS : " + repr(options)) #@UndefinedVariable + logger.debug("OPTIONS : " + repr(options)) #@UndefinedVariable if not options.message or len(options.message) == 0: sys.exit() @@ -108,7 +107,7 @@ screen_name = user.screen_name message = u"@%s: %s" % (screen_name, base_message) - logging.debug("new status : " + message) #@UndefinedVariable + logger.debug("new status : " + message) #@UndefinedVariable if not options.simulate: t.statuses.update(status=message) user_message = UserMessage(user_id=user.id, message_id=message_obj.id) diff -r 2ebf22c65168 -r e9335ee3cf71 script/lib/iri_tweet/utils.py --- a/script/lib/iri_tweet/utils.py Wed Jul 27 18:32:56 2011 +0200 +++ b/script/lib/iri_tweet/utils.py Tue Aug 09 13:07:23 2011 +0200 @@ -1,6 +1,6 @@ -from models import Tweet, User, Hashtag, EntityHashtag, EntityUser, Url, \ - EntityUrl, CONSUMER_KEY, CONSUMER_SECRET, APPLICATION_NAME, ACCESS_TOKEN_KEY, \ - ACCESS_TOKEN_SECRET, adapt_date, adapt_json +from models import (Tweet, User, Hashtag, EntityHashtag, EntityUser, Url, + EntityUrl, CONSUMER_KEY, CONSUMER_SECRET, APPLICATION_NAME, ACCESS_TOKEN_KEY, + ACCESS_TOKEN_SECRET, adapt_date, adapt_json, TweetSource, TweetLog) from sqlalchemy.sql import select, or_ #@UnresolvedImport import anyjson #@UnresolvedImport import datetime @@ -77,13 +77,67 @@ return dict([(str(k),adapt_one_field(k,v)) for k,v in fields_dict.items()]) +class ObjectBufferProxy(object): + def __init__(self, klass, args, kwargs, must_flush, instance=None): + self.klass= klass + self.args = args + self.kwargs = kwargs + self.must_flush = must_flush + self.instance = instance + + def persists(self, session): + new_args = [arg() if callable(arg) else arg for arg in self.args] if self.args is not None else [] + new_kwargs = dict([(k,v()) if callable(v) else (k,v) for k,v in self.kwargs.items()]) if self.kwargs is not None else {} + + self.instance = self.klass(*new_args, **new_kwargs) + session.add(self.instance) + if self.must_flush: + session.flush() + + def __getattr__(self, name): + return lambda : getattr(self.instance, name) if self.instance else None + + + + +class ObjectsBuffer(object): + + def __init__(self): + self.__bufferlist = [] + + def persists(self, session): + for object_proxy in self.__bufferlist: + object_proxy.persists(session) + + def add_object(self, klass, args, kwargs, must_flush): + new_proxy = ObjectBufferProxy(klass, args, kwargs, must_flush) + self.__bufferlist.append(new_proxy) + return new_proxy + + def get(self, klass, **kwargs): + for proxy in self.__bufferlist: + if proxy.kwargs is None or len(proxy.kwargs) == 0 or proxy.klass != klass: + continue + found = True + for k,v in kwargs.items(): + if (k not in proxy.kwargs) or v != proxy.kwargs[k]: + found = False + break + if found: + return proxy + + return None + + + + class TwitterProcessorException(Exception): pass class TwitterProcessor(object): - def __init__(self, json_dict, json_txt, session, token_filename=None): + def __init__(self, json_dict, json_txt, source_id, session, token_filename=None): if json_dict is None and json_txt is None: raise TwitterProcessorException("No json") @@ -101,24 +155,39 @@ if "id" not in self.json_dict: raise TwitterProcessorException("No id in json") + self.source_id = source_id self.session = session self.token_filename = token_filename + self.obj_buffer = ObjectsBuffer() + def __get_user(self, user_dict): - logging.debug("Get user : " + repr(user_dict)) #@UndefinedVariable + logger.debug("Get user : " + repr(user_dict)) #@UndefinedVariable user_id = user_dict.get("id",None) user_name = user_dict.get("screen_name", user_dict.get("name", None)) if user_id is None and user_name is None: return None - + + user = None if user_id: - user = self.session.query(User).filter(User.id == user_id).first() + user = self.obj_buffer.get(User, id=user_id) else: - user = self.session.query(User).filter(User.screen_name.ilike(user_name)).first() + user = self.obj_buffer.get(User, screen_name=user_name) + + if user is not None: + return user + + #todo : add methpds to objectbuffer to get buffer user + user_obj = None + if user_id: + user_obj = self.session.query(User).filter(User.id == user_id).first() + else: + user_obj = self.session.query(User).filter(User.screen_name.ilike(user_name)).first() - if user is not None: + if user_obj is not None: + user = ObjectBufferProxy(User, None, None, False, user_obj) return user user_created_at = user_dict.get("created_at", None) @@ -132,28 +201,27 @@ else: user_dict = t.users.show(screen_name=user_name) except Exception as e: - logging.info("get_user : TWITTER ERROR : " + repr(e)) #@UndefinedVariable - logging.info("get_user : TWITTER ERROR : " + str(e)) #@UndefinedVariable + logger.info("get_user : TWITTER ERROR : " + repr(e)) #@UndefinedVariable + logger.info("get_user : TWITTER ERROR : " + str(e)) #@UndefinedVariable return None user_dict = adapt_fields(user_dict, fields_adapter["stream"]["user"]) if "id" not in user_dict: return None + #TODO filter get, wrap in proxy user = self.session.query(User).filter(User.id == user_dict["id"]).first() if user is not None: return user - user = User(**user_dict) + user = self.obj_buffer.add_object(User, None, user_dict, True) - self.session.add(user) - self.session.flush() - - return user + return user + def __process_entity(self, ind, ind_type): - logging.debug("Process_entity : " + repr(ind) + " : " + repr(ind_type)) #@UndefinedVariable + logger.debug("Process_entity : " + repr(ind) + " : " + repr(ind_type)) #@UndefinedVariable ind = clean_keys(ind) @@ -161,57 +229,53 @@ "indice_start": ind["indices"][0], "indice_end" : ind["indices"][1], "tweet_id" : self.tweet.id, - "tweet" : self.tweet } def process_hashtags(): text = ind.get("text", ind.get("hashtag", None)) if text is None: - return None - hashtag = self.session.query(Hashtag).filter(Hashtag.text.ilike(text)).first() + return None + hashtag = self.obj_buffer.get(Hashtag, text=text) + if hashtag is None: + hashtag_obj = self.session.query(Hashtag).filter(Hashtag.text.ilike(text)).first() + if hashtag_obj is not None: + hashtag = ObjectBufferProxy(Hashtag, None, None, False, hashtag_obj) + if hashtag is None: ind["text"] = text - hashtag = Hashtag(**ind) - self.session.add(hashtag) - self.session.flush() - entity_dict['hashtag'] = hashtag + hashtag = self.obj_buffer.add_object(Hashtag, None, ind, True) entity_dict['hashtag_id'] = hashtag.id - entity = EntityHashtag(**entity_dict) - return entity + return EntityHashtag, entity_dict def process_user_mentions(): user_mention = self.__get_user(ind) if user_mention is None: - entity_dict['user'] = None entity_dict['user_id'] = None else: - entity_dict['user'] = user_mention entity_dict['user_id'] = user_mention.id - entity = EntityUser(**entity_dict) - return entity + return EntityUser, entity_dict def process_urls(): - url = self.session.query(Url).filter(Url.url == ind["url"]).first() + url = self.obj_buffer.get(Url, url=ind["url"]) if url is None: - url = Url(**ind) - self.session.add(url) - self.session.flush() - entity_dict['url'] = url + url_obj = self.session.query(Url).filter(Url.url == ind["url"]).first() + if url_obj is not None: + url = ObjectBufferProxy(Url, None, None, False, url_obj) + if url is None: + url = self.obj_buffer.add_object(Url, None, ind, True) entity_dict['url_id'] = url.id - entity = EntityUrl(**entity_dict) - return entity + return EntityUrl, entity_dict #{'': lambda } - entity = { + entity_klass, entity_dict = { 'hashtags': process_hashtags, 'user_mentions' : process_user_mentions, 'urls' : process_urls }[ind_type]() - logging.debug("Process_entity entity_dict: " + repr(entity_dict)) #@UndefinedVariable - if entity: - self.session.add(entity) - self.session.flush() + logger.debug("Process_entity entity_dict: " + repr(entity_dict)) #@UndefinedVariable + if entity_klass: + self.obj_buffer.add_object(entity_klass, None, entity_dict, False) def __process_twitter_stream(self): @@ -225,16 +289,15 @@ # get or create user user = self.__get_user(self.json_dict["user"]) if user is None: - logging.warning("USER not found " + repr(self.json_dict["user"])) #@UndefinedVariable - ts_copy["user"] = None + logger.warning("USER not found " + repr(self.json_dict["user"])) #@UndefinedVariable ts_copy["user_id"] = None else: - ts_copy["user"] = user - ts_copy["user_id"] = ts_copy["user"].id - ts_copy["original_json"] = self.json_txt + ts_copy["user_id"] = user.id + + del(ts_copy['user']) + ts_copy["tweet_source_id"] = self.source_id - self.tweet = Tweet(**ts_copy) - self.session.add(self.tweet) + self.tweet = self.obj_buffer.add_object(Tweet, None, ts_copy, True) # get entities if "entities" in self.json_dict: @@ -260,7 +323,8 @@ tweet_nb = self.session.query(Tweet).filter(Tweet.id == self.json_dict["id"]).count() if tweet_nb > 0: return - + + tweet_fields = { 'created_at': self.json_dict["created_at"], 'favorited': False, @@ -272,8 +336,8 @@ #'place': ts["place"], 'source': self.json_dict["source"], 'text': self.json_dict["text"], - 'truncated': False, - 'original_json' : self.json_txt, + 'truncated': False, + 'tweet_source_id' : self.source_id, } #user @@ -286,16 +350,13 @@ user = self.__get_user(user_fields) if user is None: - logging.warning("USER not found " + repr(user_fields)) #@UndefinedVariable - tweet_fields["user"] = None + logger.warning("USER not found " + repr(user_fields)) #@UndefinedVariable tweet_fields["user_id"] = None else: - tweet_fields["user"] = user tweet_fields["user_id"] = user.id tweet_fields = adapt_fields(tweet_fields, fields_adapter["rest"]["tweet"]) - self.tweet = Tweet(**tweet_fields) - self.session.add(self.tweet) + self.tweet = self.obj_buffer.add_object(Tweet, None, tweet_fields, True) text = self.tweet.text @@ -303,26 +364,37 @@ for ind in extractor.extract_hashtags_with_indices(): self.__process_entity(ind, "hashtags") - - for ind in extractor.extract_mentioned_screen_names_with_indices(): - self.__process_entity(ind, "user_mentions") - + for ind in extractor.extract_urls_with_indices(): self.__process_entity(ind, "urls") - self.session.flush() + for ind in extractor.extract_mentioned_screen_names_with_indices(): + self.__process_entity(ind, "user_mentions") + def process(self): + + if self.source_id is None: + tweet_source = self.obj_buffer.add_object(TweetSource, None, {'original_json':self.json_txt}, True) + self.source_id = tweet_source.id + if "metadata" in self.json_dict: self.__process_twitter_rest() else: self.__process_twitter_stream() + + self.obj_buffer.add_object(TweetLog, None, {'tweet_source_id':self.source_id, 'status':TweetLog.TWEET_STATUS['OK']}, False) + self.obj_buffer.persists(self.session) + -def set_logging(options): +def set_logging(options, plogger=None): - logging_config = {} + logging_config = { + "format" : '%(asctime)s %(levelname)s:%(name)s:%(message)s', + "level" : max(logging.NOTSET, min(logging.CRITICAL, logging.WARNING - 10 * options.verbose + 10 * options.quiet)), #@UndefinedVariable + } if options.logfile == "stdout": logging_config["stream"] = sys.stdout @@ -330,9 +402,27 @@ logging_config["stream"] = sys.stderr else: logging_config["filename"] = options.logfile - - logging_config["level"] = max(logging.NOTSET, min(logging.CRITICAL, logging.WARNING - 10 * options.verbose + 10 * options.quiet)) #@UndefinedVariable - logging.basicConfig(**logging_config) #@UndefinedVariable + + logger = plogger + if logger is None: + logger = logging.getLogger() #@UndefinedVariable + + if len(logger.handlers) == 0: + filename = logging_config.get("filename") + if filename: + mode = logging_config.get("filemode", 'a') + hdlr = logging.FileHandler(filename, mode) #@UndefinedVariable + else: + stream = logging_config.get("stream") + hdlr = logging.StreamHandler(stream) #@UndefinedVariable + fs = logging_config.get("format", logging.BASIC_FORMAT) #@UndefinedVariable + dfs = logging_config.get("datefmt", None) + fmt = logging.Formatter(fs, dfs) #@UndefinedVariable + hdlr.setFormatter(fmt) + logger.addHandler(hdlr) + level = logging_config.get("level") + if level is not None: + logger.setLevel(level) options.debug = (options.verbose-options.quiet > 0) @@ -387,4 +477,4 @@ return query.distinct() - +logger = logging.getLogger() #@UndefinedVariable diff -r 2ebf22c65168 -r e9335ee3cf71 script/lib/tweetstream/setup.py --- a/script/lib/tweetstream/setup.py Wed Jul 27 18:32:56 2011 +0200 +++ b/script/lib/tweetstream/setup.py Tue Aug 09 13:07:23 2011 +0200 @@ -1,3 +1,4 @@ +#@PydevCodeAnalysisIgnore from setuptools import setup, find_packages import sys, os diff -r 2ebf22c65168 -r e9335ee3cf71 script/lib/tweetstream/tweetstream/streamclasses.py --- a/script/lib/tweetstream/tweetstream/streamclasses.py Wed Jul 27 18:32:56 2011 +0200 +++ b/script/lib/tweetstream/tweetstream/streamclasses.py Tue Aug 09 13:07:23 2011 +0200 @@ -54,7 +54,7 @@ :attr: `USER_AGENT`. """ - def __init__(self, auth, catchup=None, url=None): + def __init__(self, auth, catchup=None, url=None, as_text=False): self._conn = None self._rate_ts = None self._rate_cnt = 0 @@ -68,6 +68,7 @@ self.rate = 0 self.user_agent = USER_AGENT if url: self.url = url + self._as_text = as_text self.muststop = False @@ -119,12 +120,18 @@ this method and return post data. The data should be in the format returned by urllib.urlencode.""" return None + + def __muststop(self): + if callable(self.muststop): + return self.muststop() + else: + return self.muststop def next(self): """Return the next available tweet. This call is blocking!""" while True: try: - if self.muststop: + if self.__muststop(): raise StopIteration() if not self.connected: @@ -143,10 +150,15 @@ elif data.isspace(): continue - data = anyjson.deserialize(data) - if 'text' in data: + if not self._as_text: + data = anyjson.deserialize(data) + if 'text' in data: + self.count += 1 + self._rate_cnt += 1 + else: # count and rate may be off, but we count everything self.count += 1 self._rate_cnt += 1 + return data except ValueError, e: @@ -175,12 +187,12 @@ url = "http://stream.twitter.com/1/statuses/filter.json" def __init__(self, auth, follow=None, locations=None, - track=None, catchup=None, url=None): + track=None, catchup=None, url=None, as_text=False): self._follow = follow self._locations = locations self._track = track # remove follow, locations, track - BaseStream.__init__(self, auth, url=url) + BaseStream.__init__(self, auth, url=url, as_text=as_text) def _get_post_data(self): postdata = {} diff -r 2ebf22c65168 -r e9335ee3cf71 script/rest/search_twitter.py --- a/script/rest/search_twitter.py Wed Jul 27 18:32:56 2011 +0200 +++ b/script/rest/search_twitter.py Tue Aug 09 13:07:23 2011 +0200 @@ -49,12 +49,12 @@ page = 1 while page <= int(1500/int(options.rpp)) and ( results is None or len(results) > 0): - results = twitter. search(q=options.query, rpp=options.rpp, page=page) + results = twitter.search(q=options.query, rpp=options.rpp, page=page) for tweet in results["results"]: print tweet tweet_str = anyjson.serialize(tweet) #invalidate user id - processor = utils.TwitterProcessor(tweet, tweet_str, session, options.token_filename) + processor = utils.TwitterProcessor(tweet, tweet_str, None, session, options.token_filename) processor.process() session.flush() session.commit() diff -r 2ebf22c65168 -r e9335ee3cf71 script/stream/recorder_tweetstream.py --- a/script/stream/recorder_tweetstream.py Wed Jul 27 18:32:56 2011 +0200 +++ b/script/stream/recorder_tweetstream.py Tue Aug 09 13:07:23 2011 +0200 @@ -1,16 +1,24 @@ from getpass import getpass from iri_tweet import models, utils +from iri_tweet.models import TweetSource, TweetLog +from multiprocessing import Queue, JoinableQueue, Process, Event, get_logger from optparse import OptionParser from sqlalchemy.orm import sessionmaker -from sqlite3 import * +import StringIO +import logging +import anyjson import datetime -import logging import os +import shutil +import signal import socket import sys import time +import traceback +import tweepy.auth import tweetstream -import tweepy.auth +from iri_tweet.utils import logger +from sqlalchemy.exc import OperationalError socket._fileobject.default_bufsize = 0 @@ -44,12 +52,12 @@ """ - def __init__(self, auth, keywords, reconnects=3, error_cb=None, retry_wait=5, **kwargs): + def __init__(self, auth, keywords, reconnects=3, error_cb=None, retry_wait=5, as_text=False, **kwargs): self.max_reconnects = reconnects self.retry_wait = retry_wait self._reconnects = 0 self._error_cb = error_cb - super(ReconnectingTweetStream, self).__init__(auth=auth, track=keywords, **kwargs) + super(ReconnectingTweetStream, self).__init__(auth=auth, track=keywords, as_text=as_text, **kwargs) def next(self): while True: @@ -72,45 +80,138 @@ -def process_tweet(tweet, session, debug, token_filename): - screen_name = "" - if 'user' in tweet and 'screen_name' in tweet['user']: - screen_name = tweet['user']['screen_name'] - logging.info("Process_tweet from %s : %s" % (screen_name, tweet['text'])) - logging.debug("Process_tweet :" + repr(tweet)) - processor = utils.TwitterProcessor(tweet, None, session, token_filename) - processor.process() - -def main(username, password, track, session, debug, reconnects, token_filename, duration): - - #username = username or raw_input('Twitter username: ') - #password = password or getpass('Twitter password: ') - - track_list = track or raw_input('Keywords to track (comma seperated): ').strip() - track_list = [k for k in track_list.split(',')] +class SourceProcess(Process): + + def __init__(self, session_maker, queue, auth, track, debug, reconnects, token_filename, stop_event): + self.session_maker = session_maker + self.queue = queue + self.auth = auth + self.track = track + self.debug = debug + self.reconnects = reconnects + self.token_filename = token_filename + self.stop_event = stop_event + super(SourceProcess, self).__init__() +# self.stop_event = - if username and password: - auth = tweepy.auth.BasicAuthHandler(username, password) - else: - consumer_key = models.CONSUMER_KEY - consumer_secret = models.CONSUMER_SECRET - auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=False) - auth.set_access_token(*(utils.get_oauth_token(token_filename))) - - if duration >= 0: - end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=duration) + def run(self): + + get_logger().debug("SourceProcess : run") + track_list = self.track # or raw_input('Keywords to track (comma seperated): ').strip() + track_list = [k for k in track_list.split(',')] + + get_logger().debug("SourceProcess : before connecting to stream " + repr(track_list)) + stream = ReconnectingTweetStream(self.auth, track_list, reconnects=self.reconnects, as_text=True) + get_logger().debug("SourceProcess : after connecting to stream") + stream.muststop = lambda: self.stop_event.is_set() + + session = self.session_maker() + + try: + for tweet in stream: + get_logger().debug("tweet " + repr(tweet)) + source = TweetSource(original_json=tweet) + get_logger().debug("source created") + add_retries = 0 + while add_retries < 10: + try: + add_retries += 1 + session.add(source) + session.flush() + break + except OperationalError as e: + session.rollback() + get_logger().debug("Operational Error %s nb %d" % (repr(e), add_retries)) + if add_retries==10: + raise e + + source_id = source.id + get_logger().debug("before queue + source id " + repr(source_id)) + self.queue.put((source_id, tweet), False) + #process_tweet(tweet, source_id, self.session, self.debug, self.token_filename) + get_logger().info("Tweet count: %d - current rate : %.2f - running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime))) + session.commit() +# if duration >= 0 and datetime.datetime.utcnow() >= end_ts: +# print "Stop recording after %d seconds." % (duration) +# break + except Exception as e: + get_logger().error("Error when processing tweet " + repr(e)) + finally: + session.rollback() + stream.close() + session.close() + self.queue.close() + self.stop_event.set() + + +def process_tweet(tweet, source_id, session, token_filename): + try: + tweet_obj = anyjson.deserialize(tweet) + screen_name = "" + if 'user' in tweet_obj and 'screen_name' in tweet_obj['user']: + screen_name = tweet_obj['user']['screen_name'] + get_logger().info(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text'])) + get_logger().debug(u"Process_tweet :" + repr(tweet)) + processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, token_filename) + processor.process() + except Exception as e: + message = u"Error %s processing tweet %s" % (repr(e), tweet) + get_logger().error(message) + output = StringIO.StringIO() + traceback.print_exception(Exception, e, None, None, output) + error_stack = output.getvalue() + output.close() + session.rollback() + tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['ERROR'], error=message, error_stack=error_stack) + session.add(tweet_log) + session.commit() + - stream = ReconnectingTweetStream(auth, track_list, reconnects=reconnects) - try: - for tweet in stream: - if duration >= 0 and datetime.datetime.utcnow() >= end_ts: - print "Stop recording after %d seconds." % (duration) - break - process_tweet(tweet, session, debug, token_filename) - logging.info("Tweet count: %d - current rate : %.2f - running : %s"%(stream.count, stream.rate, int(time.time()-stream.starttime))) - session.commit() - finally: - stream.close() + +class TweetProcess(Process): + + def __init__(self, session_maker, queue, debug, token_filename, stop_event): + self.session_maker = session_maker + self.queue = queue + self.debug = debug + self.stop_event = stop_event + self.token_filename = token_filename + super(TweetProcess, self).__init__() + + + def run(self): + + session = self.session_maker() + try: + while not self.stop_event.is_set(): + try: + source_id, tweet_txt = queue.get(True, 10) + get_logger().debug("Processing source id " + repr(source_id)) + except Exception as e: + get_logger().debug('Process tweet exception in loop : ' + repr(e)) + continue + process_tweet(tweet_txt, source_id, session, self.token_filename) + session.commit() + except: + raise + finally: + session.rollback() + self.stop_event.set() + session.close() + +def process_leftovers(session, token_filename): + + sources = session.query(TweetSource).outerjoin(TweetLog).filter(TweetLog.id == None) + + for src in sources: + tweet_txt = src.original_json + process_tweet(tweet_txt, src.id, session, token_filename) + + + + #get tweet source that do not match any message + #select * from tweet_tweet_source ts left join tweet_tweet_log tl on ts.id = tl.tweet_source_id where tl.id isnull; + def get_options(): parser = OptionParser() @@ -130,6 +231,9 @@ help="Token file name") parser.add_option("-d", "--duration", dest="duration", help="Duration of recording in seconds", metavar="DURATION", default= -1, type='int') + parser.add_option("-N", "--consumer", dest="consumer_nb", + help="number of consumer", metavar="CONSUMER", default=1, type='int') + utils.set_logging_options(parser) @@ -139,27 +243,91 @@ if __name__ == '__main__': - (options, args) = get_options() - utils.set_logging(options) + utils.set_logging(options, get_logger()) if options.debug: print "OPTIONS : " print repr(options) if options.new and os.path.exists(options.filename): - os.remove(options.filename) + i = 1 + basename, extension = os.path.splitext(options.filename) + new_path = '%s.%d%s' % (basename, i, extension) + while i < 1000000 and os.path.exists(new_path): + i += 1 + new_path = '%s.%d%s' % (basename, i, extension) + if i >= 1000000: + raise Exception("Unable to find new filename for " + options.filename) + else: + shutil.move(options.filename, new_path) + - engine, metadata = models.setup_database('sqlite:///' + options.filename, echo=(options.debug>=2)) - Session = sessionmaker(bind=engine) - session = Session() + queue = JoinableQueue() + stop_event = Event() + + if options.username and options.password: + auth = tweepy.auth.BasicAuthHandler(options.username, options.password) + else: + consumer_key = models.CONSUMER_KEY + consumer_secret = models.CONSUMER_SECRET + auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=False) + auth.set_access_token(*(utils.get_oauth_token(options.token_filename))) + - try: - try: - main(options.username, options.password, options.track, session, options.debug, options.reconnects, options.token_filename, options.duration) - except KeyboardInterrupt: - print '\nGoodbye!' - session.commit() - finally: - session.close() + engine, metadata = models.setup_database('sqlite:///' + options.filename, echo=(options.debug >= 2)) + Session = sessionmaker(bind=engine) + + session = Session() + process_leftovers(session, options.token_filename) + session.commit() + session.close() + + sprocess = SourceProcess(Session, queue, auth, options.track, options.debug, options.reconnects, options.token_filename, stop_event) + + tweet_processes = [] + + for i in range(options.consumer_nb): + engine, metadata = models.setup_database('sqlite:///' + options.filename, echo=(options.debug >= 2)) + Session = sessionmaker(bind=engine) + cprocess = TweetProcess(Session, queue, options.debug, options.token_filename, stop_event) + tweet_processes.append(cprocess) + + def interupt_handler(signum, frame): + stop_event.set() + + signal.signal(signal.SIGINT, interupt_handler) + + sprocess.start() + for cprocess in tweet_processes: + cprocess.start() + + if options.duration >= 0: + end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration) + + while not stop_event.is_set(): + if options.duration >= 0 and datetime.datetime.utcnow() >= end_ts: + stop_event.set() + break + if sprocess.is_alive(): + time.sleep(0.1) + else: + break + + get_logger().debug("Joining Source Process") + sprocess.join() + get_logger().debug("Joining Queue") + #queue.join() + for i,cprocess in enumerate(tweet_processes): + get_logger().debug("Joining consumer process Nb %d" % (i+1)) + cprocess.join() + + get_logger().debug("Processing leftovers") + session = Session() + process_leftovers(session, options.token_filename) + session.commit() + session.close() + + get_logger().debug("Done. Exiting.") + \ No newline at end of file