# HG changeset patch # User Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com> # Date 1313165847 -7200 # Node ID 2209e66bb50bc6d937843bc2729774d68b486851 # Parent e9335ee3cf7177bbc11f0685e4833a5fad227905 multiple debugging and corrections diff -r e9335ee3cf71 -r 2209e66bb50b .hgignore --- a/.hgignore Tue Aug 09 13:07:23 2011 +0200 +++ b/.hgignore Fri Aug 12 18:17:27 2011 +0200 @@ -33,3 +33,6 @@ syntax: regexp ^web/CPV/config\.php$ + +syntax: regexp +^script/virtualenv/venv2$ \ No newline at end of file diff -r e9335ee3cf71 -r 2209e66bb50b script/lib/iri_tweet/models.py --- a/script/lib/iri_tweet/models.py Tue Aug 09 13:07:23 2011 +0200 +++ b/script/lib/iri_tweet/models.py Fri Aug 12 18:17:27 2011 +0200 @@ -27,31 +27,38 @@ else: return anyjson.serialize(obj) +class EntityType(Base): + __tablename__ = "tweet_entity_type" + id = Column(Integer, primary_key=True, autoincrement=True) + label = Column(String) + class Entity(Base): __tablename__ = "tweet_entity" - id = Column(Integer, primary_key = True) + id = Column(Integer, primary_key=True) tweet_id = Column(BigInteger, ForeignKey('tweet_tweet.id')) - #tweet = relationship(Tweet, primaryjoin = tweet_id == Tweet.id) type = Column(String) + entity_type_id = Column(Integer, ForeignKey('tweet_entity_type.id'), nullable=False) + entity_type = relationship("EntityType", backref="entities") indice_start = Column(Integer) indice_end = Column(Integer) - __mapper_args__ = {'polymorphic_on': type} + source = Column(String) + __mapper_args__ = {'polymorphic_on': type, 'polymorphic_identity': 'entity_entity', 'with_polymorphic':'*'} def __init__(self, **kwargs): for key, value in kwargs.items(): - if hasattr(self,key): - setattr(self,key,value) + if hasattr(self, key): + setattr(self, key, value) class TweetSource(Base): __tablename__ = 'tweet_tweet_source' - id = Column(Integer, primary_key = True, autoincrement=True) + id = Column(Integer, primary_key=True, autoincrement=True) original_json = Column(String) - received_at = Column(DateTime, default=datetime.datetime.now()) + received_at = Column(DateTime, default=datetime.datetime.now(), index=True) def __init__(self, **kwargs): for key, value in kwargs.items(): - if hasattr(self,key): - setattr(self,key,value) + if hasattr(self, key): + setattr(self, key, value) class TweetLog(Base): @@ -62,7 +69,7 @@ } __tablename__ = 'tweet_tweet_log' - id = Column(Integer, primary_key = True, autoincrement=True) + id = Column(Integer, primary_key=True, autoincrement=True) tweet_source_id = Column(Integer, ForeignKey('tweet_tweet_source.id')) tweet_source = relationship("TweetSource", backref="logs") status = Column(Integer) @@ -76,17 +83,17 @@ id = Column(BigInteger, primary_key=True, autoincrement=False) id_str = Column(String) contributors = Column(String) - coordinates = Column(String) + coordinates = Column(String) created_at = Column(DateTime) favorited = Column(Boolean) geo = Column(String) in_reply_to_screen_name = Column(String) in_reply_to_status_id = Column(BigInteger) in_reply_to_status_id_str = Column(String) - in_reply_to_user_id = Column(Integer) + in_reply_to_user_id = Column(BigInteger) in_reply_to_user_id_str = Column(String) place = Column(String) - retweet_count = Column(Integer) + retweet_count = Column(String) retweeted = Column(Boolean) source = Column(String) text = Column(String) @@ -96,17 +103,17 @@ tweet_source_id = Column(Integer, ForeignKey('tweet_tweet_source.id')) tweet_source = relationship("TweetSource", backref="tweet") entity_list = relationship(Entity, backref='tweet') - received_at = Column(DateTime, default=datetime.datetime.now()) + received_at = Column(DateTime, default=datetime.datetime.now(), index=True) def __init__(self, **kwargs): for key, value in kwargs.items(): - if hasattr(self,key): - setattr(self,key,value) + if hasattr(self, key): + setattr(self, key, value) class UserMessage(Base): __tablename__ = "tweet_user_message" - id = Column(Integer, primary_key = True) + id = Column(Integer, primary_key=True) user_id = Column(Integer, ForeignKey('tweet_user.id')) user = relationship("User", backref="messages") created_at = Column(DateTime, default=datetime.datetime.now()) @@ -115,7 +122,7 @@ class Message(Base): __tablename__ = "tweet_message" - id = Column(Integer, primary_key = True) + id = Column(Integer, primary_key=True) created_at = Column(DateTime, default=datetime.datetime.now()) text = Column(String) users = relationship(UserMessage, backref='message') @@ -124,57 +131,55 @@ class User(Base): __tablename__ = "tweet_user" - id = Column(Integer, primary_key = True, autoincrement=False) - id_str= Column(String) - contributors_enabled= Column(Boolean) - created_at= Column(DateTime) - description= Column(String) + id = Column(BigInteger, primary_key=True, autoincrement=False) + id_str = Column(String) + contributors_enabled = Column(Boolean) + created_at = Column(DateTime) + description = Column(String) favourites_count = Column(Integer) follow_request_sent = Column(Boolean) followers_count = Column(Integer) following = Column(String) friends_count = Column(Integer) - geo_enabled= Column(Boolean) - is_translator= Column(Boolean) + geo_enabled = Column(Boolean) + is_translator = Column(Boolean) lang = Column(String) listed_count = Column(Integer) - location= Column(String) + location = Column(String) name = Column(String) notifications = Column(String) - profile_background_color= Column(String) - profile_background_image_url= Column(String) - profile_background_tile= Column(Boolean) - profile_image_url= Column(String) - profile_link_color= Column(String) - profile_sidebar_border_color= Column(String) - profile_sidebar_fill_color= Column(String) - profile_text_color= Column(String) - profile_use_background_image= Column(Boolean) - protected= Column(Boolean) - screen_name= Column(String, index=True, unique=True) - show_all_inline_media= Column(Boolean) + profile_background_color = Column(String) + profile_background_image_url = Column(String) + profile_background_tile = Column(Boolean) + profile_image_url = Column(String) + profile_link_color = Column(String) + profile_sidebar_border_color = Column(String) + profile_sidebar_fill_color = Column(String) + profile_text_color = Column(String) + profile_use_background_image = Column(Boolean) + protected = Column(Boolean) + screen_name = Column(String, index=True, unique=True) + show_all_inline_media = Column(Boolean) statuses_count = Column(Integer) - time_zone= Column(String) - url= Column(String) + time_zone = Column(String) + url = Column(String) utc_offset = Column(Integer) - verified= Column(Boolean) + verified = Column(Boolean) def __init__(self, **kwargs): for key, value in kwargs.items(): - if hasattr(self,key): - setattr(self,key,value) + if hasattr(self, key): + setattr(self, key, value) class Hashtag(Base): __tablename__ = "tweet_hashtag" id = Column(Integer, primary_key=True) - text = Column(String, unique = True, index = True) + text = Column(String, unique=True, index=True) def __init__(self, **kwargs): for key, value in kwargs.items(): - if hasattr(self,key): - setattr(self,key,value) - - + if hasattr(self, key): + setattr(self, key, value) class Url(Base): __tablename__ = "tweet_url" @@ -183,8 +188,35 @@ expanded_url = Column(String) def __init__(self, **kwargs): for key, value in kwargs.items(): - if hasattr(self,key): - setattr(self,key,value) + if hasattr(self, key): + setattr(self, key, value) + +class MediaType(Base): + __tablename__ = "tweet_media_type" + id = Column(Integer, primary_key=True, autoincrement=True) + label = Column(String, unique=True, index=True) + def __init__(self, **kwargs): + for key, value in kwargs.items(): + if hasattr(self, key): + setattr(self, key, value) + + +class Media(Base): + __tablename__ = "tweet_media" + id = Column(BigInteger, primary_key=True, autoincrement=False) + id_str = Column(String, unique=True) + media_url = Column(String, unique=True) + media_url_https = Column(String, unique=True) + url = Column(String) + display_url = Column(String) + expanded_url = Column(String) + sizes = Column(String) + type_id = Column(Integer, ForeignKey("tweet_media_type.id")) + type = relationship(MediaType, primaryjoin=type_id == MediaType.id) + def __init__(self, **kwargs): + for key, value in kwargs.items(): + if hasattr(self, key): + setattr(self, key, value) @@ -197,8 +229,8 @@ def __init__(self, **kwargs): super(EntityHashtag, self).__init__(**kwargs) for key, value in kwargs.items(): - if hasattr(self,key): - setattr(self,key,value) + if hasattr(self, key): + setattr(self, key, value) class EntityUrl(Entity): @@ -210,22 +242,35 @@ def __init__(self, **kwargs): super(EntityUrl, self).__init__(**kwargs) for key, value in kwargs.items(): - if hasattr(self,key): - setattr(self,key,value) + if hasattr(self, key): + setattr(self, key, value) class EntityUser(Entity): __tablename__ = "tweet_entity_user" __mapper_args__ = {'polymorphic_identity': 'entity_user'} id = Column(Integer, ForeignKey('tweet_entity.id'), primary_key=True) - user_id = Column(Integer, ForeignKey('tweet_user.id')) - user = relationship(User, primaryjoin=user_id == User.id) + user_id = Column(BigInteger, ForeignKey('tweet_user.id')) + user = relationship(User, primaryjoin=(user_id == User.id)) def __init__(self, **kwargs): super(EntityUser, self).__init__(**kwargs) for key, value in kwargs.items(): - if hasattr(self,key): - setattr(self,key,value) + if hasattr(self, key): + setattr(self, key, value) +class EntityMedia(Entity): + __tablename__ = "tweet_entity_media" + __mapper_args__ = {'polymorphic_identity': 'entity_media'} + id = Column(Integer, ForeignKey('tweet_entity.id'), primary_key=True) + media_id = Column(BigInteger, ForeignKey('tweet_media.id')) + media = relationship(Media, primaryjoin=(media_id == Media.id)) + + def __init__(self, **kwargs): + super(EntityMedia, self).__init__(**kwargs) + for key, value in kwargs.items(): + if hasattr(self, key): + setattr(self, key, value) + def setup_database(*args, **kwargs): @@ -263,15 +308,15 @@ tweet_tweet = { 'contributors': None, - 'coordinates': None, - 'created_at': 'date', - 'entities': "tweet_entity", + 'coordinates': None, + 'created_at': 'date', + 'entities': "tweet_entity", 'favorited': "bool", 'geo': None, 'id': "long", 'id_str': "string", - 'in_reply_to_screen_name': "string", - 'in_reply_to_status_id': "long", + 'in_reply_to_screen_name': "string", + 'in_reply_to_status_id': "long", 'in_reply_to_status_id_str': "string", 'in_reply_to_user_id': "int", 'in_reply_to_user_id_str': "string", @@ -354,6 +399,6 @@ tweet_url = { "url": "string", - "expanded_url" : "string", + "expanded_url" : "string", } diff -r e9335ee3cf71 -r 2209e66bb50b script/lib/iri_tweet/tests.py --- a/script/lib/iri_tweet/tests.py Tue Aug 09 13:07:23 2011 +0200 +++ b/script/lib/iri_tweet/tests.py Fri Aug 12 18:17:27 2011 +0200 @@ -111,7 +111,8 @@ original_json = u'{"in_reply_to_user_id_str":null,"text":"RT @BieberEagle: \\"I love my haters. They spend so much time thinking about me. Aren\u2019t they sweet?\\" - Justin Bieber","contributors":null,"retweeted":false,"coordinates":null,"retweeted_status":{"in_reply_to_user_id_str":null,"text":"\\"I love my haters. They spend so much time thinking about me. Aren\u2019t they sweet?\\" - Justin Bieber","contributors":null,"retweeted":false,"coordinates":null,"retweet_count":"100+","source":"web","entities":{"user_mentions":[],"hashtags":[],"urls":[]},"truncated":false,"place":null,"id_str":"96638597737889792","in_reply_to_user_id":null,"in_reply_to_status_id":null,"favorited":false,"in_reply_to_status_id_str":null,"user":{"is_translator":false,"profile_background_tile":true,"profile_background_image_url_https":"https:\/\/si0.twimg.com\/profile_background_images\/298443445\/355584171.jpg","listed_count":5040,"friends_count":8477,"profile_link_color":"ff0000","profile_sidebar_border_color":"000000","url":"http:\/\/twitpic.com\/photos\/BieberEagle","profile_image_url_https":"https:\/\/si0.twimg.com\/profile_images\/1465491672\/355584171_normal.jpg","profile_image_url":"http:\/\/a2.twimg.com\/profile_images\/1465491672\/355584171_normal.jpg","description":"1 name, 1 inspiration, 1 hero, 1 smile, 1 singer, 1 boy who changed my life. B.\u0130.E.B.E.R-Believe In Everything Because Everything\'s Reachable. #NEVERSAYNEVER","default_profile":false,"notifications":null,"time_zone":"Paris","followers_count":14506,"default_profile_image":false,"lang":"en","profile_use_background_image":true,"screen_name":"BieberEagle","show_all_inline_media":false,"geo_enabled":false,"profile_background_color":"ffffff","location":"\u2665 Albania \u2665 ","id_str":"229067923","profile_background_image_url":"http:\/\/a2.twimg.com\/profile_background_images\/298443445\/355584171.jpg","favourites_count":89,"protected":false,"follow_request_sent":null,"following":null,"name":"truebelieber","statuses_count":24279,"verified":false,"created_at":"Tue Dec 21 12:35:18 +0000 2010","profile_text_color":"000000","id":229067923,"contributors_enabled":false,"utc_offset":3600,"profile_sidebar_fill_color":""},"id":96638597737889792,"created_at":"Thu Jul 28 17:50:11 +0000 2011","geo":null,"in_reply_to_screen_name":null},"retweet_count":"100+","source":"web","entities":{"user_mentions":[{"indices":[3,15],"screen_name":"BieberEagle","id_str":"229067923","name":"truebelieber","id":229067923}],"hashtags":[],"urls":[]},"truncated":false,"place":null,"id_str":"96965037637382145","in_reply_to_user_id":null,"in_reply_to_status_id":null,"favorited":false,"in_reply_to_status_id_str":null,"user":{"is_translator":false,"profile_background_tile":true,"profile_background_image_url_https":"https:\/\/si0.twimg.com\/profile_background_images\/300419382\/ipod.7.14_054.JPG","listed_count":3,"friends_count":1150,"profile_link_color":"00cccc","profile_sidebar_border_color":"c8ff00","url":"http:\/\/www.facebook.com\/blovedbecca180","profile_image_url_https":"https:\/\/si0.twimg.com\/profile_images\/1466752962\/block_party_7.27.11_015_normal.JPG","profile_image_url":"http:\/\/a3.twimg.com\/profile_images\/1466752962\/block_party_7.27.11_015_normal.JPG","description":"if ya wanna know something about me, then get to know me. \\n\\r\\n\\ri promise, you wont regret it. (:\\r\\ni love justin bieber with an extreme burning passion!","default_profile":false,"notifications":null,"time_zone":"Central Time (US & Canada)","followers_count":361,"default_profile_image":false,"lang":"en","profile_use_background_image":true,"screen_name":"beccaxannxx","show_all_inline_media":false,"geo_enabled":false,"profile_background_color":"ff0066","location":"","id_str":"65624607","profile_background_image_url":"http:\/\/a3.twimg.com\/profile_background_images\/300419382\/ipod.7.14_054.JPG","favourites_count":266,"protected":false,"follow_request_sent":null,"following":null,"name":"beccaxannxx","statuses_count":2512,"verified":false,"created_at":"Fri Aug 14 12:36:35 +0000 2009","profile_text_color":"6a39d4","id":65624607,"contributors_enabled":false,"utc_offset":-21600,"profile_sidebar_fill_color":"ff00bb"},"id":96965037637382145,"created_at":"Fri Jul 29 15:27:21 +0000 2011","geo":null,"in_reply_to_screen_name":null}' - +original_json_media = u'{"user": {"follow_request_sent": null, "profile_use_background_image": true, "id": 34311537, "verified": false, "profile_image_url_https": "https://si0.twimg.com/profile_images/1452959211/63440_1494505009634_1444332638_31074099_4058882_n_normal.jpg", "profile_sidebar_fill_color": "DAECF4", "is_translator": false, "geo_enabled": false, "profile_text_color": "663B12", "followers_count": 29, "profile_sidebar_border_color": "C6E2EE", "id_str": "34311537", "default_profile_image": false, "location": "", "utc_offset": -25200, "statuses_count": 813, "description": "i like joe jonas, justin bieber, ashley tisdale, selena gomez, megen fox, kim kardashian and demi lovoto and many more.", "friends_count": 101, "profile_link_color": "1F98C7", "profile_image_url": "http://a1.twimg.com/profile_images/1452959211/63440_1494505009634_1444332638_31074099_4058882_n_normal.jpg", "notifications": null, "show_all_inline_media": false, "profile_background_image_url_https": "https://si0.twimg.com/profile_background_images/298244445/tour2011.jpg", "profile_background_color": "C6E2EE", "profile_background_image_url": "http://a1.twimg.com/profile_background_images/298244445/tour2011.jpg", "name": "mikayla", "lang": "en", "profile_background_tile": true, "favourites_count": 231, "screen_name": "bieberfever17ya", "url": null, "created_at": "Wed Apr 22 16:04:28 +0000 2009", "contributors_enabled": false, "time_zone": "Mountain Time (US & Canada)", "protected": false, "default_profile": false, "following": null, "listed_count": 1}, "favorited": false, "entities": {"user_mentions": [], "media": [{"media_url_https": "https://p.twimg.com/AWea5Z-CQAAvfvK.jpg", "expanded_url": "http://twitter.com/bieberfever17ya/status/101219827649232896/photo/1", "sizes": {"small": {"h": 240, "w": 201, "resize": "fit"}, "large": {"h": 240, "w": 201, "resize": "fit"}, "medium": {"h": 240, "w": 201, "resize": "fit"}, "thumb": {"h": 150, "w": 150, "resize": "crop"}}, "url": "http://t.co/N7yZ8hS", "display_url": "pic.twitter.com/N7yZ8hS", "id_str": "101219827653427200", "indices": [31, 50], "type": "photo", "id": 101219827653427200, "media_url": "http://p.twimg.com/AWea5Z-CQAAvfvK.jpg"}], "hashtags": [], "urls": []}, "contributors": null, "truncated": false, "text": "i love you justin bieber <3 http://t.co/N7yZ8hS", "created_at": "Wed Aug 10 09:14:22 +0000 2011", "retweeted": false, "in_reply_to_status_id": null, "coordinates": null, "id": 101219827649232896, "source": "web", "in_reply_to_status_id_str": null, "place": null, "in_reply_to_user_id": null, "in_reply_to_screen_name": null, "retweet_count": 0, "geo": null, "in_reply_to_user_id_str": null, "possibly_sensitive": false, "id_str": "101219827649232896"}' +original_json_media_others = u'{"user": {"utc_offset": -25200, "statuses_count": 813, "default_profile_image": false, "friends_count": 101, "profile_background_image_url_https": "https://si0.twimg.com/profile_background_images/298244445/tour2011.jpg", "profile_use_background_image": true, "profile_sidebar_fill_color": "DAECF4", "profile_link_color": "1F98C7", "profile_image_url": "http://a1.twimg.com/profile_images/1452959211/63440_1494505009634_1444332638_31074099_4058882_n_normal.jpg", "time_zone": "Mountain Time (US & Canada)", "is_translator": false, "screen_name": "bieberfever17ya", "url": null, "show_all_inline_media": false, "geo_enabled": false, "profile_background_color": "C6E2EE", "id": 34311537, "profile_background_image_url": "http://a1.twimg.com/profile_background_images/298244445/tour2011.jpg", "description": "i like joe jonas, justin bieber, ashley tisdale, selena gomez, megen fox, kim kardashian and demi lovoto and many more.", "lang": "en", "profile_background_tile": true, "favourites_count": 231, "name": "mikayla", "notifications": null, "follow_request_sent": null, "created_at": "Wed Apr 22 16:04:28 +0000 2009", "verified": false, "contributors_enabled": false, "location": "", "profile_text_color": "663B12", "followers_count": 29, "profile_sidebar_border_color": "C6E2EE", "id_str": "34311537", "default_profile": false, "following": null, "protected": false, "profile_image_url_https": "https://si0.twimg.com/profile_images/1452959211/63440_1494505009634_1444332638_31074099_4058882_n_normal.jpg", "listed_count": 1}, "favorited": false, "contributors": null, "source": "web", "text": "i love you justin bieber <3 http://t.co/N7yZ8hS", "created_at": "Wed Aug 10 09:14:22 +0000 2011", "truncated": false, "retweeted": false, "in_reply_to_status_id_str": null, "coordinates": null, "in_reply_to_user_id_str": null, "entities": {"user_mentions": [], "media": [], "hashtags": [], "urls": [], "others": [{"url": "http://t.co/N7yZ8hS", "text": "comments", "indices": [31, 50]}]}, "in_reply_to_status_id": null, "in_reply_to_screen_name": null, "id_str": "101219827649232896", "place": null, "retweet_count": 0, "geo": null, "id": 101219827649232896, "possibly_sensitive": false, "in_reply_to_user_id": null}' class TestTwitterProcessor(unittest.TestCase): @@ -136,7 +137,46 @@ self.assertEqual(u"beccaxannxx",tweet.user.name) self.assertEqual(65624607,tweet.user.id) self.assertEqual(1,len(tweet.entity_list)) - self.assertEqual(u"BieberEagle", tweet.entity_list[0].user.screen_name) + entity = tweet.entity_list[0] + self.assertEqual(u"BieberEagle", entity.user.screen_name) + self.assertTrue(entity.user.created_at is None) + self.assertEqual("entity_user", entity.type) + self.assertEqual("user_mentions", entity.entity_type.label) + + + def testTwitterProcessorMedia(self): + tp = TwitterProcessor(None, original_json_media, None, self.session, self.tmpfilepath) + tp.process() + self.session.commit() + + self.assertEquals(1, self.session.query(models.TweetSource).count()) + self.assertEquals(1, self.session.query(models.Tweet).count()) + self.assertEquals(1, self.session.query(models.User).count()) + tweet = self.session.query(models.Tweet).first() + self.assertFalse(tweet.user is None) + self.assertEqual(u"mikayla",tweet.user.name) + self.assertEqual(34311537,tweet.user.id) + self.assertEqual(1,len(tweet.entity_list)) + entity = tweet.entity_list[0] + self.assertEqual(101219827653427200, entity.media.id) + self.assertEqual("photo", entity.media.type.label) + self.assertEqual("entity_media", entity.type) + self.assertEqual("media", entity.entity_type.label) + + + def testTwitterProcessorMediaOthers(self): + tp = TwitterProcessor(None, original_json_media_others, None, self.session, self.tmpfilepath) + tp.process() + self.session.commit() + + self.assertEquals(1, self.session.query(models.TweetSource).count()) + self.assertEquals(1, self.session.query(models.Tweet).count()) + tweet = self.session.query(models.Tweet).first() + self.assertEqual(1,len(tweet.entity_list)) + entity = tweet.entity_list[0] + self.assertEqual("entity_entity", entity.type) + self.assertEqual("others", entity.entity_type.label) + def tearDown(self): diff -r e9335ee3cf71 -r 2209e66bb50b script/lib/iri_tweet/utils.py --- a/script/lib/iri_tweet/utils.py Tue Aug 09 13:07:23 2011 +0200 +++ b/script/lib/iri_tweet/utils.py Fri Aug 12 18:17:27 2011 +0200 @@ -1,6 +1,7 @@ from models import (Tweet, User, Hashtag, EntityHashtag, EntityUser, Url, EntityUrl, CONSUMER_KEY, CONSUMER_SECRET, APPLICATION_NAME, ACCESS_TOKEN_KEY, - ACCESS_TOKEN_SECRET, adapt_date, adapt_json, TweetSource, TweetLog) + ACCESS_TOKEN_SECRET, adapt_date, adapt_json, TweetSource, TweetLog, MediaType, + Media, EntityMedia, Entity, EntityType) from sqlalchemy.sql import select, or_ #@UnresolvedImport import anyjson #@UnresolvedImport import datetime @@ -16,24 +17,40 @@ CACHE_ACCESS_TOKEN = {} -def get_oauth_token(token_file_path=None, application_name=APPLICATION_NAME, consumer_key=CONSUMER_KEY, consumer_secret=CONSUMER_SECRET): +def get_oauth_token(token_file_path=None, check_access_token=True, application_name=APPLICATION_NAME, consumer_key=CONSUMER_KEY, consumer_secret=CONSUMER_SECRET): global CACHE_ACCESS_TOKEN - - if CACHE_ACCESS_TOKEN is not None and application_name in CACHE_ACCESS_TOKEN: - return CACHE_ACCESS_TOKEN[application_name] - - if token_file_path and os.path.exists(token_file_path): - logging.debug("reading token from file %s" % token_file_path) #@UndefinedVariable - CACHE_ACCESS_TOKEN[application_name] = twitter.oauth.read_token_file(token_file_path) - return CACHE_ACCESS_TOKEN[application_name] - #read access token info from path - - if 'ACCESS_TOKEN_KEY' in dict() and 'ACCESS_TOKEN_SECRET' in dict() and ACCESS_TOKEN_KEY and ACCESS_TOKEN_SECRET: + + if 'ACCESS_TOKEN_KEY' in globals() and 'ACCESS_TOKEN_SECRET' in globals() and ACCESS_TOKEN_KEY and ACCESS_TOKEN_SECRET: return ACCESS_TOKEN_KEY,ACCESS_TOKEN_SECRET - CACHE_ACCESS_TOKEN[application_name] = twitter.oauth_dance.oauth_dance(application_name, consumer_key, consumer_secret, token_file_path) - return CACHE_ACCESS_TOKEN[application_name] + res = CACHE_ACCESS_TOKEN.get(application_name, None) + + if res is None and token_file_path and os.path.exists(token_file_path): + get_logger().debug("get_oauth_token : reading token from file %s" % token_file_path) #@UndefinedVariable + res = twitter.oauth.read_token_file(token_file_path) + + if res is not None and check_access_token: + get_logger().debug("get_oauth_token : Check oauth tokens") #@UndefinedVariable + t = twitter.Twitter(auth=twitter.OAuth(res[0], res[1], CONSUMER_KEY, CONSUMER_SECRET)) + status = None + try: + status = t.account.rate_limit_status() + except Exception as e: + get_logger().debug("get_oauth_token : error getting rate limit status %s" % repr(e)) + status = None + get_logger().debug("get_oauth_token : Check oauth tokens : status %s" % repr(status)) #@UndefinedVariable + if status is None or status['remaining_hits'] == 0: + get_logger().debug("get_oauth_token : Problem with status %s" % repr(status)) + res = None + + if res is None: + get_logger().debug("get_oauth_token : doing the oauth dance") + res = twitter.oauth_dance.oauth_dance(application_name, consumer_key, consumer_secret, token_file_path) + + CACHE_ACCESS_TOKEN[application_name] = res + + return res def parse_date(date_str): ts = email.utils.parsedate_tz(date_str) #@UndefinedVariable @@ -54,6 +71,13 @@ "user": { "created_at" : adapt_date, }, + + }, + + 'entities' : { + "medias": { + "sizes" : adapt_json, + }, }, 'rest': { "tweet" : { @@ -89,7 +113,12 @@ new_args = [arg() if callable(arg) else arg for arg in self.args] if self.args is not None else [] new_kwargs = dict([(k,v()) if callable(v) else (k,v) for k,v in self.kwargs.items()]) if self.kwargs is not None else {} - self.instance = self.klass(*new_args, **new_kwargs) + if self.instance is None: + self.instance = self.klass(*new_args, **new_kwargs) + else: + self.instance = self.klass(*new_args, **new_kwargs) + self.instance = session.merge(self.instance) + session.add(self.instance) if self.must_flush: session.flush() @@ -104,40 +133,45 @@ def __init__(self): self.__bufferlist = [] + self.__bufferdict = {} + + def __add_proxy_object(self, proxy): + proxy_list = self.__bufferdict.get(proxy.klass, None) + if proxy_list is None: + proxy_list = [] + self.__bufferdict[proxy.klass] = proxy_list + proxy_list.append(proxy) + self.__bufferlist.append(proxy) def persists(self, session): for object_proxy in self.__bufferlist: object_proxy.persists(session) - - def add_object(self, klass, args, kwargs, must_flush): - new_proxy = ObjectBufferProxy(klass, args, kwargs, must_flush) - self.__bufferlist.append(new_proxy) + + def add_object(self, klass, args, kwargs, must_flush, instance=None): + new_proxy = ObjectBufferProxy(klass, args, kwargs, must_flush, instance) + self.__add_proxy_object(new_proxy) return new_proxy def get(self, klass, **kwargs): - for proxy in self.__bufferlist: - if proxy.kwargs is None or len(proxy.kwargs) == 0 or proxy.klass != klass: - continue - found = True - for k,v in kwargs.items(): - if (k not in proxy.kwargs) or v != proxy.kwargs[k]: - found = False - break - if found: - return proxy - + if klass in self.__bufferdict: + for proxy in self.__bufferdict[klass]: + if proxy.kwargs is None or len(proxy.kwargs) == 0 or proxy.klass != klass: + continue + found = True + for k,v in kwargs.items(): + if (k not in proxy.kwargs) or v != proxy.kwargs[k]: + found = False + break + if found: + return proxy return None - - - - class TwitterProcessorException(Exception): pass class TwitterProcessor(object): - def __init__(self, json_dict, json_txt, source_id, session, token_filename=None): + def __init__(self, json_dict, json_txt, source_id, session, access_token=None, token_filename=None): if json_dict is None and json_txt is None: raise TwitterProcessorException("No json") @@ -158,11 +192,13 @@ self.source_id = source_id self.session = session self.token_filename = token_filename + self.access_token = access_token self.obj_buffer = ObjectsBuffer() + - def __get_user(self, user_dict): - logger.debug("Get user : " + repr(user_dict)) #@UndefinedVariable + def __get_user(self, user_dict, do_merge, query_twitter = False): + get_logger().debug("Get user : " + repr(user_dict)) #@UndefinedVariable user_id = user_dict.get("id",None) user_name = user_dict.get("screen_name", user_dict.get("name", None)) @@ -192,8 +228,12 @@ user_created_at = user_dict.get("created_at", None) - if user_created_at is None: - acess_token_key, access_token_secret = get_oauth_token(self.token_filename) + if user_created_at is None and query_twitter: + + if self.access_token is not None: + acess_token_key, access_token_secret = self.access_token + else: + acess_token_key, access_token_secret = get_oauth_token(self.token_filename) t = twitter.Twitter(auth=twitter.OAuth(acess_token_key, access_token_secret, CONSUMER_KEY, CONSUMER_SECRET)) try: if user_id: @@ -201,8 +241,8 @@ else: user_dict = t.users.show(screen_name=user_name) except Exception as e: - logger.info("get_user : TWITTER ERROR : " + repr(e)) #@UndefinedVariable - logger.info("get_user : TWITTER ERROR : " + str(e)) #@UndefinedVariable + get_logger().info("get_user : TWITTER ERROR : " + repr(e)) #@UndefinedVariable + get_logger().info("get_user : TWITTER ERROR : " + str(e)) #@UndefinedVariable return None user_dict = adapt_fields(user_dict, fields_adapter["stream"]["user"]) @@ -210,45 +250,79 @@ return None #TODO filter get, wrap in proxy - user = self.session.query(User).filter(User.id == user_dict["id"]).first() + user_obj = self.session.query(User).filter(User.id == user_dict["id"]).first() - if user is not None: - return user + if user_obj is not None: + if not do_merge: + return ObjectBufferProxy(User, None, None, False, user_obj) user = self.obj_buffer.add_object(User, None, user_dict, True) return user + def __get_or_create_object(self, klass, filter_by_kwargs, filter, creation_kwargs, must_flush, do_merge): + + obj_proxy = self.obj_buffer.get(klass, **filter_by_kwargs) + if obj_proxy is None: + query = self.session.query(klass) + if filter is not None: + query = query.filter(filter) + else: + query = query.filter_by(**filter_by_kwargs) + obj_instance = query.first() + if obj_instance is not None: + if not do_merge: + obj_proxy = ObjectBufferProxy(klass, None, None, False, obj_instance) + else: + obj_proxy = self.obj_buffer.add_object(klass, None, creation_kwargs, must_flush, obj_instance) + if obj_proxy is None: + obj_proxy = self.obj_buffer.add_object(klass, None, creation_kwargs, must_flush) + return obj_proxy + def __process_entity(self, ind, ind_type): - logger.debug("Process_entity : " + repr(ind) + " : " + repr(ind_type)) #@UndefinedVariable + get_logger().debug("Process_entity : " + repr(ind) + " : " + repr(ind_type)) #@UndefinedVariable ind = clean_keys(ind) + entity_type = self.__get_or_create_object(EntityType, {'label':ind_type}, None, {'label':ind_type}, True, False) + entity_dict = { - "indice_start": ind["indices"][0], - "indice_end" : ind["indices"][1], - "tweet_id" : self.tweet.id, + "indice_start" : ind["indices"][0], + "indice_end" : ind["indices"][1], + "tweet_id" : self.tweet.id, + "entity_type_id" : entity_type.id, + "source" : adapt_json(ind) } - + + def process_medias(): + + media_id = ind.get('id', None) + if media_id is None: + return None, None + + type_str = ind.get("type", "photo") + media_type = self.__get_or_create_object(MediaType, {'label': type_str}, None, {'label':type_str}, True, False) + media_ind = adapt_fields(ind, fields_adapter["entities"]["medias"]) + if "type" in media_ind: + del(media_ind["type"]) + media_ind['type_id'] = media_type.id + media = self.__get_or_create_object(Media, {'id':media_id}, None, media_ind, True, False) + + entity_dict['media_id'] = media.id + return EntityMedia, entity_dict + def process_hashtags(): text = ind.get("text", ind.get("hashtag", None)) if text is None: - return None - hashtag = self.obj_buffer.get(Hashtag, text=text) - if hashtag is None: - hashtag_obj = self.session.query(Hashtag).filter(Hashtag.text.ilike(text)).first() - if hashtag_obj is not None: - hashtag = ObjectBufferProxy(Hashtag, None, None, False, hashtag_obj) - - if hashtag is None: - ind["text"] = text - hashtag = self.obj_buffer.add_object(Hashtag, None, ind, True) + return None, None + ind['text'] = text + hashtag = self.__get_or_create_object(Hashtag, {'text':text}, Hashtag.text.ilike(text), ind, True, False) entity_dict['hashtag_id'] = hashtag.id return EntityHashtag, entity_dict def process_user_mentions(): - user_mention = self.__get_user(ind) + user_mention = self.__get_user(ind, False, False) if user_mention is None: entity_dict['user_id'] = None else: @@ -256,24 +330,19 @@ return EntityUser, entity_dict def process_urls(): - url = self.obj_buffer.get(Url, url=ind["url"]) - if url is None: - url_obj = self.session.query(Url).filter(Url.url == ind["url"]).first() - if url_obj is not None: - url = ObjectBufferProxy(Url, None, None, False, url_obj) - if url is None: - url = self.obj_buffer.add_object(Url, None, ind, True) + url = self.__get_or_create_object(Url, {'url':ind["url"]}, None, ind, True, False) entity_dict['url_id'] = url.id return EntityUrl, entity_dict - + #{'': lambda } entity_klass, entity_dict = { 'hashtags': process_hashtags, 'user_mentions' : process_user_mentions, - 'urls' : process_urls - }[ind_type]() + 'urls' : process_urls, + 'media': process_medias, + }.get(ind_type, lambda: (Entity, entity_dict))() - logger.debug("Process_entity entity_dict: " + repr(entity_dict)) #@UndefinedVariable + get_logger().debug("Process_entity entity_dict: " + repr(entity_dict)) #@UndefinedVariable if entity_klass: self.obj_buffer.add_object(entity_klass, None, entity_dict, False) @@ -287,9 +356,9 @@ ts_copy = adapt_fields(self.json_dict, fields_adapter["stream"]["tweet"]) # get or create user - user = self.__get_user(self.json_dict["user"]) + user = self.__get_user(self.json_dict["user"], True) if user is None: - logger.warning("USER not found " + repr(self.json_dict["user"])) #@UndefinedVariable + get_logger().warning("USER not found " + repr(self.json_dict["user"])) #@UndefinedVariable ts_copy["user_id"] = None else: ts_copy["user_id"] = user.id @@ -299,25 +368,26 @@ self.tweet = self.obj_buffer.add_object(Tweet, None, ts_copy, True) - # get entities + self.__process_entities() + + + def __process_entities(self): if "entities" in self.json_dict: for ind_type, entity_list in self.json_dict["entities"].items(): for ind in entity_list: self.__process_entity(ind, ind_type) else: - extractor = twitter_text.Extractor(self.tweet.text) - + + text = self.tweet.text + extractor = twitter_text.Extractor(text) for ind in extractor.extract_hashtags_with_indices(): self.__process_entity(ind, "hashtags") - + + for ind in extractor.extract_urls_with_indices(): + self.__process_entity(ind, "urls") + for ind in extractor.extract_mentioned_screen_names_with_indices(): self.__process_entity(ind, "user_mentions") - - for ind in extractor.extract_urls_with_indices(): - self.__process_entity(ind, "urls") - - self.session.flush() - def __process_twitter_rest(self): tweet_nb = self.session.query(Tweet).filter(Tweet.id == self.json_dict["id"]).count() @@ -348,28 +418,17 @@ 'screen_name' : self.json_dict["from_user"], } - user = self.__get_user(user_fields) + user = self.__get_user(user_fields, do_merge=False) if user is None: - logger.warning("USER not found " + repr(user_fields)) #@UndefinedVariable + get_logger().warning("USER not found " + repr(user_fields)) #@UndefinedVariable tweet_fields["user_id"] = None else: tweet_fields["user_id"] = user.id tweet_fields = adapt_fields(tweet_fields, fields_adapter["rest"]["tweet"]) self.tweet = self.obj_buffer.add_object(Tweet, None, tweet_fields, True) - - text = self.tweet.text - - extractor = twitter_text.Extractor(text) - - for ind in extractor.extract_hashtags_with_indices(): - self.__process_entity(ind, "hashtags") - - for ind in extractor.extract_urls_with_indices(): - self.__process_entity(ind, "urls") - - for ind in extractor.extract_mentioned_screen_names_with_indices(): - self.__process_entity(ind, "user_mentions") + + self.__process_entities() @@ -384,7 +443,7 @@ else: self.__process_twitter_stream() - self.obj_buffer.add_object(TweetLog, None, {'tweet_source_id':self.source_id, 'status':TweetLog.TWEET_STATUS['OK']}, False) + self.obj_buffer.add_object(TweetLog, None, {'tweet_source_id':self.source_id, 'status':TweetLog.TWEET_STATUS['OK']}, True) self.obj_buffer.persists(self.session) @@ -405,7 +464,7 @@ logger = plogger if logger is None: - logger = logging.getLogger() #@UndefinedVariable + logger = get_logger() #@UndefinedVariable if len(logger.handlers) == 0: filename = logging_config.get("filename") @@ -477,4 +536,5 @@ return query.distinct() -logger = logging.getLogger() #@UndefinedVariable +def get_logger(): + return logging.getLogger("iri_tweet") #@UndefinedVariable diff -r e9335ee3cf71 -r 2209e66bb50b script/lib/tweetstream/tweetstream/streamclasses.py --- a/script/lib/tweetstream/tweetstream/streamclasses.py Tue Aug 09 13:07:23 2011 +0200 +++ b/script/lib/tweetstream/tweetstream/streamclasses.py Fri Aug 12 18:17:27 2011 +0200 @@ -2,8 +2,8 @@ import anyjson import socket import time -import urllib -import urllib2 +import urllib #@UnresolvedImport +import urllib2 #@UnresolvedImport socket._fileobject.default_bufsize = 0 diff -r e9335ee3cf71 -r 2209e66bb50b script/stream/recorder_tweetstream.py --- a/script/stream/recorder_tweetstream.py Tue Aug 09 13:07:23 2011 +0200 +++ b/script/stream/recorder_tweetstream.py Fri Aug 12 18:17:27 2011 +0200 @@ -3,22 +3,25 @@ from iri_tweet.models import TweetSource, TweetLog from multiprocessing import Queue, JoinableQueue, Process, Event, get_logger from optparse import OptionParser -from sqlalchemy.orm import sessionmaker +from sqlalchemy.exc import OperationalError +from sqlalchemy.orm import scoped_session, sessionmaker import StringIO -import logging import anyjson import datetime +import logging import os +import re import shutil import signal import socket +import sqlalchemy.schema import sys import time import traceback import tweepy.auth import tweetstream -from iri_tweet.utils import logger -from sqlalchemy.exc import OperationalError +import urllib2 +#from iri_tweet.utils import get_logger socket._fileobject.default_bufsize = 0 @@ -30,6 +33,26 @@ #just put it in a sqlite3 tqble +def set_logging(options): + utils.set_logging(options, logging.getLogger('iri_tweet')) + utils.set_logging(options, logging.getLogger('multiprocessing')) + if options.debug >= 2: + utils.set_logging(options, logging.getLogger('sqlalchemy.engine')) + #utils.set_logging(options, logging.getLogger('sqlalchemy.dialects')) + #utils.set_logging(options, logging.getLogger('sqlalchemy.pool')) + #utils.set_logging(options, logging.getLogger('sqlalchemy.orm')) + +def get_auth(options, access_token): + if options.username and options.password: + auth = tweepy.auth.BasicAuthHandler(options.username, options.password) + else: + consumer_key = models.CONSUMER_KEY + consumer_secret = models.CONSUMER_SECRET + auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=False) + auth.set_access_token(*access_token) + return auth + + class ReconnectingTweetStream(tweetstream.FilterStream): """TweetStream class that automatically tries to reconnect if the connecting goes down. Reconnecting, and waiting for reconnecting, is @@ -62,9 +85,10 @@ def next(self): while True: try: + utils.get_logger().debug("return super.next") return super(ReconnectingTweetStream, self).next() except tweetstream.ConnectionError, e: - logging.debug("connection error :" + str(e)) + utils.get_logger().debug("connection error :" + str(e)) self._reconnects += 1 if self._reconnects > self.max_reconnects: raise tweetstream.ConnectionError("Too many retries") @@ -80,38 +104,43 @@ + class SourceProcess(Process): - def __init__(self, session_maker, queue, auth, track, debug, reconnects, token_filename, stop_event): + def __init__(self, session_maker, queue, options, access_token, stop_event): self.session_maker = session_maker self.queue = queue - self.auth = auth - self.track = track - self.debug = debug - self.reconnects = reconnects - self.token_filename = token_filename + self.track = options.track + self.reconnects = options.reconnects + self.token_filename = options.token_filename self.stop_event = stop_event + self.options = options + self.access_token = access_token super(SourceProcess, self).__init__() -# self.stop_event = def run(self): + #import pydevd + #pydevd.settrace(suspend=False) + + set_logging(self.options) + self.auth = get_auth(self.options, self.access_token) - get_logger().debug("SourceProcess : run") + utils.get_logger().debug("SourceProcess : run") track_list = self.track # or raw_input('Keywords to track (comma seperated): ').strip() track_list = [k for k in track_list.split(',')] - get_logger().debug("SourceProcess : before connecting to stream " + repr(track_list)) + utils.get_logger().debug("SourceProcess : before connecting to stream " + repr(track_list)) stream = ReconnectingTweetStream(self.auth, track_list, reconnects=self.reconnects, as_text=True) - get_logger().debug("SourceProcess : after connecting to stream") + utils.get_logger().debug("SourceProcess : after connecting to stream") stream.muststop = lambda: self.stop_event.is_set() session = self.session_maker() try: for tweet in stream: - get_logger().debug("tweet " + repr(tweet)) + utils.get_logger().debug("tweet " + repr(tweet)) source = TweetSource(original_json=tweet) - get_logger().debug("source created") + utils.get_logger().debug("source created") add_retries = 0 while add_retries < 10: try: @@ -121,21 +150,18 @@ break except OperationalError as e: session.rollback() - get_logger().debug("Operational Error %s nb %d" % (repr(e), add_retries)) - if add_retries==10: + utils.get_logger().debug("Operational Error %s nb %d" % (repr(e), add_retries)) + if add_retries == 10: raise e source_id = source.id - get_logger().debug("before queue + source id " + repr(source_id)) - self.queue.put((source_id, tweet), False) - #process_tweet(tweet, source_id, self.session, self.debug, self.token_filename) - get_logger().info("Tweet count: %d - current rate : %.2f - running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime))) + utils.get_logger().debug("before queue + source id " + repr(source_id)) + utils.get_logger().info("Tweet count: %d - current rate : %.2f - running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime))) session.commit() -# if duration >= 0 and datetime.datetime.utcnow() >= end_ts: -# print "Stop recording after %d seconds." % (duration) -# break + self.queue.put((source_id, tweet), False) + except Exception as e: - get_logger().error("Error when processing tweet " + repr(e)) + utils.get_logger().error("Error when processing tweet " + repr(e)) finally: session.rollback() stream.close() @@ -144,19 +170,19 @@ self.stop_event.set() -def process_tweet(tweet, source_id, session, token_filename): +def process_tweet(tweet, source_id, session, access_token): try: tweet_obj = anyjson.deserialize(tweet) screen_name = "" if 'user' in tweet_obj and 'screen_name' in tweet_obj['user']: screen_name = tweet_obj['user']['screen_name'] - get_logger().info(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text'])) - get_logger().debug(u"Process_tweet :" + repr(tweet)) - processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, token_filename) + utils.get_logger().debug(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text'])) + utils.get_logger().debug(u"Process_tweet :" + repr(tweet)) + processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, access_token, None) processor.process() except Exception as e: message = u"Error %s processing tweet %s" % (repr(e), tweet) - get_logger().error(message) + utils.get_logger().error(message) output = StringIO.StringIO() traceback.print_exception(Exception, e, None, None, output) error_stack = output.getvalue() @@ -170,42 +196,49 @@ class TweetProcess(Process): - def __init__(self, session_maker, queue, debug, token_filename, stop_event): + def __init__(self, session_maker, queue, options, access_token, stop_event): self.session_maker = session_maker self.queue = queue - self.debug = debug self.stop_event = stop_event - self.token_filename = token_filename + self.options = options + self.access_token = access_token super(TweetProcess, self).__init__() def run(self): + set_logging(self.options) session = self.session_maker() try: while not self.stop_event.is_set(): try: - source_id, tweet_txt = queue.get(True, 10) - get_logger().debug("Processing source id " + repr(source_id)) + source_id, tweet_txt = queue.get(True, 3) + utils.get_logger().debug("Processing source id " + repr(source_id)) except Exception as e: - get_logger().debug('Process tweet exception in loop : ' + repr(e)) + utils.get_logger().debug('Process tweet exception in loop : ' + repr(e)) continue - process_tweet(tweet_txt, source_id, session, self.token_filename) + process_tweet(tweet_txt, source_id, session, self.access_token) session.commit() - except: - raise finally: session.rollback() self.stop_event.set() session.close() + + +def get_sessionmaker(conn_str): + engine, metadata = models.setup_database(conn_str, echo=False, create_all=False) + Session = scoped_session(sessionmaker(bind=engine, autocommit=False)) + return Session, engine, metadata + -def process_leftovers(session, token_filename): +def process_leftovers(session, access_token): sources = session.query(TweetSource).outerjoin(TweetLog).filter(TweetLog.id == None) for src in sources: tweet_txt = src.original_json - process_tweet(tweet_txt, src.id, session, token_filename) + process_tweet(tweet_txt, src.id, session, access_token) + session.commit() @@ -215,8 +248,8 @@ def get_options(): parser = OptionParser() - parser.add_option("-f", "--file", dest="filename", - help="write tweet to FILE", metavar="FILE", default="enmi2010_twitter.db") + parser.add_option("-f", "--file", dest="conn_str", + help="write tweet to DATABASE. This is a connection string", metavar="CONNECTION_STR", default="enmi2010_twitter.db") parser.add_option("-u", "--user", dest="username", help="Twitter user", metavar="USER", default=None) parser.add_option("-w", "--password", dest="password", @@ -231,8 +264,8 @@ help="Token file name") parser.add_option("-d", "--duration", dest="duration", help="Duration of recording in seconds", metavar="DURATION", default= -1, type='int') - parser.add_option("-N", "--consumer", dest="consumer_nb", - help="number of consumer", metavar="CONSUMER", default=1, type='int') + parser.add_option("-N", "--nb-process", dest="process_nb", + help="number of process.\nIf 0, only the lefovers of the database are processed.\nIf 1, no postprocessing is done on the tweets.", metavar="PROCESS_NB", default=2, type='int') @@ -245,53 +278,81 @@ (options, args) = get_options() - utils.set_logging(options, get_logger()) + set_logging(options) if options.debug: print "OPTIONS : " print repr(options) - if options.new and os.path.exists(options.filename): - i = 1 - basename, extension = os.path.splitext(options.filename) - new_path = '%s.%d%s' % (basename, i, extension) - while i < 1000000 and os.path.exists(new_path): - i += 1 + + conn_str = options.conn_str.strip() + if not re.match("^\w+://.+", conn_str): + conn_str = 'sqlite://' + options.conn_str + + if conn_str.startswith("sqlite") and options.new: + filepath = conn_str[conn_str.find("://"):] + if os.path.exists(filepath): + i = 1 + basename, extension = os.path.splitext(filepath) new_path = '%s.%d%s' % (basename, i, extension) - if i >= 1000000: - raise Exception("Unable to find new filename for " + options.filename) - else: - shutil.move(options.filename, new_path) + while i < 1000000 and os.path.exists(new_path): + i += 1 + new_path = '%s.%d%s' % (basename, i, extension) + if i >= 1000000: + raise Exception("Unable to find new filename for " + filepath) + else: + shutil.move(filepath, new_path) + Session, engine, metadata = get_sessionmaker(conn_str) + if options.new: + check_metadata = sqlalchemy.schema.MetaData(bind=engine, reflect=True) + if len(check_metadata.sorted_tables) > 0: + message = "Database %s not empty exiting" % conn_str + utils.get_logger().error(message) + sys.exit(message) + + metadata.create_all(engine) + + access_token = None + if not options.username or not options.password: + access_token = utils.get_oauth_token(options.token_filename) + + session = Session() + try: + process_leftovers(session, access_token) + session.commit() + finally: + session.rollback() + session.close() + + if options.process_nb <= 0: + utils.get_logger().debug("Leftovers processed. Exiting.") + sys.exit() + queue = JoinableQueue() stop_event = Event() - - if options.username and options.password: - auth = tweepy.auth.BasicAuthHandler(options.username, options.password) - else: - consumer_key = models.CONSUMER_KEY - consumer_secret = models.CONSUMER_SECRET - auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=False) - auth.set_access_token(*(utils.get_oauth_token(options.token_filename))) - - - engine, metadata = models.setup_database('sqlite:///' + options.filename, echo=(options.debug >= 2)) - Session = sessionmaker(bind=engine) - session = Session() - process_leftovers(session, options.token_filename) - session.commit() - session.close() - - sprocess = SourceProcess(Session, queue, auth, options.track, options.debug, options.reconnects, options.token_filename, stop_event) + #workaround for bug on using urllib2 and multiprocessing + req = urllib2.Request('http://localhost') + conn = None + try: + conn = urllib2.urlopen(req) + except: + pass + #donothing + finally: + if conn is not None: + conn.close() + + + sprocess = SourceProcess(Session, queue, options, access_token, stop_event) tweet_processes = [] - for i in range(options.consumer_nb): - engine, metadata = models.setup_database('sqlite:///' + options.filename, echo=(options.debug >= 2)) - Session = sessionmaker(bind=engine) - cprocess = TweetProcess(Session, queue, options.debug, options.token_filename, stop_event) + for i in range(options.process_nb - 1): + Session, engine, metadata = get_sessionmaker(conn_str) + cprocess = TweetProcess(Session, queue, options, access_token, stop_event) tweet_processes.append(cprocess) def interupt_handler(signum, frame): @@ -311,23 +372,36 @@ stop_event.set() break if sprocess.is_alive(): - time.sleep(0.1) + time.sleep(1) else: + stop_event.set() break - get_logger().debug("Joining Source Process") - sprocess.join() - get_logger().debug("Joining Queue") + utils.get_logger().debug("Joining Source Process") + try: + sprocess.join(10) + except: + utils.get_logger().debug("Pb joining Source Process - terminating") + sprocess.terminate() + + utils.get_logger().debug("Joining Queue") #queue.join() - for i,cprocess in enumerate(tweet_processes): - get_logger().debug("Joining consumer process Nb %d" % (i+1)) - cprocess.join() + for i, cprocess in enumerate(tweet_processes): + utils.get_logger().debug("Joining consumer process Nb %d" % (i + 1)) + try: + cprocess.join(3) + except: + utils.get_logger().debug("Pb joining consumer process Nb %d - terminating" % (i + 1)) + cprocess.terminate() - get_logger().debug("Processing leftovers") + utils.get_logger().debug("Processing leftovers") session = Session() - process_leftovers(session, options.token_filename) - session.commit() - session.close() + try: + process_leftovers(session, access_token) + session.commit() + finally: + session.rollback() + session.close() - get_logger().debug("Done. Exiting.") - \ No newline at end of file + utils.get_logger().debug("Done. Exiting.") +