# HG changeset patch # User Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com> # Date 1312886439 -7200 # Node ID d4b7d6e2633f205b202285f93d8dbd25d0f60224 # Parent 9213a63fa34a8edcaa905afdd9f709c47694886f migrate twitter processor to use object buffer and add tests diff -r 9213a63fa34a -r d4b7d6e2633f script/lib/iri_tweet/tests.py --- a/script/lib/iri_tweet/tests.py Mon Aug 08 09:01:40 2011 +0200 +++ b/script/lib/iri_tweet/tests.py Tue Aug 09 12:40:39 2011 +0200 @@ -3,7 +3,10 @@ from sqlalchemy.orm import relationship, backref import unittest #@UnresolvedImport from sqlalchemy.orm import sessionmaker -from iri_tweet.utils import ObjectsBuffer +from iri_tweet.utils import ObjectsBuffer, TwitterProcessor +from iri_tweet import models +import tempfile #@UnresolvedImport +import os Base = declarative_base() @@ -48,6 +51,11 @@ Base.metadata.create_all(self.engine) sessionMaker = sessionmaker(bind=self.engine) self.session = sessionMaker() + + def tearDown(self): + self.session.close() + self.engine.dispose() + def testCreateUser(self): ed_user = User('ed', 'Ed Jones', 'edspassword') @@ -55,6 +63,7 @@ self.assertTrue(ed_user.id is None) self.session.commit() self.assertTrue(ed_user.id is not None) + def testSimpleBuffer(self): obj_buffer = ObjectsBuffer() @@ -64,6 +73,17 @@ self.assertTrue(obj_proxy.id() is None) self.session.commit() self.assertTrue(obj_proxy.id() is not None) + + + def testSimpleBufferKwargs(self): + obj_buffer = ObjectsBuffer() + obj_proxy = obj_buffer.add_object(User, None, {'name':'ed1b', 'fullname':'Ed1b Jones', 'password':'edspassword'}, False) + self.assertTrue(obj_proxy.id() is None) + obj_buffer.persists(self.session) + self.assertTrue(obj_proxy.id() is None) + self.session.commit() + self.assertTrue(obj_proxy.id() is not None) + def testSimpleBufferFlush(self): obj_buffer = ObjectsBuffer() @@ -89,9 +109,40 @@ self.assertEquals(1, len(ed_user.addresses)) + +original_json = u'{"in_reply_to_user_id_str":null,"text":"RT @BieberEagle: \\"I love my haters. They spend so much time thinking about me. Aren\u2019t they sweet?\\" - Justin Bieber","contributors":null,"retweeted":false,"coordinates":null,"retweeted_status":{"in_reply_to_user_id_str":null,"text":"\\"I love my haters. They spend so much time thinking about me. Aren\u2019t they sweet?\\" - Justin Bieber","contributors":null,"retweeted":false,"coordinates":null,"retweet_count":"100+","source":"web","entities":{"user_mentions":[],"hashtags":[],"urls":[]},"truncated":false,"place":null,"id_str":"96638597737889792","in_reply_to_user_id":null,"in_reply_to_status_id":null,"favorited":false,"in_reply_to_status_id_str":null,"user":{"is_translator":false,"profile_background_tile":true,"profile_background_image_url_https":"https:\/\/si0.twimg.com\/profile_background_images\/298443445\/355584171.jpg","listed_count":5040,"friends_count":8477,"profile_link_color":"ff0000","profile_sidebar_border_color":"000000","url":"http:\/\/twitpic.com\/photos\/BieberEagle","profile_image_url_https":"https:\/\/si0.twimg.com\/profile_images\/1465491672\/355584171_normal.jpg","profile_image_url":"http:\/\/a2.twimg.com\/profile_images\/1465491672\/355584171_normal.jpg","description":"1 name, 1 inspiration, 1 hero, 1 smile, 1 singer, 1 boy who changed my life. B.\u0130.E.B.E.R-Believe In Everything Because Everything\'s Reachable. #NEVERSAYNEVER","default_profile":false,"notifications":null,"time_zone":"Paris","followers_count":14506,"default_profile_image":false,"lang":"en","profile_use_background_image":true,"screen_name":"BieberEagle","show_all_inline_media":false,"geo_enabled":false,"profile_background_color":"ffffff","location":"\u2665 Albania \u2665 ","id_str":"229067923","profile_background_image_url":"http:\/\/a2.twimg.com\/profile_background_images\/298443445\/355584171.jpg","favourites_count":89,"protected":false,"follow_request_sent":null,"following":null,"name":"truebelieber","statuses_count":24279,"verified":false,"created_at":"Tue Dec 21 12:35:18 +0000 2010","profile_text_color":"000000","id":229067923,"contributors_enabled":false,"utc_offset":3600,"profile_sidebar_fill_color":""},"id":96638597737889792,"created_at":"Thu Jul 28 17:50:11 +0000 2011","geo":null,"in_reply_to_screen_name":null},"retweet_count":"100+","source":"web","entities":{"user_mentions":[{"indices":[3,15],"screen_name":"BieberEagle","id_str":"229067923","name":"truebelieber","id":229067923}],"hashtags":[],"urls":[]},"truncated":false,"place":null,"id_str":"96965037637382145","in_reply_to_user_id":null,"in_reply_to_status_id":null,"favorited":false,"in_reply_to_status_id_str":null,"user":{"is_translator":false,"profile_background_tile":true,"profile_background_image_url_https":"https:\/\/si0.twimg.com\/profile_background_images\/300419382\/ipod.7.14_054.JPG","listed_count":3,"friends_count":1150,"profile_link_color":"00cccc","profile_sidebar_border_color":"c8ff00","url":"http:\/\/www.facebook.com\/blovedbecca180","profile_image_url_https":"https:\/\/si0.twimg.com\/profile_images\/1466752962\/block_party_7.27.11_015_normal.JPG","profile_image_url":"http:\/\/a3.twimg.com\/profile_images\/1466752962\/block_party_7.27.11_015_normal.JPG","description":"if ya wanna know something about me, then get to know me. \\n\\r\\n\\ri promise, you wont regret it. (:\\r\\ni love justin bieber with an extreme burning passion!","default_profile":false,"notifications":null,"time_zone":"Central Time (US & Canada)","followers_count":361,"default_profile_image":false,"lang":"en","profile_use_background_image":true,"screen_name":"beccaxannxx","show_all_inline_media":false,"geo_enabled":false,"profile_background_color":"ff0066","location":"","id_str":"65624607","profile_background_image_url":"http:\/\/a3.twimg.com\/profile_background_images\/300419382\/ipod.7.14_054.JPG","favourites_count":266,"protected":false,"follow_request_sent":null,"following":null,"name":"beccaxannxx","statuses_count":2512,"verified":false,"created_at":"Fri Aug 14 12:36:35 +0000 2009","profile_text_color":"6a39d4","id":65624607,"contributors_enabled":false,"utc_offset":-21600,"profile_sidebar_fill_color":"ff00bb"},"id":96965037637382145,"created_at":"Fri Jul 29 15:27:21 +0000 2011","geo":null,"in_reply_to_screen_name":null}' + + +class TestTwitterProcessor(unittest.TestCase): + + def setUp(self): + self.engine, self.metadata = models.setup_database('sqlite:///:memory:', echo=True) + sessionMaker = sessionmaker(bind=self.engine) + self.session = sessionMaker() + file, self.tmpfilepath = tempfile.mkstemp() + os.close(file) + + + def testTwitterProcessor(self): + tp = TwitterProcessor(None, original_json, None, self.session, self.tmpfilepath) + tp.process() + self.session.commit() + + self.assertEquals(1, self.session.query(models.TweetSource).count()) + self.assertEquals(1, self.session.query(models.Tweet).count()) + self.assertEquals(2, self.session.query(models.User).count()) + tweet = self.session.query(models.Tweet).first() + self.assertFalse(tweet.user is None) + self.assertEqual(u"beccaxannxx",tweet.user.name) + self.assertEqual(65624607,tweet.user.id) + self.assertEqual(1,len(tweet.entity_list)) + self.assertEqual(u"BieberEagle", tweet.entity_list[0].user.screen_name) + + def tearDown(self): self.session.close() - + self.engine.dispose() + os.remove(self.tmpfilepath) if __name__ == '__main__': unittest.main() \ No newline at end of file diff -r 9213a63fa34a -r d4b7d6e2633f script/lib/iri_tweet/utils.py --- a/script/lib/iri_tweet/utils.py Mon Aug 08 09:01:40 2011 +0200 +++ b/script/lib/iri_tweet/utils.py Tue Aug 09 12:40:39 2011 +0200 @@ -78,12 +78,12 @@ class ObjectBufferProxy(object): - def __init__(self, klass, args, kwargs, must_flush): + def __init__(self, klass, args, kwargs, must_flush, instance=None): self.klass= klass self.args = args self.kwargs = kwargs self.must_flush = must_flush - self.instance = None + self.instance = instance def persists(self, session): new_args = [arg() if callable(arg) else arg for arg in self.args] if self.args is not None else [] @@ -114,6 +114,21 @@ self.__bufferlist.append(new_proxy) return new_proxy + def get(self, klass, **kwargs): + for proxy in self.__bufferlist: + if proxy.kwargs is None or len(proxy.kwargs) == 0 or proxy.klass != klass: + continue + found = True + for k,v in kwargs.items(): + if (k not in proxy.kwargs) or v != proxy.kwargs[k]: + found = False + break + if found: + return proxy + + return None + + @@ -145,6 +160,7 @@ self.token_filename = token_filename self.obj_buffer = ObjectsBuffer() + def __get_user(self, user_dict): logger.debug("Get user : " + repr(user_dict)) #@UndefinedVariable @@ -153,13 +169,25 @@ if user_id is None and user_name is None: return None - + + user = None if user_id: - user = self.session.query(User).filter(User.id == user_id).first() + user = self.obj_buffer.get(User, id=user_id) else: - user = self.session.query(User).filter(User.screen_name == user_name).first() + user = self.obj_buffer.get(User, screen_name=user_name) + + if user is not None: + return user + + #todo : add methpds to objectbuffer to get buffer user + user_obj = None + if user_id: + user_obj = self.session.query(User).filter(User.id == user_id).first() + else: + user_obj = self.session.query(User).filter(User.screen_name == user_name).first() - if user is not None: + if user_obj is not None: + user = ObjectBufferProxy(User, None, None, False, user_obj) return user user_created_at = user_dict.get("created_at", None) @@ -180,12 +208,10 @@ if "id" not in user_dict: return None - user = User(**user_dict) + user = self.obj_buffer.add_object(User, None, user_dict, True) - self.session.add(user) - self.session.flush() - - return user + return user + def __process_entity(self, ind, ind_type): logger.debug("Process_entity : " + repr(ind) + " : " + repr(ind_type)) #@UndefinedVariable @@ -196,57 +222,53 @@ "indice_start": ind["indices"][0], "indice_end" : ind["indices"][1], "tweet_id" : self.tweet.id, - "tweet" : self.tweet } def process_hashtags(): text = ind.get("text", ind.get("hashtag", None)) if text is None: - return None - hashtag = self.session.query(Hashtag).filter(Hashtag.text == text).first() + return None + hashtag = self.obj_buffer.get(Hashtag, text=text) + if hashtag is None: + hashtag_obj = self.session.query(Hashtag).filter(Hashtag.text == text).first() + if hashtag_obj is not None: + hashtag = ObjectBufferProxy(Hashtag, None, None, False, hashtag_obj) + if not hashtag: ind["text"] = text - hashtag = Hashtag(**ind) - self.session.add(hashtag) - self.session.flush() - entity_dict['hashtag'] = hashtag + hashtag = self.obj_buffer.add_object(Hashtag, None, ind, True) entity_dict['hashtag_id'] = hashtag.id - entity = EntityHashtag(**entity_dict) - return entity + return EntityHashtag, entity_dict def process_user_mentions(): user_mention = self.__get_user(ind) if user_mention is None: - entity_dict['user'] = None entity_dict['user_id'] = None else: - entity_dict['user'] = user_mention entity_dict['user_id'] = user_mention.id - entity = EntityUser(**entity_dict) - return entity + return EntityUser, entity_dict def process_urls(): - url = self.session.query(Url).filter(Url.url == ind["url"]).first() + url = self.obj_buffer.get(Url, url=ind["url"]) if url is None: - url = Url(**ind) - self.session.add(url) - self.session.flush() - entity_dict['url'] = url + url_obj = self.session.query(Url).filter(Url.url == ind["url"]).first() + if url_obj is not None: + url = ObjectBufferProxy(Url, None, None, False, url_obj) + if url is None: + url = self.obj_buffer.add_object(Url, None, ind, True) entity_dict['url_id'] = url.id - entity = EntityUrl(**entity_dict) - return entity + return EntityUrl, entity_dict #{'': lambda } - entity = { + entity_klass, entity_dict = { 'hashtags': process_hashtags, 'user_mentions' : process_user_mentions, 'urls' : process_urls }[ind_type]() logger.debug("Process_entity entity_dict: " + repr(entity_dict)) #@UndefinedVariable - if entity: - self.session.add(entity) - self.session.flush() + if entity_klass: + self.obj_buffer.add_object(entity_klass, None, entity_dict, False) def __process_twitter_stream(self): @@ -261,17 +283,14 @@ user = self.__get_user(self.json_dict["user"]) if user is None: logger.warning("USER not found " + repr(self.json_dict["user"])) #@UndefinedVariable - ts_copy["user"] = None ts_copy["user_id"] = None else: - ts_copy["user"] = user - ts_copy["user_id"] = ts_copy["user"].id + ts_copy["user_id"] = user.id + del(ts_copy['user']) ts_copy["tweet_source_id"] = self.source_id - self.tweet = Tweet(**ts_copy) - self.session.add(self.tweet) - self.session.flush() + self.tweet = self.obj_buffer.add_object(Tweet, None, ts_copy, True) # get entities for ind_type, entity_list in self.json_dict["entities"].items(): @@ -314,12 +333,10 @@ tweet_fields["user"] = None tweet_fields["user_id"] = None else: - tweet_fields["user"] = user tweet_fields["user_id"] = user.id tweet_fields = adapt_fields(tweet_fields, fields_adapter["rest"]["tweet"]) - self.tweet = Tweet(**tweet_fields) - self.session.add(self.tweet) + self.tweet = self.obj_buffer.add_object(Tweet, None, tweet_fields, True) text = self.tweet.text @@ -339,9 +356,7 @@ def process(self): if self.source_id is None: - tweet_source = TweetSource(original_json=self.json_txt); - self.session.add(tweet_source) - self.session.flush() + tweet_source = self.obj_buffer.add_object(TweetSource, None, {'original_json':self.json_txt}, True) self.source_id = tweet_source.id if "metadata" in self.json_dict: @@ -349,9 +364,10 @@ else: self.__process_twitter_stream() - tweet_log = TweetLog(tweet_source_id = self.source_id, status = TweetLog.TWEET_STATUS['OK']) - self.session.add(tweet_log) + self.obj_buffer.add_object(TweetLog, None, {'tweet_source_id':self.source_id, 'status':TweetLog.TWEET_STATUS['OK']}, False) + self.obj_buffer.persists(self.session) + def set_logging(options, plogger=None): diff -r 9213a63fa34a -r d4b7d6e2633f script/lib/tweetstream/setup.py --- a/script/lib/tweetstream/setup.py Mon Aug 08 09:01:40 2011 +0200 +++ b/script/lib/tweetstream/setup.py Tue Aug 09 12:40:39 2011 +0200 @@ -1,3 +1,4 @@ +#@PydevCodeAnalysisIgnore from setuptools import setup, find_packages import sys, os