migrate twitter processor to use object buffer and add tests
authorYves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
Tue, 09 Aug 2011 12:40:39 +0200
changeset 244 d4b7d6e2633f
parent 243 9213a63fa34a
child 245 4c953ca2aa1d
migrate twitter processor to use object buffer and add tests
script/lib/iri_tweet/tests.py
script/lib/iri_tweet/utils.py
script/lib/tweetstream/setup.py
--- a/script/lib/iri_tweet/tests.py	Mon Aug 08 09:01:40 2011 +0200
+++ b/script/lib/iri_tweet/tests.py	Tue Aug 09 12:40:39 2011 +0200
@@ -3,7 +3,10 @@
 from sqlalchemy.orm import relationship, backref
 import unittest #@UnresolvedImport
 from sqlalchemy.orm import sessionmaker
-from iri_tweet.utils import ObjectsBuffer
+from iri_tweet.utils import ObjectsBuffer, TwitterProcessor
+from iri_tweet import models
+import tempfile #@UnresolvedImport
+import os
 
 Base = declarative_base()
 
@@ -48,6 +51,11 @@
         Base.metadata.create_all(self.engine)
         sessionMaker = sessionmaker(bind=self.engine)
         self.session = sessionMaker()
+
+    def tearDown(self):
+        self.session.close()
+        self.engine.dispose()
+
         
     def testCreateUser(self):
         ed_user = User('ed', 'Ed Jones', 'edspassword')
@@ -55,6 +63,7 @@
         self.assertTrue(ed_user.id is None)
         self.session.commit()
         self.assertTrue(ed_user.id is not None)
+
         
     def testSimpleBuffer(self):
         obj_buffer = ObjectsBuffer()
@@ -64,6 +73,17 @@
         self.assertTrue(obj_proxy.id() is None)
         self.session.commit()
         self.assertTrue(obj_proxy.id() is not None)
+
+
+    def testSimpleBufferKwargs(self):
+        obj_buffer = ObjectsBuffer()
+        obj_proxy = obj_buffer.add_object(User, None, {'name':'ed1b', 'fullname':'Ed1b Jones', 'password':'edspassword'}, False)
+        self.assertTrue(obj_proxy.id() is None)
+        obj_buffer.persists(self.session)
+        self.assertTrue(obj_proxy.id() is None)
+        self.session.commit()
+        self.assertTrue(obj_proxy.id() is not None)
+
         
     def testSimpleBufferFlush(self):
         obj_buffer = ObjectsBuffer()
@@ -89,9 +109,40 @@
         self.assertEquals(1, len(ed_user.addresses))
 
         
+
+original_json = u'{"in_reply_to_user_id_str":null,"text":"RT @BieberEagle: \\"I love my haters. They spend so much time thinking about me. Aren\u2019t they sweet?\\" - Justin Bieber","contributors":null,"retweeted":false,"coordinates":null,"retweeted_status":{"in_reply_to_user_id_str":null,"text":"\\"I love my haters. They spend so much time thinking about me. Aren\u2019t they sweet?\\" - Justin Bieber","contributors":null,"retweeted":false,"coordinates":null,"retweet_count":"100+","source":"web","entities":{"user_mentions":[],"hashtags":[],"urls":[]},"truncated":false,"place":null,"id_str":"96638597737889792","in_reply_to_user_id":null,"in_reply_to_status_id":null,"favorited":false,"in_reply_to_status_id_str":null,"user":{"is_translator":false,"profile_background_tile":true,"profile_background_image_url_https":"https:\/\/si0.twimg.com\/profile_background_images\/298443445\/355584171.jpg","listed_count":5040,"friends_count":8477,"profile_link_color":"ff0000","profile_sidebar_border_color":"000000","url":"http:\/\/twitpic.com\/photos\/BieberEagle","profile_image_url_https":"https:\/\/si0.twimg.com\/profile_images\/1465491672\/355584171_normal.jpg","profile_image_url":"http:\/\/a2.twimg.com\/profile_images\/1465491672\/355584171_normal.jpg","description":"1 name, 1 inspiration, 1 hero, 1 smile, 1 singer, 1 boy who changed my life.    B.\u0130.E.B.E.R-Believe In Everything Because Everything\'s Reachable. #NEVERSAYNEVER","default_profile":false,"notifications":null,"time_zone":"Paris","followers_count":14506,"default_profile_image":false,"lang":"en","profile_use_background_image":true,"screen_name":"BieberEagle","show_all_inline_media":false,"geo_enabled":false,"profile_background_color":"ffffff","location":"\u2665 Albania \u2665 ","id_str":"229067923","profile_background_image_url":"http:\/\/a2.twimg.com\/profile_background_images\/298443445\/355584171.jpg","favourites_count":89,"protected":false,"follow_request_sent":null,"following":null,"name":"truebelieber","statuses_count":24279,"verified":false,"created_at":"Tue Dec 21 12:35:18 +0000 2010","profile_text_color":"000000","id":229067923,"contributors_enabled":false,"utc_offset":3600,"profile_sidebar_fill_color":""},"id":96638597737889792,"created_at":"Thu Jul 28 17:50:11 +0000 2011","geo":null,"in_reply_to_screen_name":null},"retweet_count":"100+","source":"web","entities":{"user_mentions":[{"indices":[3,15],"screen_name":"BieberEagle","id_str":"229067923","name":"truebelieber","id":229067923}],"hashtags":[],"urls":[]},"truncated":false,"place":null,"id_str":"96965037637382145","in_reply_to_user_id":null,"in_reply_to_status_id":null,"favorited":false,"in_reply_to_status_id_str":null,"user":{"is_translator":false,"profile_background_tile":true,"profile_background_image_url_https":"https:\/\/si0.twimg.com\/profile_background_images\/300419382\/ipod.7.14_054.JPG","listed_count":3,"friends_count":1150,"profile_link_color":"00cccc","profile_sidebar_border_color":"c8ff00","url":"http:\/\/www.facebook.com\/blovedbecca180","profile_image_url_https":"https:\/\/si0.twimg.com\/profile_images\/1466752962\/block_party_7.27.11_015_normal.JPG","profile_image_url":"http:\/\/a3.twimg.com\/profile_images\/1466752962\/block_party_7.27.11_015_normal.JPG","description":"if ya wanna know something about me, then get to know me. \\n\\r\\n\\ri promise, you wont regret it. (:\\r\\ni love justin bieber with an extreme burning passion!","default_profile":false,"notifications":null,"time_zone":"Central Time (US & Canada)","followers_count":361,"default_profile_image":false,"lang":"en","profile_use_background_image":true,"screen_name":"beccaxannxx","show_all_inline_media":false,"geo_enabled":false,"profile_background_color":"ff0066","location":"","id_str":"65624607","profile_background_image_url":"http:\/\/a3.twimg.com\/profile_background_images\/300419382\/ipod.7.14_054.JPG","favourites_count":266,"protected":false,"follow_request_sent":null,"following":null,"name":"beccaxannxx","statuses_count":2512,"verified":false,"created_at":"Fri Aug 14 12:36:35 +0000 2009","profile_text_color":"6a39d4","id":65624607,"contributors_enabled":false,"utc_offset":-21600,"profile_sidebar_fill_color":"ff00bb"},"id":96965037637382145,"created_at":"Fri Jul 29 15:27:21 +0000 2011","geo":null,"in_reply_to_screen_name":null}'
+
+
+class TestTwitterProcessor(unittest.TestCase):
+    
+    def setUp(self):
+        self.engine, self.metadata = models.setup_database('sqlite:///:memory:', echo=True)
+        sessionMaker = sessionmaker(bind=self.engine)
+        self.session = sessionMaker()
+        file, self.tmpfilepath = tempfile.mkstemp()
+        os.close(file)
+
+
+    def testTwitterProcessor(self):
+        tp = TwitterProcessor(None, original_json, None, self.session, self.tmpfilepath)
+        tp.process()
+        self.session.commit()
+        
+        self.assertEquals(1, self.session.query(models.TweetSource).count())
+        self.assertEquals(1, self.session.query(models.Tweet).count())
+        self.assertEquals(2, self.session.query(models.User).count())
+        tweet = self.session.query(models.Tweet).first()
+        self.assertFalse(tweet.user is None)
+        self.assertEqual(u"beccaxannxx",tweet.user.name)
+        self.assertEqual(65624607,tweet.user.id)
+        self.assertEqual(1,len(tweet.entity_list))
+        self.assertEqual(u"BieberEagle", tweet.entity_list[0].user.screen_name)
+
+
     def tearDown(self):
         self.session.close()
-
+        self.engine.dispose()
+        os.remove(self.tmpfilepath)
 
 if __name__ == '__main__':
     unittest.main()
\ No newline at end of file
--- a/script/lib/iri_tweet/utils.py	Mon Aug 08 09:01:40 2011 +0200
+++ b/script/lib/iri_tweet/utils.py	Tue Aug 09 12:40:39 2011 +0200
@@ -78,12 +78,12 @@
 
 
 class ObjectBufferProxy(object):
-    def __init__(self, klass, args, kwargs, must_flush):
+    def __init__(self, klass, args, kwargs, must_flush, instance=None):
         self.klass= klass
         self.args = args
         self.kwargs = kwargs
         self.must_flush = must_flush
-        self.instance = None
+        self.instance = instance
         
     def persists(self, session):
         new_args = [arg() if callable(arg) else arg for arg in self.args] if self.args is not None else []
@@ -114,6 +114,21 @@
         self.__bufferlist.append(new_proxy)
         return new_proxy 
     
+    def get(self, klass, **kwargs):
+        for proxy in self.__bufferlist:
+            if proxy.kwargs is None or len(proxy.kwargs) == 0 or proxy.klass != klass:
+                continue
+            found = True
+            for k,v in kwargs.items():
+                if (k not in proxy.kwargs) or v != proxy.kwargs[k]:
+                    found = False
+                    break
+            if found:
+                return proxy
+        
+        return None
+                
+                    
         
 
 
@@ -145,6 +160,7 @@
         self.token_filename = token_filename
         self.obj_buffer = ObjectsBuffer()
 
+
     def __get_user(self, user_dict):
         logger.debug("Get user : " + repr(user_dict)) #@UndefinedVariable
     
@@ -153,13 +169,25 @@
         
         if user_id is None and user_name is None:
             return None
-    
+
+        user = None
         if user_id:
-            user = self.session.query(User).filter(User.id == user_id).first()
+            user = self.obj_buffer.get(User, id=user_id)
         else:
-            user = self.session.query(User).filter(User.screen_name == user_name).first()
+            user = self.obj_buffer.get(User, screen_name=user_name)
+            
+        if user is not None:
+            return user
+
+        #todo : add methpds to objectbuffer to get buffer user
+        user_obj = None
+        if user_id:
+            user_obj = self.session.query(User).filter(User.id == user_id).first()
+        else:
+            user_obj = self.session.query(User).filter(User.screen_name == user_name).first()
     
-        if user is not None:
+        if user_obj is not None:
+            user = ObjectBufferProxy(User, None, None, False, user_obj)
             return user
     
         user_created_at = user_dict.get("created_at", None)
@@ -180,12 +208,10 @@
         if "id" not in user_dict:
             return None
         
-        user = User(**user_dict)
+        user = self.obj_buffer.add_object(User, None, user_dict, True)
         
-        self.session.add(user)
-        self.session.flush()
-        
-        return user 
+        return user
+
 
     def __process_entity(self, ind, ind_type):
         logger.debug("Process_entity : " + repr(ind) + " : " + repr(ind_type)) #@UndefinedVariable
@@ -196,57 +222,53 @@
            "indice_start": ind["indices"][0],
            "indice_end"  : ind["indices"][1],
            "tweet_id"    : self.tweet.id,
-           "tweet"       : self.tweet
         }
     
         def process_hashtags():
             text = ind.get("text", ind.get("hashtag", None))
             if text is None:
-                return None 
-            hashtag = self.session.query(Hashtag).filter(Hashtag.text == text).first()
+                return None
+            hashtag = self.obj_buffer.get(Hashtag, text=text)
+            if hashtag is None: 
+                hashtag_obj = self.session.query(Hashtag).filter(Hashtag.text == text).first()
+                if hashtag_obj is not None:
+                    hashtag = ObjectBufferProxy(Hashtag, None, None, False, hashtag_obj)
+                    
             if not hashtag:
                 ind["text"] = text
-                hashtag = Hashtag(**ind)
-                self.session.add(hashtag)
-                self.session.flush()
-            entity_dict['hashtag'] = hashtag
+                hashtag = self.obj_buffer.add_object(Hashtag, None, ind, True)
             entity_dict['hashtag_id'] = hashtag.id
-            entity = EntityHashtag(**entity_dict)
-            return entity
+            return EntityHashtag, entity_dict             
         
         def process_user_mentions():
             user_mention = self.__get_user(ind)
             if user_mention is None:
-                entity_dict['user'] = None
                 entity_dict['user_id'] = None
             else:
-                entity_dict['user'] = user_mention
                 entity_dict['user_id'] = user_mention.id
-            entity = EntityUser(**entity_dict)
-            return entity
+            return EntityUser, entity_dict
         
         def process_urls():
-            url = self.session.query(Url).filter(Url.url == ind["url"]).first()
+            url = self.obj_buffer.get(Url, url=ind["url"])
             if url is None:
-                url = Url(**ind)
-                self.session.add(url)
-                self.session.flush()
-            entity_dict['url'] = url
+                url_obj = self.session.query(Url).filter(Url.url == ind["url"]).first()
+                if url_obj is not None:
+                    url = ObjectBufferProxy(Url, None, None, False, url_obj)
+            if url is None:
+                url = self.obj_buffer.add_object(Url, None, ind, True)
             entity_dict['url_id'] = url.id
-            entity = EntityUrl(**entity_dict)
-            return entity
+            return EntityUrl, entity_dict
         
         #{'': lambda }
-        entity =  { 
+        entity_klass, entity_dict =  { 
             'hashtags': process_hashtags,
             'user_mentions' : process_user_mentions,
             'urls' : process_urls
             }[ind_type]()
             
         logger.debug("Process_entity entity_dict: " + repr(entity_dict)) #@UndefinedVariable
-        if entity:
-            self.session.add(entity)
-            self.session.flush()
+        if entity_klass:
+            self.obj_buffer.add_object(entity_klass, None, entity_dict, False)
 
 
     def __process_twitter_stream(self):
@@ -261,17 +283,14 @@
         user = self.__get_user(self.json_dict["user"])
         if user is None:
             logger.warning("USER not found " + repr(self.json_dict["user"])) #@UndefinedVariable
-            ts_copy["user"] = None
             ts_copy["user_id"] = None
         else:
-            ts_copy["user"] = user
-            ts_copy["user_id"] = ts_copy["user"].id
+            ts_copy["user_id"] = user.id
             
+        del(ts_copy['user'])
         ts_copy["tweet_source_id"] = self.source_id
         
-        self.tweet = Tweet(**ts_copy)
-        self.session.add(self.tweet)
-        self.session.flush()
+        self.tweet = self.obj_buffer.add_object(Tweet, None, ts_copy, True)
             
         # get entities
         for ind_type, entity_list in self.json_dict["entities"].items():
@@ -314,12 +333,10 @@
             tweet_fields["user"] = None
             tweet_fields["user_id"] = None
         else:
-            tweet_fields["user"] = user
             tweet_fields["user_id"] = user.id
         
         tweet_fields = adapt_fields(tweet_fields, fields_adapter["rest"]["tweet"])
-        self.tweet = Tweet(**tweet_fields)
-        self.session.add(self.tweet)
+        self.tweet = self.obj_buffer.add_object(Tweet, None, tweet_fields, True)
         
         text = self.tweet.text
         
@@ -339,9 +356,7 @@
     def process(self):
         
         if self.source_id is None:
-            tweet_source = TweetSource(original_json=self.json_txt);
-            self.session.add(tweet_source)
-            self.session.flush()
+            tweet_source = self.obj_buffer.add_object(TweetSource, None, {'original_json':self.json_txt}, True)
             self.source_id = tweet_source.id
         
         if "metadata" in self.json_dict:
@@ -349,9 +364,10 @@
         else:
             self.__process_twitter_stream()
 
-        tweet_log = TweetLog(tweet_source_id = self.source_id, status = TweetLog.TWEET_STATUS['OK'])            
-        self.session.add(tweet_log)
+        self.obj_buffer.add_object(TweetLog, None, {'tweet_source_id':self.source_id, 'status':TweetLog.TWEET_STATUS['OK']}, False)
         
+        self.obj_buffer.persists(self.session)
+
 
 def set_logging(options, plogger=None):
     
--- a/script/lib/tweetstream/setup.py	Mon Aug 08 09:01:40 2011 +0200
+++ b/script/lib/tweetstream/setup.py	Tue Aug 09 12:40:39 2011 +0200
@@ -1,3 +1,4 @@
+#@PydevCodeAnalysisIgnore
 from setuptools import setup, find_packages
 import sys, os