multiple debugging and corrections
authorYves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
Fri, 12 Aug 2011 18:17:27 +0200
changeset 254 2209e66bb50b
parent 253 e9335ee3cf71
child 255 500cd0405c7a
multiple debugging and corrections
.hgignore
script/lib/iri_tweet/models.py
script/lib/iri_tweet/tests.py
script/lib/iri_tweet/utils.py
script/lib/tweetstream/tweetstream/streamclasses.py
script/stream/recorder_tweetstream.py
--- a/.hgignore	Tue Aug 09 13:07:23 2011 +0200
+++ b/.hgignore	Fri Aug 12 18:17:27 2011 +0200
@@ -33,3 +33,6 @@
 
 syntax: regexp
 ^web/CPV/config\.php$
+
+syntax: regexp
+^script/virtualenv/venv2$
\ No newline at end of file
--- a/script/lib/iri_tweet/models.py	Tue Aug 09 13:07:23 2011 +0200
+++ b/script/lib/iri_tweet/models.py	Fri Aug 12 18:17:27 2011 +0200
@@ -27,31 +27,38 @@
     else:
         return anyjson.serialize(obj)
 
+class EntityType(Base):
+    __tablename__ = "tweet_entity_type"
+    id = Column(Integer, primary_key=True, autoincrement=True)
+    label = Column(String)
+
 class Entity(Base):
     __tablename__ = "tweet_entity"
-    id = Column(Integer, primary_key = True)
+    id = Column(Integer, primary_key=True)
     tweet_id = Column(BigInteger, ForeignKey('tweet_tweet.id'))
-    #tweet = relationship(Tweet, primaryjoin = tweet_id == Tweet.id)
     type = Column(String)
+    entity_type_id = Column(Integer, ForeignKey('tweet_entity_type.id'), nullable=False)
+    entity_type = relationship("EntityType", backref="entities")
     indice_start = Column(Integer)
     indice_end = Column(Integer)
-    __mapper_args__ = {'polymorphic_on': type}
+    source = Column(String)
+    __mapper_args__ = {'polymorphic_on': type, 'polymorphic_identity': 'entity_entity', 'with_polymorphic':'*'}
 
     def __init__(self, **kwargs):
         for key, value in kwargs.items():
-            if hasattr(self,key):
-                setattr(self,key,value)
+            if hasattr(self, key):
+                setattr(self, key, value)
 
 class TweetSource(Base):
     __tablename__ = 'tweet_tweet_source'
-    id = Column(Integer, primary_key = True, autoincrement=True)
+    id = Column(Integer, primary_key=True, autoincrement=True)
     original_json = Column(String)
-    received_at = Column(DateTime, default=datetime.datetime.now())
+    received_at = Column(DateTime, default=datetime.datetime.now(), index=True)
     
     def __init__(self, **kwargs):
         for key, value in kwargs.items():
-            if hasattr(self,key):
-                setattr(self,key,value)
+            if hasattr(self, key):
+                setattr(self, key, value)
 
 
 class TweetLog(Base):
@@ -62,7 +69,7 @@
     }
     
     __tablename__ = 'tweet_tweet_log'
-    id = Column(Integer, primary_key = True, autoincrement=True)
+    id = Column(Integer, primary_key=True, autoincrement=True)
     tweet_source_id = Column(Integer, ForeignKey('tweet_tweet_source.id'))
     tweet_source = relationship("TweetSource", backref="logs")
     status = Column(Integer)
@@ -76,17 +83,17 @@
     id = Column(BigInteger, primary_key=True, autoincrement=False)
     id_str = Column(String)
     contributors = Column(String)
-    coordinates =  Column(String) 
+    coordinates = Column(String) 
     created_at = Column(DateTime)
     favorited = Column(Boolean)
     geo = Column(String)
     in_reply_to_screen_name = Column(String)
     in_reply_to_status_id = Column(BigInteger) 
     in_reply_to_status_id_str = Column(String)
-    in_reply_to_user_id = Column(Integer)
+    in_reply_to_user_id = Column(BigInteger)
     in_reply_to_user_id_str = Column(String)
     place = Column(String)
-    retweet_count = Column(Integer)
+    retweet_count = Column(String)
     retweeted = Column(Boolean)
     source = Column(String)
     text = Column(String)
@@ -96,17 +103,17 @@
     tweet_source_id = Column(Integer, ForeignKey('tweet_tweet_source.id'))
     tweet_source = relationship("TweetSource", backref="tweet")
     entity_list = relationship(Entity, backref='tweet')
-    received_at = Column(DateTime, default=datetime.datetime.now())
+    received_at = Column(DateTime, default=datetime.datetime.now(), index=True)
         
     def __init__(self, **kwargs):
         for key, value in kwargs.items():
-            if hasattr(self,key):
-                setattr(self,key,value)
+            if hasattr(self, key):
+                setattr(self, key, value)
 
 class UserMessage(Base):
     __tablename__ = "tweet_user_message"
 
-    id = Column(Integer, primary_key = True)
+    id = Column(Integer, primary_key=True)
     user_id = Column(Integer, ForeignKey('tweet_user.id'))
     user = relationship("User", backref="messages")
     created_at = Column(DateTime, default=datetime.datetime.now())
@@ -115,7 +122,7 @@
 class Message(Base):
     __tablename__ = "tweet_message"
     
-    id = Column(Integer, primary_key = True)
+    id = Column(Integer, primary_key=True)
     created_at = Column(DateTime, default=datetime.datetime.now())
     text = Column(String)
     users = relationship(UserMessage, backref='message')
@@ -124,57 +131,55 @@
 class User(Base):
     __tablename__ = "tweet_user"
     
-    id = Column(Integer, primary_key = True, autoincrement=False)
-    id_str= Column(String)
-    contributors_enabled= Column(Boolean)
-    created_at= Column(DateTime)
-    description= Column(String)
+    id = Column(BigInteger, primary_key=True, autoincrement=False)
+    id_str = Column(String)
+    contributors_enabled = Column(Boolean)
+    created_at = Column(DateTime)
+    description = Column(String)
     favourites_count = Column(Integer)
     follow_request_sent = Column(Boolean)
     followers_count = Column(Integer)
     following = Column(String)
     friends_count = Column(Integer)
-    geo_enabled= Column(Boolean)
-    is_translator= Column(Boolean)
+    geo_enabled = Column(Boolean)
+    is_translator = Column(Boolean)
     lang = Column(String)
     listed_count = Column(Integer)
-    location= Column(String)
+    location = Column(String)
     name = Column(String)
     notifications = Column(String)
-    profile_background_color= Column(String)
-    profile_background_image_url= Column(String)
-    profile_background_tile= Column(Boolean)
-    profile_image_url= Column(String)
-    profile_link_color= Column(String)
-    profile_sidebar_border_color= Column(String)
-    profile_sidebar_fill_color= Column(String)
-    profile_text_color= Column(String)
-    profile_use_background_image= Column(Boolean)
-    protected= Column(Boolean)
-    screen_name= Column(String, index=True, unique=True)
-    show_all_inline_media= Column(Boolean)
+    profile_background_color = Column(String)
+    profile_background_image_url = Column(String)
+    profile_background_tile = Column(Boolean)
+    profile_image_url = Column(String)
+    profile_link_color = Column(String)
+    profile_sidebar_border_color = Column(String)
+    profile_sidebar_fill_color = Column(String)
+    profile_text_color = Column(String)
+    profile_use_background_image = Column(Boolean)
+    protected = Column(Boolean)
+    screen_name = Column(String, index=True, unique=True)
+    show_all_inline_media = Column(Boolean)
     statuses_count = Column(Integer)
-    time_zone= Column(String)
-    url= Column(String)
+    time_zone = Column(String)
+    url = Column(String)
     utc_offset = Column(Integer)
-    verified= Column(Boolean)
+    verified = Column(Boolean)
 
     def __init__(self, **kwargs):
         for key, value in kwargs.items():
-            if hasattr(self,key):
-                setattr(self,key,value)    
+            if hasattr(self, key):
+                setattr(self, key, value)    
     
 
 class Hashtag(Base):
     __tablename__ = "tweet_hashtag"
     id = Column(Integer, primary_key=True)
-    text = Column(String, unique = True, index = True)
+    text = Column(String, unique=True, index=True)
     def __init__(self, **kwargs):
         for key, value in kwargs.items():
-            if hasattr(self,key):
-                setattr(self,key,value)
-
-
+            if hasattr(self, key):
+                setattr(self, key, value)
 
 class Url(Base):
     __tablename__ = "tweet_url"
@@ -183,8 +188,35 @@
     expanded_url = Column(String)
     def __init__(self, **kwargs):
         for key, value in kwargs.items():
-            if hasattr(self,key):
-                setattr(self,key,value)
+            if hasattr(self, key):
+                setattr(self, key, value)
+
+class MediaType(Base):
+    __tablename__ = "tweet_media_type"
+    id = Column(Integer, primary_key=True, autoincrement=True)
+    label = Column(String, unique=True, index=True)
+    def __init__(self, **kwargs):
+        for key, value in kwargs.items():
+            if hasattr(self, key):
+                setattr(self, key, value)
+    
+
+class Media(Base):
+    __tablename__ = "tweet_media"
+    id = Column(BigInteger, primary_key=True, autoincrement=False)
+    id_str = Column(String, unique=True)
+    media_url = Column(String, unique=True)
+    media_url_https = Column(String, unique=True)
+    url = Column(String)
+    display_url = Column(String)
+    expanded_url = Column(String)
+    sizes = Column(String)
+    type_id = Column(Integer, ForeignKey("tweet_media_type.id"))
+    type = relationship(MediaType, primaryjoin=type_id == MediaType.id)
+    def __init__(self, **kwargs):
+        for key, value in kwargs.items():
+            if hasattr(self, key):
+                setattr(self, key, value)
 
     
 
@@ -197,8 +229,8 @@
     def __init__(self, **kwargs):
         super(EntityHashtag, self).__init__(**kwargs)
         for key, value in kwargs.items():
-            if hasattr(self,key):
-                setattr(self,key,value)
+            if hasattr(self, key):
+                setattr(self, key, value)
 
     
 class EntityUrl(Entity):
@@ -210,22 +242,35 @@
     def __init__(self, **kwargs):
         super(EntityUrl, self).__init__(**kwargs)
         for key, value in kwargs.items():
-            if hasattr(self,key):
-                setattr(self,key,value)
+            if hasattr(self, key):
+                setattr(self, key, value)
 
 class EntityUser(Entity):
     __tablename__ = "tweet_entity_user"
     __mapper_args__ = {'polymorphic_identity': 'entity_user'}
     id = Column(Integer, ForeignKey('tweet_entity.id'), primary_key=True)
-    user_id = Column(Integer, ForeignKey('tweet_user.id'))
-    user = relationship(User, primaryjoin=user_id == User.id)
+    user_id = Column(BigInteger, ForeignKey('tweet_user.id'))
+    user = relationship(User, primaryjoin=(user_id == User.id))
 
     def __init__(self, **kwargs):
         super(EntityUser, self).__init__(**kwargs)
         for key, value in kwargs.items():
-            if hasattr(self,key):
-                setattr(self,key,value)
+            if hasattr(self, key):
+                setattr(self, key, value)
                 
+class EntityMedia(Entity):
+    __tablename__ = "tweet_entity_media"
+    __mapper_args__ = {'polymorphic_identity': 'entity_media'}
+    id = Column(Integer, ForeignKey('tweet_entity.id'), primary_key=True)
+    media_id = Column(BigInteger, ForeignKey('tweet_media.id'))
+    media = relationship(Media, primaryjoin=(media_id == Media.id))
+
+    def __init__(self, **kwargs):
+        super(EntityMedia, self).__init__(**kwargs)
+        for key, value in kwargs.items():
+            if hasattr(self, key):
+                setattr(self, key, value)
+
                 
 def setup_database(*args, **kwargs):
         
@@ -263,15 +308,15 @@
 
 tweet_tweet = {
     'contributors': None,
-    'coordinates': None, 
-    'created_at': 'date', 
-    'entities': "tweet_entity", 
+    'coordinates': None,
+    'created_at': 'date',
+    'entities': "tweet_entity",
     'favorited': "bool",
     'geo': None,
     'id': "long",
     'id_str': "string",
-    'in_reply_to_screen_name': "string", 
-    'in_reply_to_status_id': "long", 
+    'in_reply_to_screen_name': "string",
+    'in_reply_to_status_id': "long",
     'in_reply_to_status_id_str': "string",
     'in_reply_to_user_id': "int",
     'in_reply_to_user_id_str': "string",
@@ -354,6 +399,6 @@
 
 tweet_url = {
     "url": "string",
-    "expanded_url" : "string",    
+    "expanded_url" : "string",
 }
 
--- a/script/lib/iri_tweet/tests.py	Tue Aug 09 13:07:23 2011 +0200
+++ b/script/lib/iri_tweet/tests.py	Fri Aug 12 18:17:27 2011 +0200
@@ -111,7 +111,8 @@
         
 
 original_json = u'{"in_reply_to_user_id_str":null,"text":"RT @BieberEagle: \\"I love my haters. They spend so much time thinking about me. Aren\u2019t they sweet?\\" - Justin Bieber","contributors":null,"retweeted":false,"coordinates":null,"retweeted_status":{"in_reply_to_user_id_str":null,"text":"\\"I love my haters. They spend so much time thinking about me. Aren\u2019t they sweet?\\" - Justin Bieber","contributors":null,"retweeted":false,"coordinates":null,"retweet_count":"100+","source":"web","entities":{"user_mentions":[],"hashtags":[],"urls":[]},"truncated":false,"place":null,"id_str":"96638597737889792","in_reply_to_user_id":null,"in_reply_to_status_id":null,"favorited":false,"in_reply_to_status_id_str":null,"user":{"is_translator":false,"profile_background_tile":true,"profile_background_image_url_https":"https:\/\/si0.twimg.com\/profile_background_images\/298443445\/355584171.jpg","listed_count":5040,"friends_count":8477,"profile_link_color":"ff0000","profile_sidebar_border_color":"000000","url":"http:\/\/twitpic.com\/photos\/BieberEagle","profile_image_url_https":"https:\/\/si0.twimg.com\/profile_images\/1465491672\/355584171_normal.jpg","profile_image_url":"http:\/\/a2.twimg.com\/profile_images\/1465491672\/355584171_normal.jpg","description":"1 name, 1 inspiration, 1 hero, 1 smile, 1 singer, 1 boy who changed my life.    B.\u0130.E.B.E.R-Believe In Everything Because Everything\'s Reachable. 
#NEVERSAYNEVER","default_profile":false,"notifications":null,"time_zone":"Paris","followers_count":14506,"default_profile_image":false,"lang":"en","profile_use_background_image":true,"screen_name":"BieberEagle","show_all_inline_media":false,"geo_enabled":false,"profile_background_color":"ffffff","location":"\u2665 Albania \u2665 ","id_str":"229067923","profile_background_image_url":"http:\/\/a2.twimg.com\/profile_background_images\/298443445\/355584171.jpg","favourites_count":89,"protected":false,"follow_request_sent":null,"following":null,"name":"truebelieber","statuses_count":24279,"verified":false,"created_at":"Tue Dec 21 12:35:18 +0000 2010","profile_text_color":"000000","id":229067923,"contributors_enabled":false,"utc_offset":3600,"profile_sidebar_fill_color":""},"id":96638597737889792,"created_at":"Thu Jul 28 17:50:11 +0000 2011","geo":null,"in_reply_to_screen_name":null},"retweet_count":"100+","source":"web","entities":{"user_mentions":[{"indices":[3,15],"screen_name":"BieberEagle","id_str":"229067923","name":"truebelieber","id":229067923}],"hashtags":[],"urls":[]},"truncated":false,"place":null,"id_str":"96965037637382145","in_reply_to_user_id":null,"in_reply_to_status_id":null,"favorited":false,"in_reply_to_status_id_str":null,"user":{"is_translator":false,"profile_background_tile":true,"profile_background_image_url_https":"https:\/\/si0.twimg.com\/profile_background_images\/300419382\/ipod.7.14_054.JPG","listed_count":3,"friends_count":1150,"profile_link_color":"00cccc","profile_sidebar_border_color":"c8ff00","url":"http:\/\/www.facebook.com\/blovedbecca180","profile_image_url_https":"https:\/\/si0.twimg.com\/profile_images\/1466752962\/block_party_7.27.11_015_normal.JPG","profile_image_url":"http:\/\/a3.twimg.com\/profile_images\/1466752962\/block_party_7.27.11_015_normal.JPG","description":"if ya wanna know something about me, then get to know me. \\n\\r\\n\\ri promise, you wont regret it. 
(:\\r\\ni love justin bieber with an extreme burning passion!","default_profile":false,"notifications":null,"time_zone":"Central Time (US & Canada)","followers_count":361,"default_profile_image":false,"lang":"en","profile_use_background_image":true,"screen_name":"beccaxannxx","show_all_inline_media":false,"geo_enabled":false,"profile_background_color":"ff0066","location":"","id_str":"65624607","profile_background_image_url":"http:\/\/a3.twimg.com\/profile_background_images\/300419382\/ipod.7.14_054.JPG","favourites_count":266,"protected":false,"follow_request_sent":null,"following":null,"name":"beccaxannxx","statuses_count":2512,"verified":false,"created_at":"Fri Aug 14 12:36:35 +0000 2009","profile_text_color":"6a39d4","id":65624607,"contributors_enabled":false,"utc_offset":-21600,"profile_sidebar_fill_color":"ff00bb"},"id":96965037637382145,"created_at":"Fri Jul 29 15:27:21 +0000 2011","geo":null,"in_reply_to_screen_name":null}'
-
+original_json_media = u'{"user": {"follow_request_sent": null, "profile_use_background_image": true, "id": 34311537, "verified": false, "profile_image_url_https": "https://si0.twimg.com/profile_images/1452959211/63440_1494505009634_1444332638_31074099_4058882_n_normal.jpg", "profile_sidebar_fill_color": "DAECF4", "is_translator": false, "geo_enabled": false, "profile_text_color": "663B12", "followers_count": 29, "profile_sidebar_border_color": "C6E2EE", "id_str": "34311537", "default_profile_image": false, "location": "", "utc_offset": -25200, "statuses_count": 813, "description": "i like joe jonas, justin bieber, ashley tisdale, selena gomez, megen fox, kim kardashian and demi lovoto and many more.", "friends_count": 101, "profile_link_color": "1F98C7", "profile_image_url": "http://a1.twimg.com/profile_images/1452959211/63440_1494505009634_1444332638_31074099_4058882_n_normal.jpg", "notifications": null, "show_all_inline_media": false, "profile_background_image_url_https": "https://si0.twimg.com/profile_background_images/298244445/tour2011.jpg", "profile_background_color": "C6E2EE", "profile_background_image_url": "http://a1.twimg.com/profile_background_images/298244445/tour2011.jpg", "name": "mikayla", "lang": "en", "profile_background_tile": true, "favourites_count": 231, "screen_name": "bieberfever17ya", "url": null, "created_at": "Wed Apr 22 16:04:28 +0000 2009", "contributors_enabled": false, "time_zone": "Mountain Time (US & Canada)", "protected": false, "default_profile": false, "following": null, "listed_count": 1}, "favorited": false, "entities": {"user_mentions": [], "media": [{"media_url_https": "https://p.twimg.com/AWea5Z-CQAAvfvK.jpg", "expanded_url": "http://twitter.com/bieberfever17ya/status/101219827649232896/photo/1", "sizes": {"small": {"h": 240, "w": 201, "resize": "fit"}, "large": {"h": 240, "w": 201, "resize": "fit"}, "medium": {"h": 240, "w": 201, "resize": "fit"}, "thumb": {"h": 150, "w": 150, "resize": "crop"}}, "url": 
"http://t.co/N7yZ8hS", "display_url": "pic.twitter.com/N7yZ8hS", "id_str": "101219827653427200", "indices": [31, 50], "type": "photo", "id": 101219827653427200, "media_url": "http://p.twimg.com/AWea5Z-CQAAvfvK.jpg"}], "hashtags": [], "urls": []}, "contributors": null, "truncated": false, "text": "i love you justin bieber &lt;3 http://t.co/N7yZ8hS", "created_at": "Wed Aug 10 09:14:22 +0000 2011", "retweeted": false, "in_reply_to_status_id": null, "coordinates": null, "id": 101219827649232896, "source": "web", "in_reply_to_status_id_str": null, "place": null, "in_reply_to_user_id": null, "in_reply_to_screen_name": null, "retweet_count": 0, "geo": null, "in_reply_to_user_id_str": null, "possibly_sensitive": false, "id_str": "101219827649232896"}'
+original_json_media_others = u'{"user": {"utc_offset": -25200, "statuses_count": 813, "default_profile_image": false, "friends_count": 101, "profile_background_image_url_https": "https://si0.twimg.com/profile_background_images/298244445/tour2011.jpg", "profile_use_background_image": true, "profile_sidebar_fill_color": "DAECF4", "profile_link_color": "1F98C7", "profile_image_url": "http://a1.twimg.com/profile_images/1452959211/63440_1494505009634_1444332638_31074099_4058882_n_normal.jpg", "time_zone": "Mountain Time (US & Canada)", "is_translator": false, "screen_name": "bieberfever17ya", "url": null, "show_all_inline_media": false, "geo_enabled": false, "profile_background_color": "C6E2EE", "id": 34311537, "profile_background_image_url": "http://a1.twimg.com/profile_background_images/298244445/tour2011.jpg", "description": "i like joe jonas, justin bieber, ashley tisdale, selena gomez, megen fox, kim kardashian and demi lovoto and many more.", "lang": "en", "profile_background_tile": true, "favourites_count": 231, "name": "mikayla", "notifications": null, "follow_request_sent": null, "created_at": "Wed Apr 22 16:04:28 +0000 2009", "verified": false, "contributors_enabled": false, "location": "", "profile_text_color": "663B12", "followers_count": 29, "profile_sidebar_border_color": "C6E2EE", "id_str": "34311537", "default_profile": false, "following": null, "protected": false, "profile_image_url_https": "https://si0.twimg.com/profile_images/1452959211/63440_1494505009634_1444332638_31074099_4058882_n_normal.jpg", "listed_count": 1}, "favorited": false, "contributors": null, "source": "web", "text": "i love you justin bieber &lt;3 http://t.co/N7yZ8hS", "created_at": "Wed Aug 10 09:14:22 +0000 2011", "truncated": false, "retweeted": false, "in_reply_to_status_id_str": null, "coordinates": null, "in_reply_to_user_id_str": null, "entities": {"user_mentions": [], "media": [], "hashtags": [], "urls": [], "others": [{"url": "http://t.co/N7yZ8hS", "text": "comments", 
"indices": [31, 50]}]}, "in_reply_to_status_id": null, "in_reply_to_screen_name": null, "id_str": "101219827649232896", "place": null, "retweet_count": 0, "geo": null, "id": 101219827649232896, "possibly_sensitive": false, "in_reply_to_user_id": null}'
 
 class TestTwitterProcessor(unittest.TestCase):
     
@@ -136,7 +137,46 @@
         self.assertEqual(u"beccaxannxx",tweet.user.name)
         self.assertEqual(65624607,tweet.user.id)
         self.assertEqual(1,len(tweet.entity_list))
-        self.assertEqual(u"BieberEagle", tweet.entity_list[0].user.screen_name)
+        entity = tweet.entity_list[0]
+        self.assertEqual(u"BieberEagle", entity.user.screen_name)
+        self.assertTrue(entity.user.created_at is None)
+        self.assertEqual("entity_user", entity.type)
+        self.assertEqual("user_mentions", entity.entity_type.label)
+        
+
+    def testTwitterProcessorMedia(self):
+        tp = TwitterProcessor(None, original_json_media, None, self.session, self.tmpfilepath)
+        tp.process()
+        self.session.commit()
+        
+        self.assertEquals(1, self.session.query(models.TweetSource).count())
+        self.assertEquals(1, self.session.query(models.Tweet).count())
+        self.assertEquals(1, self.session.query(models.User).count())
+        tweet = self.session.query(models.Tweet).first()
+        self.assertFalse(tweet.user is None)
+        self.assertEqual(u"mikayla",tweet.user.name)
+        self.assertEqual(34311537,tweet.user.id)
+        self.assertEqual(1,len(tweet.entity_list))
+        entity = tweet.entity_list[0]
+        self.assertEqual(101219827653427200, entity.media.id)
+        self.assertEqual("photo", entity.media.type.label)
+        self.assertEqual("entity_media", entity.type)
+        self.assertEqual("media", entity.entity_type.label)
+
+
+    def testTwitterProcessorMediaOthers(self):
+        tp = TwitterProcessor(None, original_json_media_others, None, self.session, self.tmpfilepath)
+        tp.process()
+        self.session.commit()
+        
+        self.assertEquals(1, self.session.query(models.TweetSource).count())
+        self.assertEquals(1, self.session.query(models.Tweet).count())
+        tweet = self.session.query(models.Tweet).first()
+        self.assertEqual(1,len(tweet.entity_list))
+        entity = tweet.entity_list[0]
+        self.assertEqual("entity_entity", entity.type)
+        self.assertEqual("others", entity.entity_type.label)
+
 
 
     def tearDown(self):
--- a/script/lib/iri_tweet/utils.py	Tue Aug 09 13:07:23 2011 +0200
+++ b/script/lib/iri_tweet/utils.py	Fri Aug 12 18:17:27 2011 +0200
@@ -1,6 +1,7 @@
 from models import (Tweet, User, Hashtag, EntityHashtag, EntityUser, Url, 
     EntityUrl, CONSUMER_KEY, CONSUMER_SECRET, APPLICATION_NAME, ACCESS_TOKEN_KEY, 
-    ACCESS_TOKEN_SECRET, adapt_date, adapt_json, TweetSource, TweetLog)
+    ACCESS_TOKEN_SECRET, adapt_date, adapt_json, TweetSource, TweetLog, MediaType, 
+    Media, EntityMedia, Entity, EntityType)
 from sqlalchemy.sql import select, or_ #@UnresolvedImport
 import anyjson #@UnresolvedImport
 import datetime
@@ -16,24 +17,40 @@
 
 CACHE_ACCESS_TOKEN = {}
 
-def get_oauth_token(token_file_path=None, application_name=APPLICATION_NAME, consumer_key=CONSUMER_KEY, consumer_secret=CONSUMER_SECRET):
+def get_oauth_token(token_file_path=None, check_access_token=True, application_name=APPLICATION_NAME, consumer_key=CONSUMER_KEY, consumer_secret=CONSUMER_SECRET):
     
     global CACHE_ACCESS_TOKEN
-    
-    if CACHE_ACCESS_TOKEN is not None and application_name in CACHE_ACCESS_TOKEN:
-        return CACHE_ACCESS_TOKEN[application_name]
-    
-    if token_file_path and os.path.exists(token_file_path):
-        logging.debug("reading token from file %s" % token_file_path) #@UndefinedVariable
-        CACHE_ACCESS_TOKEN[application_name] = twitter.oauth.read_token_file(token_file_path)
-        return CACHE_ACCESS_TOKEN[application_name]
-        #read access token info from path
-    
-    if 'ACCESS_TOKEN_KEY' in dict() and 'ACCESS_TOKEN_SECRET' in dict() and ACCESS_TOKEN_KEY and ACCESS_TOKEN_SECRET:
+
+    if 'ACCESS_TOKEN_KEY' in globals() and 'ACCESS_TOKEN_SECRET' in globals() and ACCESS_TOKEN_KEY and ACCESS_TOKEN_SECRET:
         return ACCESS_TOKEN_KEY,ACCESS_TOKEN_SECRET
     
-    CACHE_ACCESS_TOKEN[application_name] = twitter.oauth_dance.oauth_dance(application_name, consumer_key, consumer_secret, token_file_path)
-    return CACHE_ACCESS_TOKEN[application_name]
+    res = CACHE_ACCESS_TOKEN.get(application_name, None)
+    
+    if res is None and token_file_path and os.path.exists(token_file_path):
+        get_logger().debug("get_oauth_token : reading token from file %s" % token_file_path) #@UndefinedVariable
+        res = twitter.oauth.read_token_file(token_file_path)
+    
+    if res is not None and check_access_token:
+        get_logger().debug("get_oauth_token : Check oauth tokens") #@UndefinedVariable
+        t = twitter.Twitter(auth=twitter.OAuth(res[0], res[1], CONSUMER_KEY, CONSUMER_SECRET))
+        status = None
+        try:
+            status = t.account.rate_limit_status()
+        except Exception as e:
+            get_logger().debug("get_oauth_token : error getting rate limit status %s" % repr(e))
+            status = None
+        get_logger().debug("get_oauth_token : Check oauth tokens : status %s" % repr(status)) #@UndefinedVariable
+        if status is None or status['remaining_hits'] == 0:
+            get_logger().debug("get_oauth_token : Problem with status %s" % repr(status))
+            res = None
+
+    if res is None:
+        get_logger().debug("get_oauth_token : doing the oauth dance")
+        res = twitter.oauth_dance.oauth_dance(application_name, consumer_key, consumer_secret, token_file_path)
+    
+    CACHE_ACCESS_TOKEN[application_name] = res
+    
+    return res
 
 def parse_date(date_str):
     ts = email.utils.parsedate_tz(date_str) #@UndefinedVariable
@@ -54,6 +71,13 @@
         "user": {
             "created_at"  : adapt_date,
         },
+
+    },
+                  
+    'entities' : {
+        "medias": {
+            "sizes"  : adapt_json,
+        },                  
     },
     'rest': {
         "tweet" : {
@@ -89,7 +113,12 @@
         new_args = [arg() if callable(arg) else arg for arg in self.args] if self.args is not None else []
         new_kwargs = dict([(k,v()) if callable(v) else (k,v) for k,v in self.kwargs.items()]) if self.kwargs is not None else {}
         
-        self.instance = self.klass(*new_args, **new_kwargs)
+        if self.instance is None:
+            self.instance = self.klass(*new_args, **new_kwargs)
+        else:
+            self.instance = self.klass(*new_args, **new_kwargs)
+            self.instance = session.merge(self.instance)
+
         session.add(self.instance)
         if self.must_flush:
             session.flush()
@@ -104,40 +133,45 @@
 
     def __init__(self):
         self.__bufferlist = []
+        self.__bufferdict = {}
+    
+    def __add_proxy_object(self, proxy):
+        proxy_list =  self.__bufferdict.get(proxy.klass, None)
+        if proxy_list is None:
+            proxy_list = []
+            self.__bufferdict[proxy.klass] = proxy_list
+        proxy_list.append(proxy)
+        self.__bufferlist.append(proxy)
         
     def persists(self, session):
         for object_proxy in self.__bufferlist:
             object_proxy.persists(session)
-            
-    def add_object(self, klass, args, kwargs, must_flush):
-        new_proxy = ObjectBufferProxy(klass, args, kwargs, must_flush)
-        self.__bufferlist.append(new_proxy)
+                
+    def add_object(self, klass, args, kwargs, must_flush, instance=None):
+        new_proxy = ObjectBufferProxy(klass, args, kwargs, must_flush, instance)
+        self.__add_proxy_object(new_proxy)
         return new_proxy 
     
     def get(self, klass, **kwargs):
-        for proxy in self.__bufferlist:
-            if proxy.kwargs is None or len(proxy.kwargs) == 0 or proxy.klass != klass:
-                continue
-            found = True
-            for k,v in kwargs.items():
-                if (k not in proxy.kwargs) or v != proxy.kwargs[k]:
-                    found = False
-                    break
-            if found:
-                return proxy
-        
+        if klass in self.__bufferdict:
+            for proxy in self.__bufferdict[klass]:
+                if proxy.kwargs is None or len(proxy.kwargs) == 0 or proxy.klass != klass:
+                    continue
+                found = True
+                for k,v in kwargs.items():
+                    if (k not in proxy.kwargs) or v != proxy.kwargs[k]:
+                        found = False
+                        break
+                if found:
+                    return proxy        
         return None
                 
-                    
-        
-
-
 class TwitterProcessorException(Exception):
     pass
 
 class TwitterProcessor(object):
     
-    def __init__(self, json_dict, json_txt, source_id, session, token_filename=None):
+    def __init__(self, json_dict, json_txt, source_id, session, access_token=None, token_filename=None):
 
         if json_dict is None and json_txt is None:
             raise TwitterProcessorException("No json")
@@ -158,11 +192,13 @@
         self.source_id = source_id
         self.session = session
         self.token_filename = token_filename
+        self.access_token = access_token
         self.obj_buffer = ObjectsBuffer()
+        
 
 
-    def __get_user(self, user_dict):
-        logger.debug("Get user : " + repr(user_dict)) #@UndefinedVariable
+    def __get_user(self, user_dict, do_merge, query_twitter = False):
+        get_logger().debug("Get user : " + repr(user_dict)) #@UndefinedVariable
     
         user_id = user_dict.get("id",None)    
         user_name = user_dict.get("screen_name", user_dict.get("name", None))
@@ -192,8 +228,12 @@
     
         user_created_at = user_dict.get("created_at", None)
         
-        if user_created_at is None:
-            acess_token_key, access_token_secret = get_oauth_token(self.token_filename)
+        if user_created_at is None and query_twitter:
+            
+            if self.access_token is not None:
+                acess_token_key, access_token_secret = self.access_token
+            else:
+                acess_token_key, access_token_secret = get_oauth_token(self.token_filename)
             t = twitter.Twitter(auth=twitter.OAuth(acess_token_key, access_token_secret, CONSUMER_KEY, CONSUMER_SECRET))
             try:
                 if user_id:
@@ -201,8 +241,8 @@
                 else:
                     user_dict = t.users.show(screen_name=user_name)            
             except Exception as e:
-                logger.info("get_user : TWITTER ERROR : " + repr(e)) #@UndefinedVariable
-                logger.info("get_user : TWITTER ERROR : " + str(e)) #@UndefinedVariable
+                get_logger().info("get_user : TWITTER ERROR : " + repr(e)) #@UndefinedVariable
+                get_logger().info("get_user : TWITTER ERROR : " + str(e)) #@UndefinedVariable
                 return None
     
         user_dict = adapt_fields(user_dict, fields_adapter["stream"]["user"])
@@ -210,45 +250,79 @@
             return None
         
         #TODO filter get, wrap in proxy
-        user = self.session.query(User).filter(User.id == user_dict["id"]).first()
+        user_obj = self.session.query(User).filter(User.id == user_dict["id"]).first()
         
-        if user is not None:
-            return user
+        if user_obj is not None:
+            if not do_merge:
+                return ObjectBufferProxy(User, None, None, False, user_obj)
         
         user = self.obj_buffer.add_object(User, None, user_dict, True)
         
         return user
 
+    def __get_or_create_object(self, klass, filter_by_kwargs, filter, creation_kwargs, must_flush, do_merge):
+        
+        obj_proxy = self.obj_buffer.get(klass, **filter_by_kwargs)
+        if obj_proxy is None:
+            query = self.session.query(klass)
+            if filter is not None:
+                query = query.filter(filter)
+            else:
+                query = query.filter_by(**filter_by_kwargs)
+            obj_instance = query.first()
+            if obj_instance is not None:
+                if not do_merge:
+                    obj_proxy = ObjectBufferProxy(klass, None, None, False, obj_instance)
+                else:
+                    obj_proxy = self.obj_buffer.add_object(klass, None, creation_kwargs, must_flush, obj_instance)
+        if obj_proxy is None:
+            obj_proxy = self.obj_buffer.add_object(klass, None, creation_kwargs, must_flush)
+        return obj_proxy
+
 
     def __process_entity(self, ind, ind_type):
-        logger.debug("Process_entity : " + repr(ind) + " : " + repr(ind_type)) #@UndefinedVariable
+        get_logger().debug("Process_entity : " + repr(ind) + " : " + repr(ind_type)) #@UndefinedVariable
         
         ind = clean_keys(ind)
         
+        entity_type = self.__get_or_create_object(EntityType, {'label':ind_type}, None, {'label':ind_type}, True, False)
+        
         entity_dict = {
-           "indice_start": ind["indices"][0],
-           "indice_end"  : ind["indices"][1],
-           "tweet_id"    : self.tweet.id,
+           "indice_start"   : ind["indices"][0],
+           "indice_end"     : ind["indices"][1],
+           "tweet_id"       : self.tweet.id,
+           "entity_type_id" : entity_type.id,
+           "source"         : adapt_json(ind)
         }
-    
+
+        def process_medias():
+            
+            media_id = ind.get('id', None)
+            if media_id is None:
+                return None, None
+            
+            type_str = ind.get("type", "photo")
+            media_type = self.__get_or_create_object(MediaType, {'label': type_str}, None, {'label':type_str}, True, False)
+            media_ind = adapt_fields(ind, fields_adapter["entities"]["medias"])
+            if "type" in media_ind:
+                del(media_ind["type"])
+            media_ind['type_id'] = media_type.id            
+            media = self.__get_or_create_object(Media, {'id':media_id}, None, media_ind, True, False)
+            
+            entity_dict['media_id'] = media.id
+            return EntityMedia, entity_dict
+
         def process_hashtags():
             text = ind.get("text", ind.get("hashtag", None))
             if text is None:
-                return None
-            hashtag = self.obj_buffer.get(Hashtag, text=text)
-            if hashtag is None: 
-                hashtag_obj = self.session.query(Hashtag).filter(Hashtag.text.ilike(text)).first()
-                if hashtag_obj is not None:
-                    hashtag = ObjectBufferProxy(Hashtag, None, None, False, hashtag_obj)
-                    
-            if hashtag is None:
-                ind["text"] = text
-                hashtag = self.obj_buffer.add_object(Hashtag, None, ind, True)
+                return None, None
+            ind['text'] = text
+            hashtag = self.__get_or_create_object(Hashtag, {'text':text}, Hashtag.text.ilike(text), ind, True, False)
             entity_dict['hashtag_id'] = hashtag.id
             return EntityHashtag, entity_dict             
         
         def process_user_mentions():
-            user_mention = self.__get_user(ind)
+            user_mention = self.__get_user(ind, False, False)
             if user_mention is None:
                 entity_dict['user_id'] = None
             else:
@@ -256,24 +330,19 @@
             return EntityUser, entity_dict
         
         def process_urls():
-            url = self.obj_buffer.get(Url, url=ind["url"])
-            if url is None:
-                url_obj = self.session.query(Url).filter(Url.url == ind["url"]).first()
-                if url_obj is not None:
-                    url = ObjectBufferProxy(Url, None, None, False, url_obj)
-            if url is None:
-                url = self.obj_buffer.add_object(Url, None, ind, True)
+            url = self.__get_or_create_object(Url, {'url':ind["url"]}, None, ind, True, False)
             entity_dict['url_id'] = url.id
             return EntityUrl, entity_dict
-        
+                
         #{'': lambda }
         entity_klass, entity_dict =  { 
             'hashtags': process_hashtags,
             'user_mentions' : process_user_mentions,
-            'urls' : process_urls
-            }[ind_type]()
+            'urls' : process_urls,
+            'media': process_medias,
+            }.get(ind_type, lambda: (Entity, entity_dict))()
             
-        logger.debug("Process_entity entity_dict: " + repr(entity_dict)) #@UndefinedVariable
+        get_logger().debug("Process_entity entity_dict: " + repr(entity_dict)) #@UndefinedVariable
         if entity_klass:
             self.obj_buffer.add_object(entity_klass, None, entity_dict, False)
 
@@ -287,9 +356,9 @@
         ts_copy = adapt_fields(self.json_dict, fields_adapter["stream"]["tweet"])
         
         # get or create user
-        user = self.__get_user(self.json_dict["user"])
+        user = self.__get_user(self.json_dict["user"], True)
         if user is None:
-            logger.warning("USER not found " + repr(self.json_dict["user"])) #@UndefinedVariable
+            get_logger().warning("USER not found " + repr(self.json_dict["user"])) #@UndefinedVariable
             ts_copy["user_id"] = None
         else:
             ts_copy["user_id"] = user.id
@@ -299,25 +368,26 @@
         
         self.tweet = self.obj_buffer.add_object(Tweet, None, ts_copy, True)
             
-        # get entities
+        self.__process_entities()
+
+
+    def __process_entities(self):
         if "entities" in self.json_dict:
             for ind_type, entity_list in self.json_dict["entities"].items():
                 for ind in entity_list:
                     self.__process_entity(ind, ind_type)
         else:
-            extractor = twitter_text.Extractor(self.tweet.text)
-    
+            
+            text = self.tweet.text
+            extractor = twitter_text.Extractor(text)
             for ind in extractor.extract_hashtags_with_indices():
                 self.__process_entity(ind, "hashtags")
-    
+            
+            for ind in extractor.extract_urls_with_indices():
+                self.__process_entity(ind, "urls")
+            
             for ind in extractor.extract_mentioned_screen_names_with_indices():
                 self.__process_entity(ind, "user_mentions")
-    
-            for ind in extractor.extract_urls_with_indices():
-                self.__process_entity(ind, "urls")
-
-        self.session.flush()
-
 
     def __process_twitter_rest(self):
         tweet_nb = self.session.query(Tweet).filter(Tweet.id == self.json_dict["id"]).count()
@@ -348,28 +418,17 @@
             'screen_name' : self.json_dict["from_user"],                   
         }
         
-        user = self.__get_user(user_fields)
+        user = self.__get_user(user_fields, do_merge=False)
         if user is None:
-            logger.warning("USER not found " + repr(user_fields)) #@UndefinedVariable
+            get_logger().warning("USER not found " + repr(user_fields)) #@UndefinedVariable
             tweet_fields["user_id"] = None
         else:
             tweet_fields["user_id"] = user.id
         
         tweet_fields = adapt_fields(tweet_fields, fields_adapter["rest"]["tweet"])
         self.tweet = self.obj_buffer.add_object(Tweet, None, tweet_fields, True)
-        
-        text = self.tweet.text
-        
-        extractor = twitter_text.Extractor(text)
-        
-        for ind in extractor.extract_hashtags_with_indices():
-            self.__process_entity(ind, "hashtags")
-                    
-        for ind in extractor.extract_urls_with_indices():
-            self.__process_entity(ind, "urls")
-        
-        for ind in extractor.extract_mentioned_screen_names_with_indices():
-            self.__process_entity(ind, "user_mentions")
+                
+        self.__process_entities()
 
 
 
@@ -384,7 +443,7 @@
         else:
             self.__process_twitter_stream()
 
-        self.obj_buffer.add_object(TweetLog, None, {'tweet_source_id':self.source_id, 'status':TweetLog.TWEET_STATUS['OK']}, False)
+        self.obj_buffer.add_object(TweetLog, None, {'tweet_source_id':self.source_id, 'status':TweetLog.TWEET_STATUS['OK']}, True)
         
         self.obj_buffer.persists(self.session)
 
@@ -405,7 +464,7 @@
             
     logger = plogger
     if logger is None:
-        logger = logging.getLogger() #@UndefinedVariable
+        logger = get_logger() #@UndefinedVariable
     
     if len(logger.handlers) == 0:    
         filename = logging_config.get("filename")
@@ -477,4 +536,5 @@
     
     return query.distinct()
 
-logger = logging.getLogger() #@UndefinedVariable
+def get_logger():
+    return logging.getLogger("iri_tweet") #@UndefinedVariable
--- a/script/lib/tweetstream/tweetstream/streamclasses.py	Tue Aug 09 13:07:23 2011 +0200
+++ b/script/lib/tweetstream/tweetstream/streamclasses.py	Fri Aug 12 18:17:27 2011 +0200
@@ -2,8 +2,8 @@
 import anyjson
 import socket
 import time
-import urllib
-import urllib2
+import urllib #@UnresolvedImport
+import urllib2 #@UnresolvedImport
 socket._fileobject.default_bufsize = 0
 
 
--- a/script/stream/recorder_tweetstream.py	Tue Aug 09 13:07:23 2011 +0200
+++ b/script/stream/recorder_tweetstream.py	Fri Aug 12 18:17:27 2011 +0200
@@ -3,22 +3,25 @@
 from iri_tweet.models import TweetSource, TweetLog
 from multiprocessing import Queue, JoinableQueue, Process, Event, get_logger
 from optparse import OptionParser
-from sqlalchemy.orm import sessionmaker
+from sqlalchemy.exc import OperationalError
+from sqlalchemy.orm import scoped_session, sessionmaker
 import StringIO
-import logging
 import anyjson
 import datetime
+import logging
 import os
+import re
 import shutil
 import signal
 import socket
+import sqlalchemy.schema
 import sys
 import time
 import traceback
 import tweepy.auth
 import tweetstream
-from iri_tweet.utils import logger
-from sqlalchemy.exc import OperationalError
+import urllib2
+#from iri_tweet.utils import get_logger
 socket._fileobject.default_bufsize = 0
 
 
@@ -30,6 +33,26 @@
 #just put it in a sqlite3 tqble
 
 
+def set_logging(options):
+    utils.set_logging(options, logging.getLogger('iri_tweet'))
+    utils.set_logging(options, logging.getLogger('multiprocessing'))
+    if options.debug >= 2:
+        utils.set_logging(options, logging.getLogger('sqlalchemy.engine'))
+    #utils.set_logging(options, logging.getLogger('sqlalchemy.dialects'))
+    #utils.set_logging(options, logging.getLogger('sqlalchemy.pool'))
+    #utils.set_logging(options, logging.getLogger('sqlalchemy.orm'))
+
+def get_auth(options, access_token):
+    if options.username and options.password:
+        auth = tweepy.auth.BasicAuthHandler(options.username, options.password)        
+    else:
+        consumer_key = models.CONSUMER_KEY
+        consumer_secret = models.CONSUMER_SECRET
+        auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=False)
+        auth.set_access_token(*access_token)
+    return auth
+
+
 class ReconnectingTweetStream(tweetstream.FilterStream):
     """TweetStream class that automatically tries to reconnect if the
     connecting goes down. Reconnecting, and waiting for reconnecting, is
@@ -62,9 +85,10 @@
     def next(self):
         while True:
             try:
+                utils.get_logger().debug("return super.next")
                 return super(ReconnectingTweetStream, self).next()
             except tweetstream.ConnectionError, e:
-                logging.debug("connection error :" + str(e))
+                utils.get_logger().debug("connection error :" + str(e))
                 self._reconnects += 1
                 if self._reconnects > self.max_reconnects:
                     raise tweetstream.ConnectionError("Too many retries")
@@ -80,38 +104,43 @@
 
 
 
+
 class SourceProcess(Process):
     
-    def __init__(self, session_maker, queue, auth, track, debug, reconnects, token_filename, stop_event):
+    def __init__(self, session_maker, queue, options, access_token, stop_event):
         self.session_maker = session_maker
         self.queue = queue
-        self.auth = auth
-        self.track = track
-        self.debug = debug
-        self.reconnects = reconnects
-        self.token_filename = token_filename
+        self.track = options.track
+        self.reconnects = options.reconnects
+        self.token_filename = options.token_filename
         self.stop_event = stop_event
+        self.options = options
+        self.access_token = access_token
         super(SourceProcess, self).__init__()
-#        self.stop_event = 
     
     def run(self):
+        #import pydevd
+        #pydevd.settrace(suspend=False)
+
+        set_logging(self.options)
+        self.auth = get_auth(self.options, self.access_token) 
         
-        get_logger().debug("SourceProcess : run")
+        utils.get_logger().debug("SourceProcess : run")
         track_list = self.track # or raw_input('Keywords to track (comma seperated): ').strip()
         track_list = [k for k in track_list.split(',')]
 
-        get_logger().debug("SourceProcess : before connecting to stream " + repr(track_list))                        
+        utils.get_logger().debug("SourceProcess : before connecting to stream " + repr(track_list))                        
         stream = ReconnectingTweetStream(self.auth, track_list, reconnects=self.reconnects, as_text=True)
-        get_logger().debug("SourceProcess : after connecting to stream")
+        utils.get_logger().debug("SourceProcess : after connecting to stream")
         stream.muststop = lambda: self.stop_event.is_set()
         
         session = self.session_maker()
         
         try:
             for tweet in stream:
-                get_logger().debug("tweet " + repr(tweet))
+                utils.get_logger().debug("tweet " + repr(tweet))
                 source = TweetSource(original_json=tweet)
-                get_logger().debug("source created")
+                utils.get_logger().debug("source created")
                 add_retries = 0
                 while add_retries < 10:
                     try:
@@ -121,21 +150,18 @@
                         break
                     except OperationalError as e:
                         session.rollback()
-                        get_logger().debug("Operational Error %s nb %d" % (repr(e), add_retries))
-                        if add_retries==10:
+                        utils.get_logger().debug("Operational Error %s nb %d" % (repr(e), add_retries))
+                        if add_retries == 10:
                             raise e
                      
                 source_id = source.id
-                get_logger().debug("before queue + source id " + repr(source_id))
-                self.queue.put((source_id, tweet), False)
-                #process_tweet(tweet, source_id, self.session, self.debug, self.token_filename)
-                get_logger().info("Tweet count: %d - current rate : %.2f - running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime)))
+                utils.get_logger().debug("before queue + source id " + repr(source_id))
+                utils.get_logger().info("Tweet count: %d - current rate : %.2f - running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime)))
                 session.commit()
-#                if duration >= 0 and  datetime.datetime.utcnow() >= end_ts:
-#                    print "Stop recording after %d seconds." % (duration)
-#                    break
+                self.queue.put((source_id, tweet), False)
+
         except Exception as e:
-            get_logger().error("Error when processing tweet " + repr(e))
+            utils.get_logger().error("Error when processing tweet " + repr(e))
         finally:
             session.rollback()
             stream.close()
@@ -144,19 +170,19 @@
             self.stop_event.set()
 
 
-def process_tweet(tweet, source_id, session, token_filename):
+def process_tweet(tweet, source_id, session, access_token):
     try:
         tweet_obj = anyjson.deserialize(tweet)
         screen_name = ""
         if 'user' in tweet_obj and 'screen_name' in tweet_obj['user']:
             screen_name = tweet_obj['user']['screen_name']
-        get_logger().info(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text']))
-        get_logger().debug(u"Process_tweet :" + repr(tweet))
-        processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, token_filename)
+        utils.get_logger().debug(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text']))
+        utils.get_logger().debug(u"Process_tweet :" + repr(tweet))
+        processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, access_token, None)
         processor.process()
     except Exception as e:
         message = u"Error %s processing tweet %s" % (repr(e), tweet)
-        get_logger().error(message)
+        utils.get_logger().error(message)
         output = StringIO.StringIO()
         traceback.print_exception(Exception, e, None, None, output)
         error_stack = output.getvalue()
@@ -170,42 +196,49 @@
         
 class TweetProcess(Process):
     
-    def __init__(self, session_maker, queue, debug, token_filename, stop_event):
+    def __init__(self, session_maker, queue, options, access_token, stop_event):
         self.session_maker = session_maker
         self.queue = queue
-        self.debug = debug
         self.stop_event = stop_event
-        self.token_filename = token_filename
+        self.options = options
+        self.access_token = access_token
         super(TweetProcess, self).__init__()
 
 
     def run(self):
         
+        set_logging(self.options)
         session = self.session_maker()
         try:
             while not self.stop_event.is_set():
                 try:
-                    source_id, tweet_txt = queue.get(True, 10)
-                    get_logger().debug("Processing source id " + repr(source_id))
+                    source_id, tweet_txt = queue.get(True, 3)
+                    utils.get_logger().debug("Processing source id " + repr(source_id))
                 except Exception as e:
-                    get_logger().debug('Process tweet exception in loop : ' + repr(e))
+                    utils.get_logger().debug('Process tweet exception in loop : ' + repr(e))
                     continue
-                process_tweet(tweet_txt, source_id, session, self.token_filename)
+                process_tweet(tweet_txt, source_id, session, self.access_token)
                 session.commit()
-        except:
-            raise
         finally:
             session.rollback()
             self.stop_event.set()
             session.close()
+
+
+def get_sessionmaker(conn_str):
+    engine, metadata = models.setup_database(conn_str, echo=False, create_all=False)
+    Session = scoped_session(sessionmaker(bind=engine, autocommit=False))
+    return Session, engine, metadata
+
             
-def process_leftovers(session, token_filename):
+def process_leftovers(session, access_token):
     
     sources = session.query(TweetSource).outerjoin(TweetLog).filter(TweetLog.id == None)
     
     for src in sources:
         tweet_txt = src.original_json
-        process_tweet(tweet_txt, src.id, session, token_filename)
+        process_tweet(tweet_txt, src.id, session, access_token)
+        session.commit()
 
         
     
@@ -215,8 +248,8 @@
         
 def get_options():
     parser = OptionParser()
-    parser.add_option("-f", "--file", dest="filename",
-                      help="write tweet to FILE", metavar="FILE", default="enmi2010_twitter.db")
+    parser.add_option("-f", "--file", dest="conn_str",
+                      help="write tweet to DATABASE. This is a connection string", metavar="CONNECTION_STR", default="enmi2010_twitter.db")
     parser.add_option("-u", "--user", dest="username",
                       help="Twitter user", metavar="USER", default=None)
     parser.add_option("-w", "--password", dest="password",
@@ -231,8 +264,8 @@
                       help="Token file name")
     parser.add_option("-d", "--duration", dest="duration",
                       help="Duration of recording in seconds", metavar="DURATION", default= -1, type='int')
-    parser.add_option("-N", "--consumer", dest="consumer_nb",
-                      help="number of consumer", metavar="CONSUMER", default=1, type='int')
+    parser.add_option("-N", "--nb-process", dest="process_nb",
+                      help="number of process.\nIf 0, only the lefovers of the database are processed.\nIf 1, no postprocessing is done on the tweets.", metavar="PROCESS_NB", default=2, type='int')
 
 
 
@@ -245,53 +278,81 @@
     
     (options, args) = get_options()
     
-    utils.set_logging(options, get_logger())
+    set_logging(options)
         
     if options.debug:
         print "OPTIONS : "
         print repr(options)
     
-    if options.new and os.path.exists(options.filename):
-        i = 1
-        basename, extension = os.path.splitext(options.filename)
-        new_path = '%s.%d%s' % (basename, i, extension)
-        while i < 1000000 and os.path.exists(new_path):
-            i += 1
+    
+    conn_str = options.conn_str.strip()
+    if not re.match("^\w+://.+", conn_str):
+        conn_str = 'sqlite://' + options.conn_str
+        
+    if conn_str.startswith("sqlite") and options.new:
+        filepath = conn_str[conn_str.find("://"):]
+        if os.path.exists(filepath):
+            i = 1
+            basename, extension = os.path.splitext(filepath)
             new_path = '%s.%d%s' % (basename, i, extension)
-        if i >= 1000000:
-            raise Exception("Unable to find new filename for " + options.filename)
-        else:
-            shutil.move(options.filename, new_path)
+            while i < 1000000 and os.path.exists(new_path):
+                i += 1
+                new_path = '%s.%d%s' % (basename, i, extension)
+            if i >= 1000000:
+                raise Exception("Unable to find new filename for " + filepath)
+            else:
+                shutil.move(filepath, new_path)
 
+    Session, engine, metadata = get_sessionmaker(conn_str)
     
+    if options.new:
+        check_metadata = sqlalchemy.schema.MetaData(bind=engine, reflect=True)
+        if len(check_metadata.sorted_tables) > 0:
+            message = "Database %s not empty exiting" % conn_str
+            utils.get_logger().error(message)
+            sys.exit(message)
+    
+    metadata.create_all(engine)
+    
+    access_token = None
+    if not options.username or not options.password:
+        access_token = utils.get_oauth_token(options.token_filename)
+    
+    session = Session()
+    try:
+        process_leftovers(session, access_token)
+        session.commit()
+    finally:
+        session.rollback()
+        session.close()
+    
+    if options.process_nb <= 0:
+        utils.get_logger().debug("Leftovers processed. Exiting.")
+        sys.exit()
+
     queue = JoinableQueue()
     stop_event = Event()
-
-    if options.username and options.password:
-        auth = tweepy.auth.BasicAuthHandler(options.username, options.password)        
-    else:
-        consumer_key = models.CONSUMER_KEY
-        consumer_secret = models.CONSUMER_SECRET
-        auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=False)
-        auth.set_access_token(*(utils.get_oauth_token(options.token_filename)))
-
-
-    engine, metadata = models.setup_database('sqlite:///' + options.filename, echo=(options.debug >= 2))
-    Session = sessionmaker(bind=engine)
     
-    session = Session()
-    process_leftovers(session, options.token_filename)
-    session.commit()
-    session.close()
-         
-    sprocess = SourceProcess(Session, queue, auth, options.track, options.debug, options.reconnects, options.token_filename, stop_event)    
+    # workaround for a bug when using urllib2 together with multiprocessing
+    req = urllib2.Request('http://localhost')
+    conn = None
+    try:
+        conn = urllib2.urlopen(req)
+    except:
+        pass
+        # do nothing
+    finally:
+        if conn is not None:
+            conn.close()
+        
+    
+    sprocess = SourceProcess(Session, queue, options, access_token, stop_event)    
     
     tweet_processes = []
     
-    for i in range(options.consumer_nb):
-        engine, metadata = models.setup_database('sqlite:///' + options.filename, echo=(options.debug >= 2))
-        Session = sessionmaker(bind=engine)
-        cprocess = TweetProcess(Session, queue, options.debug, options.token_filename, stop_event)
+    for i in range(options.process_nb - 1):
+        Session, engine, metadata = get_sessionmaker(conn_str)
+        cprocess = TweetProcess(Session, queue, options, access_token, stop_event)
         tweet_processes.append(cprocess)
 
     def interupt_handler(signum, frame):
@@ -311,23 +372,36 @@
             stop_event.set()
             break
         if sprocess.is_alive():
-            time.sleep(0.1)
+            time.sleep(1)
         else:
+            stop_event.set()
             break
     
-    get_logger().debug("Joining Source Process")
-    sprocess.join()
-    get_logger().debug("Joining Queue")
+    utils.get_logger().debug("Joining Source Process")
+    try:
+        sprocess.join(10)
+    except:
+        utils.get_logger().debug("Pb joining Source Process - terminating")
+        sprocess.terminate()
+        
+    utils.get_logger().debug("Joining Queue")
     #queue.join()
-    for i,cprocess in enumerate(tweet_processes):
-        get_logger().debug("Joining consumer process Nb %d" % (i+1))
-        cprocess.join()
+    for i, cprocess in enumerate(tweet_processes):
+        utils.get_logger().debug("Joining consumer process Nb %d" % (i + 1))
+        try:
+            cprocess.join(3)
+        except:
+            utils.get_logger().debug("Pb joining consumer process Nb %d - terminating" % (i + 1))
+            cprocess.terminate()
     
-    get_logger().debug("Processing leftovers")
+    utils.get_logger().debug("Processing leftovers")
     session = Session()
-    process_leftovers(session, options.token_filename)
-    session.commit()
-    session.close()
+    try:
+        process_leftovers(session, access_token)
+        session.commit()
+    finally:
+        session.rollback()
+        session.close()
 
-    get_logger().debug("Done. Exiting.")
-        
\ No newline at end of file
+    utils.get_logger().debug("Done. Exiting.")
+