--- a/.hgignore Tue Aug 09 13:07:23 2011 +0200
+++ b/.hgignore Fri Aug 12 18:17:27 2011 +0200
@@ -33,3 +33,6 @@
syntax: regexp
^web/CPV/config\.php$
+
+syntax: regexp
+^script/virtualenv/venv2$
\ No newline at end of file
--- a/script/lib/iri_tweet/models.py Tue Aug 09 13:07:23 2011 +0200
+++ b/script/lib/iri_tweet/models.py Fri Aug 12 18:17:27 2011 +0200
@@ -27,31 +27,38 @@
else:
return anyjson.serialize(obj)
+class EntityType(Base):
+ __tablename__ = "tweet_entity_type"
+ id = Column(Integer, primary_key=True, autoincrement=True)
+ label = Column(String)
+
class Entity(Base):
__tablename__ = "tweet_entity"
- id = Column(Integer, primary_key = True)
+ id = Column(Integer, primary_key=True)
tweet_id = Column(BigInteger, ForeignKey('tweet_tweet.id'))
- #tweet = relationship(Tweet, primaryjoin = tweet_id == Tweet.id)
type = Column(String)
+ entity_type_id = Column(Integer, ForeignKey('tweet_entity_type.id'), nullable=False)
+ entity_type = relationship("EntityType", backref="entities")
indice_start = Column(Integer)
indice_end = Column(Integer)
- __mapper_args__ = {'polymorphic_on': type}
+ source = Column(String)
+ __mapper_args__ = {'polymorphic_on': type, 'polymorphic_identity': 'entity_entity', 'with_polymorphic':'*'}
def __init__(self, **kwargs):
for key, value in kwargs.items():
- if hasattr(self,key):
- setattr(self,key,value)
+ if hasattr(self, key):
+ setattr(self, key, value)
class TweetSource(Base):
__tablename__ = 'tweet_tweet_source'
- id = Column(Integer, primary_key = True, autoincrement=True)
+ id = Column(Integer, primary_key=True, autoincrement=True)
original_json = Column(String)
- received_at = Column(DateTime, default=datetime.datetime.now())
+    received_at = Column(DateTime, default=datetime.datetime.now, index=True)
def __init__(self, **kwargs):
for key, value in kwargs.items():
- if hasattr(self,key):
- setattr(self,key,value)
+ if hasattr(self, key):
+ setattr(self, key, value)
class TweetLog(Base):
@@ -62,7 +69,7 @@
}
__tablename__ = 'tweet_tweet_log'
- id = Column(Integer, primary_key = True, autoincrement=True)
+ id = Column(Integer, primary_key=True, autoincrement=True)
tweet_source_id = Column(Integer, ForeignKey('tweet_tweet_source.id'))
tweet_source = relationship("TweetSource", backref="logs")
status = Column(Integer)
@@ -76,17 +83,17 @@
id = Column(BigInteger, primary_key=True, autoincrement=False)
id_str = Column(String)
contributors = Column(String)
- coordinates = Column(String)
+ coordinates = Column(String)
created_at = Column(DateTime)
favorited = Column(Boolean)
geo = Column(String)
in_reply_to_screen_name = Column(String)
in_reply_to_status_id = Column(BigInteger)
in_reply_to_status_id_str = Column(String)
- in_reply_to_user_id = Column(Integer)
+ in_reply_to_user_id = Column(BigInteger)
in_reply_to_user_id_str = Column(String)
place = Column(String)
- retweet_count = Column(Integer)
+ retweet_count = Column(String)
retweeted = Column(Boolean)
source = Column(String)
text = Column(String)
@@ -96,17 +103,17 @@
tweet_source_id = Column(Integer, ForeignKey('tweet_tweet_source.id'))
tweet_source = relationship("TweetSource", backref="tweet")
entity_list = relationship(Entity, backref='tweet')
- received_at = Column(DateTime, default=datetime.datetime.now())
+    received_at = Column(DateTime, default=datetime.datetime.now, index=True)
def __init__(self, **kwargs):
for key, value in kwargs.items():
- if hasattr(self,key):
- setattr(self,key,value)
+ if hasattr(self, key):
+ setattr(self, key, value)
class UserMessage(Base):
__tablename__ = "tweet_user_message"
- id = Column(Integer, primary_key = True)
+ id = Column(Integer, primary_key=True)
user_id = Column(Integer, ForeignKey('tweet_user.id'))
user = relationship("User", backref="messages")
created_at = Column(DateTime, default=datetime.datetime.now())
@@ -115,7 +122,7 @@
class Message(Base):
__tablename__ = "tweet_message"
- id = Column(Integer, primary_key = True)
+ id = Column(Integer, primary_key=True)
created_at = Column(DateTime, default=datetime.datetime.now())
text = Column(String)
users = relationship(UserMessage, backref='message')
@@ -124,57 +131,55 @@
class User(Base):
__tablename__ = "tweet_user"
- id = Column(Integer, primary_key = True, autoincrement=False)
- id_str= Column(String)
- contributors_enabled= Column(Boolean)
- created_at= Column(DateTime)
- description= Column(String)
+ id = Column(BigInteger, primary_key=True, autoincrement=False)
+ id_str = Column(String)
+ contributors_enabled = Column(Boolean)
+ created_at = Column(DateTime)
+ description = Column(String)
favourites_count = Column(Integer)
follow_request_sent = Column(Boolean)
followers_count = Column(Integer)
following = Column(String)
friends_count = Column(Integer)
- geo_enabled= Column(Boolean)
- is_translator= Column(Boolean)
+ geo_enabled = Column(Boolean)
+ is_translator = Column(Boolean)
lang = Column(String)
listed_count = Column(Integer)
- location= Column(String)
+ location = Column(String)
name = Column(String)
notifications = Column(String)
- profile_background_color= Column(String)
- profile_background_image_url= Column(String)
- profile_background_tile= Column(Boolean)
- profile_image_url= Column(String)
- profile_link_color= Column(String)
- profile_sidebar_border_color= Column(String)
- profile_sidebar_fill_color= Column(String)
- profile_text_color= Column(String)
- profile_use_background_image= Column(Boolean)
- protected= Column(Boolean)
- screen_name= Column(String, index=True, unique=True)
- show_all_inline_media= Column(Boolean)
+ profile_background_color = Column(String)
+ profile_background_image_url = Column(String)
+ profile_background_tile = Column(Boolean)
+ profile_image_url = Column(String)
+ profile_link_color = Column(String)
+ profile_sidebar_border_color = Column(String)
+ profile_sidebar_fill_color = Column(String)
+ profile_text_color = Column(String)
+ profile_use_background_image = Column(Boolean)
+ protected = Column(Boolean)
+ screen_name = Column(String, index=True, unique=True)
+ show_all_inline_media = Column(Boolean)
statuses_count = Column(Integer)
- time_zone= Column(String)
- url= Column(String)
+ time_zone = Column(String)
+ url = Column(String)
utc_offset = Column(Integer)
- verified= Column(Boolean)
+ verified = Column(Boolean)
def __init__(self, **kwargs):
for key, value in kwargs.items():
- if hasattr(self,key):
- setattr(self,key,value)
+ if hasattr(self, key):
+ setattr(self, key, value)
class Hashtag(Base):
__tablename__ = "tweet_hashtag"
id = Column(Integer, primary_key=True)
- text = Column(String, unique = True, index = True)
+ text = Column(String, unique=True, index=True)
def __init__(self, **kwargs):
for key, value in kwargs.items():
- if hasattr(self,key):
- setattr(self,key,value)
-
-
+ if hasattr(self, key):
+ setattr(self, key, value)
class Url(Base):
__tablename__ = "tweet_url"
@@ -183,8 +188,35 @@
expanded_url = Column(String)
def __init__(self, **kwargs):
for key, value in kwargs.items():
- if hasattr(self,key):
- setattr(self,key,value)
+ if hasattr(self, key):
+ setattr(self, key, value)
+
+class MediaType(Base):
+ __tablename__ = "tweet_media_type"
+ id = Column(Integer, primary_key=True, autoincrement=True)
+ label = Column(String, unique=True, index=True)
+ def __init__(self, **kwargs):
+ for key, value in kwargs.items():
+ if hasattr(self, key):
+ setattr(self, key, value)
+
+
+class Media(Base):
+ __tablename__ = "tweet_media"
+ id = Column(BigInteger, primary_key=True, autoincrement=False)
+ id_str = Column(String, unique=True)
+ media_url = Column(String, unique=True)
+ media_url_https = Column(String, unique=True)
+ url = Column(String)
+ display_url = Column(String)
+ expanded_url = Column(String)
+ sizes = Column(String)
+ type_id = Column(Integer, ForeignKey("tweet_media_type.id"))
+ type = relationship(MediaType, primaryjoin=type_id == MediaType.id)
+ def __init__(self, **kwargs):
+ for key, value in kwargs.items():
+ if hasattr(self, key):
+ setattr(self, key, value)
@@ -197,8 +229,8 @@
def __init__(self, **kwargs):
super(EntityHashtag, self).__init__(**kwargs)
for key, value in kwargs.items():
- if hasattr(self,key):
- setattr(self,key,value)
+ if hasattr(self, key):
+ setattr(self, key, value)
class EntityUrl(Entity):
@@ -210,22 +242,35 @@
def __init__(self, **kwargs):
super(EntityUrl, self).__init__(**kwargs)
for key, value in kwargs.items():
- if hasattr(self,key):
- setattr(self,key,value)
+ if hasattr(self, key):
+ setattr(self, key, value)
class EntityUser(Entity):
__tablename__ = "tweet_entity_user"
__mapper_args__ = {'polymorphic_identity': 'entity_user'}
id = Column(Integer, ForeignKey('tweet_entity.id'), primary_key=True)
- user_id = Column(Integer, ForeignKey('tweet_user.id'))
- user = relationship(User, primaryjoin=user_id == User.id)
+ user_id = Column(BigInteger, ForeignKey('tweet_user.id'))
+ user = relationship(User, primaryjoin=(user_id == User.id))
def __init__(self, **kwargs):
super(EntityUser, self).__init__(**kwargs)
for key, value in kwargs.items():
- if hasattr(self,key):
- setattr(self,key,value)
+ if hasattr(self, key):
+ setattr(self, key, value)
+class EntityMedia(Entity):
+ __tablename__ = "tweet_entity_media"
+ __mapper_args__ = {'polymorphic_identity': 'entity_media'}
+ id = Column(Integer, ForeignKey('tweet_entity.id'), primary_key=True)
+ media_id = Column(BigInteger, ForeignKey('tweet_media.id'))
+ media = relationship(Media, primaryjoin=(media_id == Media.id))
+
+ def __init__(self, **kwargs):
+ super(EntityMedia, self).__init__(**kwargs)
+ for key, value in kwargs.items():
+ if hasattr(self, key):
+ setattr(self, key, value)
+
def setup_database(*args, **kwargs):
@@ -263,15 +308,15 @@
tweet_tweet = {
'contributors': None,
- 'coordinates': None,
- 'created_at': 'date',
- 'entities': "tweet_entity",
+ 'coordinates': None,
+ 'created_at': 'date',
+ 'entities': "tweet_entity",
'favorited': "bool",
'geo': None,
'id': "long",
'id_str': "string",
- 'in_reply_to_screen_name': "string",
- 'in_reply_to_status_id': "long",
+ 'in_reply_to_screen_name': "string",
+ 'in_reply_to_status_id': "long",
'in_reply_to_status_id_str': "string",
'in_reply_to_user_id': "int",
'in_reply_to_user_id_str': "string",
@@ -354,6 +399,6 @@
tweet_url = {
"url": "string",
- "expanded_url" : "string",
+ "expanded_url" : "string",
}
--- a/script/lib/iri_tweet/tests.py Tue Aug 09 13:07:23 2011 +0200
+++ b/script/lib/iri_tweet/tests.py Fri Aug 12 18:17:27 2011 +0200
@@ -111,7 +111,8 @@
original_json = u'{"in_reply_to_user_id_str":null,"text":"RT @BieberEagle: \\"I love my haters. They spend so much time thinking about me. Aren\u2019t they sweet?\\" - Justin Bieber","contributors":null,"retweeted":false,"coordinates":null,"retweeted_status":{"in_reply_to_user_id_str":null,"text":"\\"I love my haters. They spend so much time thinking about me. Aren\u2019t they sweet?\\" - Justin Bieber","contributors":null,"retweeted":false,"coordinates":null,"retweet_count":"100+","source":"web","entities":{"user_mentions":[],"hashtags":[],"urls":[]},"truncated":false,"place":null,"id_str":"96638597737889792","in_reply_to_user_id":null,"in_reply_to_status_id":null,"favorited":false,"in_reply_to_status_id_str":null,"user":{"is_translator":false,"profile_background_tile":true,"profile_background_image_url_https":"https:\/\/si0.twimg.com\/profile_background_images\/298443445\/355584171.jpg","listed_count":5040,"friends_count":8477,"profile_link_color":"ff0000","profile_sidebar_border_color":"000000","url":"http:\/\/twitpic.com\/photos\/BieberEagle","profile_image_url_https":"https:\/\/si0.twimg.com\/profile_images\/1465491672\/355584171_normal.jpg","profile_image_url":"http:\/\/a2.twimg.com\/profile_images\/1465491672\/355584171_normal.jpg","description":"1 name, 1 inspiration, 1 hero, 1 smile, 1 singer, 1 boy who changed my life. B.\u0130.E.B.E.R-Believe In Everything Because Everything\'s Reachable. 
#NEVERSAYNEVER","default_profile":false,"notifications":null,"time_zone":"Paris","followers_count":14506,"default_profile_image":false,"lang":"en","profile_use_background_image":true,"screen_name":"BieberEagle","show_all_inline_media":false,"geo_enabled":false,"profile_background_color":"ffffff","location":"\u2665 Albania \u2665 ","id_str":"229067923","profile_background_image_url":"http:\/\/a2.twimg.com\/profile_background_images\/298443445\/355584171.jpg","favourites_count":89,"protected":false,"follow_request_sent":null,"following":null,"name":"truebelieber","statuses_count":24279,"verified":false,"created_at":"Tue Dec 21 12:35:18 +0000 2010","profile_text_color":"000000","id":229067923,"contributors_enabled":false,"utc_offset":3600,"profile_sidebar_fill_color":""},"id":96638597737889792,"created_at":"Thu Jul 28 17:50:11 +0000 2011","geo":null,"in_reply_to_screen_name":null},"retweet_count":"100+","source":"web","entities":{"user_mentions":[{"indices":[3,15],"screen_name":"BieberEagle","id_str":"229067923","name":"truebelieber","id":229067923}],"hashtags":[],"urls":[]},"truncated":false,"place":null,"id_str":"96965037637382145","in_reply_to_user_id":null,"in_reply_to_status_id":null,"favorited":false,"in_reply_to_status_id_str":null,"user":{"is_translator":false,"profile_background_tile":true,"profile_background_image_url_https":"https:\/\/si0.twimg.com\/profile_background_images\/300419382\/ipod.7.14_054.JPG","listed_count":3,"friends_count":1150,"profile_link_color":"00cccc","profile_sidebar_border_color":"c8ff00","url":"http:\/\/www.facebook.com\/blovedbecca180","profile_image_url_https":"https:\/\/si0.twimg.com\/profile_images\/1466752962\/block_party_7.27.11_015_normal.JPG","profile_image_url":"http:\/\/a3.twimg.com\/profile_images\/1466752962\/block_party_7.27.11_015_normal.JPG","description":"if ya wanna know something about me, then get to know me. \\n\\r\\n\\ri promise, you wont regret it. 
(:\\r\\ni love justin bieber with an extreme burning passion!","default_profile":false,"notifications":null,"time_zone":"Central Time (US & Canada)","followers_count":361,"default_profile_image":false,"lang":"en","profile_use_background_image":true,"screen_name":"beccaxannxx","show_all_inline_media":false,"geo_enabled":false,"profile_background_color":"ff0066","location":"","id_str":"65624607","profile_background_image_url":"http:\/\/a3.twimg.com\/profile_background_images\/300419382\/ipod.7.14_054.JPG","favourites_count":266,"protected":false,"follow_request_sent":null,"following":null,"name":"beccaxannxx","statuses_count":2512,"verified":false,"created_at":"Fri Aug 14 12:36:35 +0000 2009","profile_text_color":"6a39d4","id":65624607,"contributors_enabled":false,"utc_offset":-21600,"profile_sidebar_fill_color":"ff00bb"},"id":96965037637382145,"created_at":"Fri Jul 29 15:27:21 +0000 2011","geo":null,"in_reply_to_screen_name":null}'
-
+original_json_media = u'{"user": {"follow_request_sent": null, "profile_use_background_image": true, "id": 34311537, "verified": false, "profile_image_url_https": "https://si0.twimg.com/profile_images/1452959211/63440_1494505009634_1444332638_31074099_4058882_n_normal.jpg", "profile_sidebar_fill_color": "DAECF4", "is_translator": false, "geo_enabled": false, "profile_text_color": "663B12", "followers_count": 29, "profile_sidebar_border_color": "C6E2EE", "id_str": "34311537", "default_profile_image": false, "location": "", "utc_offset": -25200, "statuses_count": 813, "description": "i like joe jonas, justin bieber, ashley tisdale, selena gomez, megen fox, kim kardashian and demi lovoto and many more.", "friends_count": 101, "profile_link_color": "1F98C7", "profile_image_url": "http://a1.twimg.com/profile_images/1452959211/63440_1494505009634_1444332638_31074099_4058882_n_normal.jpg", "notifications": null, "show_all_inline_media": false, "profile_background_image_url_https": "https://si0.twimg.com/profile_background_images/298244445/tour2011.jpg", "profile_background_color": "C6E2EE", "profile_background_image_url": "http://a1.twimg.com/profile_background_images/298244445/tour2011.jpg", "name": "mikayla", "lang": "en", "profile_background_tile": true, "favourites_count": 231, "screen_name": "bieberfever17ya", "url": null, "created_at": "Wed Apr 22 16:04:28 +0000 2009", "contributors_enabled": false, "time_zone": "Mountain Time (US & Canada)", "protected": false, "default_profile": false, "following": null, "listed_count": 1}, "favorited": false, "entities": {"user_mentions": [], "media": [{"media_url_https": "https://p.twimg.com/AWea5Z-CQAAvfvK.jpg", "expanded_url": "http://twitter.com/bieberfever17ya/status/101219827649232896/photo/1", "sizes": {"small": {"h": 240, "w": 201, "resize": "fit"}, "large": {"h": 240, "w": 201, "resize": "fit"}, "medium": {"h": 240, "w": 201, "resize": "fit"}, "thumb": {"h": 150, "w": 150, "resize": "crop"}}, "url": 
"http://t.co/N7yZ8hS", "display_url": "pic.twitter.com/N7yZ8hS", "id_str": "101219827653427200", "indices": [31, 50], "type": "photo", "id": 101219827653427200, "media_url": "http://p.twimg.com/AWea5Z-CQAAvfvK.jpg"}], "hashtags": [], "urls": []}, "contributors": null, "truncated": false, "text": "i love you justin bieber <3 http://t.co/N7yZ8hS", "created_at": "Wed Aug 10 09:14:22 +0000 2011", "retweeted": false, "in_reply_to_status_id": null, "coordinates": null, "id": 101219827649232896, "source": "web", "in_reply_to_status_id_str": null, "place": null, "in_reply_to_user_id": null, "in_reply_to_screen_name": null, "retweet_count": 0, "geo": null, "in_reply_to_user_id_str": null, "possibly_sensitive": false, "id_str": "101219827649232896"}'
+original_json_media_others = u'{"user": {"utc_offset": -25200, "statuses_count": 813, "default_profile_image": false, "friends_count": 101, "profile_background_image_url_https": "https://si0.twimg.com/profile_background_images/298244445/tour2011.jpg", "profile_use_background_image": true, "profile_sidebar_fill_color": "DAECF4", "profile_link_color": "1F98C7", "profile_image_url": "http://a1.twimg.com/profile_images/1452959211/63440_1494505009634_1444332638_31074099_4058882_n_normal.jpg", "time_zone": "Mountain Time (US & Canada)", "is_translator": false, "screen_name": "bieberfever17ya", "url": null, "show_all_inline_media": false, "geo_enabled": false, "profile_background_color": "C6E2EE", "id": 34311537, "profile_background_image_url": "http://a1.twimg.com/profile_background_images/298244445/tour2011.jpg", "description": "i like joe jonas, justin bieber, ashley tisdale, selena gomez, megen fox, kim kardashian and demi lovoto and many more.", "lang": "en", "profile_background_tile": true, "favourites_count": 231, "name": "mikayla", "notifications": null, "follow_request_sent": null, "created_at": "Wed Apr 22 16:04:28 +0000 2009", "verified": false, "contributors_enabled": false, "location": "", "profile_text_color": "663B12", "followers_count": 29, "profile_sidebar_border_color": "C6E2EE", "id_str": "34311537", "default_profile": false, "following": null, "protected": false, "profile_image_url_https": "https://si0.twimg.com/profile_images/1452959211/63440_1494505009634_1444332638_31074099_4058882_n_normal.jpg", "listed_count": 1}, "favorited": false, "contributors": null, "source": "web", "text": "i love you justin bieber <3 http://t.co/N7yZ8hS", "created_at": "Wed Aug 10 09:14:22 +0000 2011", "truncated": false, "retweeted": false, "in_reply_to_status_id_str": null, "coordinates": null, "in_reply_to_user_id_str": null, "entities": {"user_mentions": [], "media": [], "hashtags": [], "urls": [], "others": [{"url": "http://t.co/N7yZ8hS", "text": "comments", 
"indices": [31, 50]}]}, "in_reply_to_status_id": null, "in_reply_to_screen_name": null, "id_str": "101219827649232896", "place": null, "retweet_count": 0, "geo": null, "id": 101219827649232896, "possibly_sensitive": false, "in_reply_to_user_id": null}'
class TestTwitterProcessor(unittest.TestCase):
@@ -136,7 +137,46 @@
self.assertEqual(u"beccaxannxx",tweet.user.name)
self.assertEqual(65624607,tweet.user.id)
self.assertEqual(1,len(tweet.entity_list))
- self.assertEqual(u"BieberEagle", tweet.entity_list[0].user.screen_name)
+ entity = tweet.entity_list[0]
+ self.assertEqual(u"BieberEagle", entity.user.screen_name)
+ self.assertTrue(entity.user.created_at is None)
+ self.assertEqual("entity_user", entity.type)
+ self.assertEqual("user_mentions", entity.entity_type.label)
+
+
+ def testTwitterProcessorMedia(self):
+        tp = TwitterProcessor(None, original_json_media, None, self.session, token_filename=self.tmpfilepath)
+ tp.process()
+ self.session.commit()
+
+        self.assertEqual(1, self.session.query(models.TweetSource).count())
+        self.assertEqual(1, self.session.query(models.Tweet).count())
+        self.assertEqual(1, self.session.query(models.User).count())
+ tweet = self.session.query(models.Tweet).first()
+ self.assertFalse(tweet.user is None)
+ self.assertEqual(u"mikayla",tweet.user.name)
+ self.assertEqual(34311537,tweet.user.id)
+ self.assertEqual(1,len(tweet.entity_list))
+ entity = tweet.entity_list[0]
+ self.assertEqual(101219827653427200, entity.media.id)
+ self.assertEqual("photo", entity.media.type.label)
+ self.assertEqual("entity_media", entity.type)
+ self.assertEqual("media", entity.entity_type.label)
+
+
+ def testTwitterProcessorMediaOthers(self):
+        tp = TwitterProcessor(None, original_json_media_others, None, self.session, token_filename=self.tmpfilepath)
+ tp.process()
+ self.session.commit()
+
+        self.assertEqual(1, self.session.query(models.TweetSource).count())
+        self.assertEqual(1, self.session.query(models.Tweet).count())
+ tweet = self.session.query(models.Tweet).first()
+ self.assertEqual(1,len(tweet.entity_list))
+ entity = tweet.entity_list[0]
+ self.assertEqual("entity_entity", entity.type)
+ self.assertEqual("others", entity.entity_type.label)
+
def tearDown(self):
--- a/script/lib/iri_tweet/utils.py Tue Aug 09 13:07:23 2011 +0200
+++ b/script/lib/iri_tweet/utils.py Fri Aug 12 18:17:27 2011 +0200
@@ -1,6 +1,7 @@
from models import (Tweet, User, Hashtag, EntityHashtag, EntityUser, Url,
EntityUrl, CONSUMER_KEY, CONSUMER_SECRET, APPLICATION_NAME, ACCESS_TOKEN_KEY,
- ACCESS_TOKEN_SECRET, adapt_date, adapt_json, TweetSource, TweetLog)
+ ACCESS_TOKEN_SECRET, adapt_date, adapt_json, TweetSource, TweetLog, MediaType,
+ Media, EntityMedia, Entity, EntityType)
from sqlalchemy.sql import select, or_ #@UnresolvedImport
import anyjson #@UnresolvedImport
import datetime
@@ -16,24 +17,40 @@
CACHE_ACCESS_TOKEN = {}
-def get_oauth_token(token_file_path=None, application_name=APPLICATION_NAME, consumer_key=CONSUMER_KEY, consumer_secret=CONSUMER_SECRET):
+def get_oauth_token(token_file_path=None, check_access_token=True, application_name=APPLICATION_NAME, consumer_key=CONSUMER_KEY, consumer_secret=CONSUMER_SECRET):
global CACHE_ACCESS_TOKEN
-
- if CACHE_ACCESS_TOKEN is not None and application_name in CACHE_ACCESS_TOKEN:
- return CACHE_ACCESS_TOKEN[application_name]
-
- if token_file_path and os.path.exists(token_file_path):
- logging.debug("reading token from file %s" % token_file_path) #@UndefinedVariable
- CACHE_ACCESS_TOKEN[application_name] = twitter.oauth.read_token_file(token_file_path)
- return CACHE_ACCESS_TOKEN[application_name]
- #read access token info from path
-
- if 'ACCESS_TOKEN_KEY' in dict() and 'ACCESS_TOKEN_SECRET' in dict() and ACCESS_TOKEN_KEY and ACCESS_TOKEN_SECRET:
+
+ if 'ACCESS_TOKEN_KEY' in globals() and 'ACCESS_TOKEN_SECRET' in globals() and ACCESS_TOKEN_KEY and ACCESS_TOKEN_SECRET:
return ACCESS_TOKEN_KEY,ACCESS_TOKEN_SECRET
- CACHE_ACCESS_TOKEN[application_name] = twitter.oauth_dance.oauth_dance(application_name, consumer_key, consumer_secret, token_file_path)
- return CACHE_ACCESS_TOKEN[application_name]
+ res = CACHE_ACCESS_TOKEN.get(application_name, None)
+
+ if res is None and token_file_path and os.path.exists(token_file_path):
+ get_logger().debug("get_oauth_token : reading token from file %s" % token_file_path) #@UndefinedVariable
+ res = twitter.oauth.read_token_file(token_file_path)
+
+ if res is not None and check_access_token:
+ get_logger().debug("get_oauth_token : Check oauth tokens") #@UndefinedVariable
+ t = twitter.Twitter(auth=twitter.OAuth(res[0], res[1], CONSUMER_KEY, CONSUMER_SECRET))
+ status = None
+ try:
+ status = t.account.rate_limit_status()
+ except Exception as e:
+ get_logger().debug("get_oauth_token : error getting rate limit status %s" % repr(e))
+ status = None
+ get_logger().debug("get_oauth_token : Check oauth tokens : status %s" % repr(status)) #@UndefinedVariable
+ if status is None or status['remaining_hits'] == 0:
+ get_logger().debug("get_oauth_token : Problem with status %s" % repr(status))
+ res = None
+
+ if res is None:
+ get_logger().debug("get_oauth_token : doing the oauth dance")
+ res = twitter.oauth_dance.oauth_dance(application_name, consumer_key, consumer_secret, token_file_path)
+
+ CACHE_ACCESS_TOKEN[application_name] = res
+
+ return res
def parse_date(date_str):
ts = email.utils.parsedate_tz(date_str) #@UndefinedVariable
@@ -54,6 +71,13 @@
"user": {
"created_at" : adapt_date,
},
+
+ },
+
+ 'entities' : {
+ "medias": {
+ "sizes" : adapt_json,
+ },
},
'rest': {
"tweet" : {
@@ -89,7 +113,12 @@
new_args = [arg() if callable(arg) else arg for arg in self.args] if self.args is not None else []
new_kwargs = dict([(k,v()) if callable(v) else (k,v) for k,v in self.kwargs.items()]) if self.kwargs is not None else {}
- self.instance = self.klass(*new_args, **new_kwargs)
+ if self.instance is None:
+ self.instance = self.klass(*new_args, **new_kwargs)
+ else:
+ self.instance = self.klass(*new_args, **new_kwargs)
+ self.instance = session.merge(self.instance)
+
session.add(self.instance)
if self.must_flush:
session.flush()
@@ -104,40 +133,45 @@
def __init__(self):
self.__bufferlist = []
+ self.__bufferdict = {}
+
+ def __add_proxy_object(self, proxy):
+ proxy_list = self.__bufferdict.get(proxy.klass, None)
+ if proxy_list is None:
+ proxy_list = []
+ self.__bufferdict[proxy.klass] = proxy_list
+ proxy_list.append(proxy)
+ self.__bufferlist.append(proxy)
def persists(self, session):
for object_proxy in self.__bufferlist:
object_proxy.persists(session)
-
- def add_object(self, klass, args, kwargs, must_flush):
- new_proxy = ObjectBufferProxy(klass, args, kwargs, must_flush)
- self.__bufferlist.append(new_proxy)
+
+ def add_object(self, klass, args, kwargs, must_flush, instance=None):
+ new_proxy = ObjectBufferProxy(klass, args, kwargs, must_flush, instance)
+ self.__add_proxy_object(new_proxy)
return new_proxy
def get(self, klass, **kwargs):
- for proxy in self.__bufferlist:
- if proxy.kwargs is None or len(proxy.kwargs) == 0 or proxy.klass != klass:
- continue
- found = True
- for k,v in kwargs.items():
- if (k not in proxy.kwargs) or v != proxy.kwargs[k]:
- found = False
- break
- if found:
- return proxy
-
+ if klass in self.__bufferdict:
+ for proxy in self.__bufferdict[klass]:
+ if proxy.kwargs is None or len(proxy.kwargs) == 0 or proxy.klass != klass:
+ continue
+ found = True
+ for k,v in kwargs.items():
+ if (k not in proxy.kwargs) or v != proxy.kwargs[k]:
+ found = False
+ break
+ if found:
+ return proxy
return None
-
-
-
-
class TwitterProcessorException(Exception):
pass
class TwitterProcessor(object):
- def __init__(self, json_dict, json_txt, source_id, session, token_filename=None):
+ def __init__(self, json_dict, json_txt, source_id, session, access_token=None, token_filename=None):
if json_dict is None and json_txt is None:
raise TwitterProcessorException("No json")
@@ -158,11 +192,13 @@
self.source_id = source_id
self.session = session
self.token_filename = token_filename
+ self.access_token = access_token
self.obj_buffer = ObjectsBuffer()
+
- def __get_user(self, user_dict):
- logger.debug("Get user : " + repr(user_dict)) #@UndefinedVariable
+ def __get_user(self, user_dict, do_merge, query_twitter = False):
+ get_logger().debug("Get user : " + repr(user_dict)) #@UndefinedVariable
user_id = user_dict.get("id",None)
user_name = user_dict.get("screen_name", user_dict.get("name", None))
@@ -192,8 +228,12 @@
user_created_at = user_dict.get("created_at", None)
- if user_created_at is None:
- acess_token_key, access_token_secret = get_oauth_token(self.token_filename)
+ if user_created_at is None and query_twitter:
+
+ if self.access_token is not None:
+ acess_token_key, access_token_secret = self.access_token
+ else:
+ acess_token_key, access_token_secret = get_oauth_token(self.token_filename)
t = twitter.Twitter(auth=twitter.OAuth(acess_token_key, access_token_secret, CONSUMER_KEY, CONSUMER_SECRET))
try:
if user_id:
@@ -201,8 +241,8 @@
else:
user_dict = t.users.show(screen_name=user_name)
except Exception as e:
- logger.info("get_user : TWITTER ERROR : " + repr(e)) #@UndefinedVariable
- logger.info("get_user : TWITTER ERROR : " + str(e)) #@UndefinedVariable
+ get_logger().info("get_user : TWITTER ERROR : " + repr(e)) #@UndefinedVariable
+ get_logger().info("get_user : TWITTER ERROR : " + str(e)) #@UndefinedVariable
return None
user_dict = adapt_fields(user_dict, fields_adapter["stream"]["user"])
@@ -210,45 +250,79 @@
return None
#TODO filter get, wrap in proxy
- user = self.session.query(User).filter(User.id == user_dict["id"]).first()
+ user_obj = self.session.query(User).filter(User.id == user_dict["id"]).first()
- if user is not None:
- return user
+ if user_obj is not None:
+ if not do_merge:
+ return ObjectBufferProxy(User, None, None, False, user_obj)
user = self.obj_buffer.add_object(User, None, user_dict, True)
return user
+ def __get_or_create_object(self, klass, filter_by_kwargs, filter, creation_kwargs, must_flush, do_merge):
+
+ obj_proxy = self.obj_buffer.get(klass, **filter_by_kwargs)
+ if obj_proxy is None:
+ query = self.session.query(klass)
+ if filter is not None:
+ query = query.filter(filter)
+ else:
+ query = query.filter_by(**filter_by_kwargs)
+ obj_instance = query.first()
+ if obj_instance is not None:
+ if not do_merge:
+ obj_proxy = ObjectBufferProxy(klass, None, None, False, obj_instance)
+ else:
+ obj_proxy = self.obj_buffer.add_object(klass, None, creation_kwargs, must_flush, obj_instance)
+ if obj_proxy is None:
+ obj_proxy = self.obj_buffer.add_object(klass, None, creation_kwargs, must_flush)
+ return obj_proxy
+
def __process_entity(self, ind, ind_type):
- logger.debug("Process_entity : " + repr(ind) + " : " + repr(ind_type)) #@UndefinedVariable
+ get_logger().debug("Process_entity : " + repr(ind) + " : " + repr(ind_type)) #@UndefinedVariable
ind = clean_keys(ind)
+ entity_type = self.__get_or_create_object(EntityType, {'label':ind_type}, None, {'label':ind_type}, True, False)
+
entity_dict = {
- "indice_start": ind["indices"][0],
- "indice_end" : ind["indices"][1],
- "tweet_id" : self.tweet.id,
+ "indice_start" : ind["indices"][0],
+ "indice_end" : ind["indices"][1],
+ "tweet_id" : self.tweet.id,
+ "entity_type_id" : entity_type.id,
+ "source" : adapt_json(ind)
}
-
+
+ def process_medias():
+
+ media_id = ind.get('id', None)
+ if media_id is None:
+ return None, None
+
+ type_str = ind.get("type", "photo")
+ media_type = self.__get_or_create_object(MediaType, {'label': type_str}, None, {'label':type_str}, True, False)
+ media_ind = adapt_fields(ind, fields_adapter["entities"]["medias"])
+ if "type" in media_ind:
+ del(media_ind["type"])
+ media_ind['type_id'] = media_type.id
+ media = self.__get_or_create_object(Media, {'id':media_id}, None, media_ind, True, False)
+
+ entity_dict['media_id'] = media.id
+ return EntityMedia, entity_dict
+
def process_hashtags():
text = ind.get("text", ind.get("hashtag", None))
if text is None:
- return None
- hashtag = self.obj_buffer.get(Hashtag, text=text)
- if hashtag is None:
- hashtag_obj = self.session.query(Hashtag).filter(Hashtag.text.ilike(text)).first()
- if hashtag_obj is not None:
- hashtag = ObjectBufferProxy(Hashtag, None, None, False, hashtag_obj)
-
- if hashtag is None:
- ind["text"] = text
- hashtag = self.obj_buffer.add_object(Hashtag, None, ind, True)
+ return None, None
+ ind['text'] = text
+ hashtag = self.__get_or_create_object(Hashtag, {'text':text}, Hashtag.text.ilike(text), ind, True, False)
entity_dict['hashtag_id'] = hashtag.id
return EntityHashtag, entity_dict
def process_user_mentions():
- user_mention = self.__get_user(ind)
+ user_mention = self.__get_user(ind, False, False)
if user_mention is None:
entity_dict['user_id'] = None
else:
@@ -256,24 +330,19 @@
return EntityUser, entity_dict
def process_urls():
- url = self.obj_buffer.get(Url, url=ind["url"])
- if url is None:
- url_obj = self.session.query(Url).filter(Url.url == ind["url"]).first()
- if url_obj is not None:
- url = ObjectBufferProxy(Url, None, None, False, url_obj)
- if url is None:
- url = self.obj_buffer.add_object(Url, None, ind, True)
+ url = self.__get_or_create_object(Url, {'url':ind["url"]}, None, ind, True, False)
entity_dict['url_id'] = url.id
return EntityUrl, entity_dict
-
+
#{'': lambda }
entity_klass, entity_dict = {
'hashtags': process_hashtags,
'user_mentions' : process_user_mentions,
- 'urls' : process_urls
- }[ind_type]()
+ 'urls' : process_urls,
+ 'media': process_medias,
+ }.get(ind_type, lambda: (Entity, entity_dict))()
- logger.debug("Process_entity entity_dict: " + repr(entity_dict)) #@UndefinedVariable
+ get_logger().debug("Process_entity entity_dict: " + repr(entity_dict)) #@UndefinedVariable
if entity_klass:
self.obj_buffer.add_object(entity_klass, None, entity_dict, False)
@@ -287,9 +356,9 @@
ts_copy = adapt_fields(self.json_dict, fields_adapter["stream"]["tweet"])
# get or create user
- user = self.__get_user(self.json_dict["user"])
+ user = self.__get_user(self.json_dict["user"], True)
if user is None:
- logger.warning("USER not found " + repr(self.json_dict["user"])) #@UndefinedVariable
+ get_logger().warning("USER not found " + repr(self.json_dict["user"])) #@UndefinedVariable
ts_copy["user_id"] = None
else:
ts_copy["user_id"] = user.id
@@ -299,25 +368,26 @@
self.tweet = self.obj_buffer.add_object(Tweet, None, ts_copy, True)
- # get entities
+ self.__process_entities()
+
+
+ def __process_entities(self):
if "entities" in self.json_dict:
for ind_type, entity_list in self.json_dict["entities"].items():
for ind in entity_list:
self.__process_entity(ind, ind_type)
else:
- extractor = twitter_text.Extractor(self.tweet.text)
-
+
+ text = self.tweet.text
+ extractor = twitter_text.Extractor(text)
for ind in extractor.extract_hashtags_with_indices():
self.__process_entity(ind, "hashtags")
-
+
+ for ind in extractor.extract_urls_with_indices():
+ self.__process_entity(ind, "urls")
+
for ind in extractor.extract_mentioned_screen_names_with_indices():
self.__process_entity(ind, "user_mentions")
-
- for ind in extractor.extract_urls_with_indices():
- self.__process_entity(ind, "urls")
-
- self.session.flush()
-
def __process_twitter_rest(self):
tweet_nb = self.session.query(Tweet).filter(Tweet.id == self.json_dict["id"]).count()
@@ -348,28 +418,17 @@
'screen_name' : self.json_dict["from_user"],
}
- user = self.__get_user(user_fields)
+ user = self.__get_user(user_fields, do_merge=False)
if user is None:
- logger.warning("USER not found " + repr(user_fields)) #@UndefinedVariable
+ get_logger().warning("USER not found " + repr(user_fields)) #@UndefinedVariable
tweet_fields["user_id"] = None
else:
tweet_fields["user_id"] = user.id
tweet_fields = adapt_fields(tweet_fields, fields_adapter["rest"]["tweet"])
self.tweet = self.obj_buffer.add_object(Tweet, None, tweet_fields, True)
-
- text = self.tweet.text
-
- extractor = twitter_text.Extractor(text)
-
- for ind in extractor.extract_hashtags_with_indices():
- self.__process_entity(ind, "hashtags")
-
- for ind in extractor.extract_urls_with_indices():
- self.__process_entity(ind, "urls")
-
- for ind in extractor.extract_mentioned_screen_names_with_indices():
- self.__process_entity(ind, "user_mentions")
+
+ self.__process_entities()
@@ -384,7 +443,7 @@
else:
self.__process_twitter_stream()
- self.obj_buffer.add_object(TweetLog, None, {'tweet_source_id':self.source_id, 'status':TweetLog.TWEET_STATUS['OK']}, False)
+ self.obj_buffer.add_object(TweetLog, None, {'tweet_source_id':self.source_id, 'status':TweetLog.TWEET_STATUS['OK']}, True)
self.obj_buffer.persists(self.session)
@@ -405,7 +464,7 @@
logger = plogger
if logger is None:
- logger = logging.getLogger() #@UndefinedVariable
+ logger = get_logger() #@UndefinedVariable
if len(logger.handlers) == 0:
filename = logging_config.get("filename")
@@ -477,4 +536,5 @@
return query.distinct()
-logger = logging.getLogger() #@UndefinedVariable
+def get_logger():
+ return logging.getLogger("iri_tweet") #@UndefinedVariable
--- a/script/lib/tweetstream/tweetstream/streamclasses.py Tue Aug 09 13:07:23 2011 +0200
+++ b/script/lib/tweetstream/tweetstream/streamclasses.py Fri Aug 12 18:17:27 2011 +0200
@@ -2,8 +2,8 @@
import anyjson
import socket
import time
-import urllib
-import urllib2
+import urllib #@UnresolvedImport
+import urllib2 #@UnresolvedImport
socket._fileobject.default_bufsize = 0
--- a/script/stream/recorder_tweetstream.py Tue Aug 09 13:07:23 2011 +0200
+++ b/script/stream/recorder_tweetstream.py Fri Aug 12 18:17:27 2011 +0200
@@ -3,22 +3,25 @@
from iri_tweet.models import TweetSource, TweetLog
from multiprocessing import Queue, JoinableQueue, Process, Event, get_logger
from optparse import OptionParser
-from sqlalchemy.orm import sessionmaker
+from sqlalchemy.exc import OperationalError
+from sqlalchemy.orm import scoped_session, sessionmaker
import StringIO
-import logging
import anyjson
import datetime
+import logging
import os
+import re
import shutil
import signal
import socket
+import sqlalchemy.schema
import sys
import time
import traceback
import tweepy.auth
import tweetstream
-from iri_tweet.utils import logger
-from sqlalchemy.exc import OperationalError
+import urllib2
+#from iri_tweet.utils import get_logger
socket._fileobject.default_bufsize = 0
@@ -30,6 +33,26 @@
#just put it in a sqlite3 table
+def set_logging(options):
+ utils.set_logging(options, logging.getLogger('iri_tweet'))
+ utils.set_logging(options, logging.getLogger('multiprocessing'))
+ if options.debug >= 2:
+ utils.set_logging(options, logging.getLogger('sqlalchemy.engine'))
+ #utils.set_logging(options, logging.getLogger('sqlalchemy.dialects'))
+ #utils.set_logging(options, logging.getLogger('sqlalchemy.pool'))
+ #utils.set_logging(options, logging.getLogger('sqlalchemy.orm'))
+
+def get_auth(options, access_token):
+ if options.username and options.password:
+ auth = tweepy.auth.BasicAuthHandler(options.username, options.password)
+ else:
+ consumer_key = models.CONSUMER_KEY
+ consumer_secret = models.CONSUMER_SECRET
+ auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=False)
+ auth.set_access_token(*access_token)
+ return auth
+
+
class ReconnectingTweetStream(tweetstream.FilterStream):
"""TweetStream class that automatically tries to reconnect if the
connection goes down. Reconnecting, and waiting for reconnecting, is
@@ -62,9 +85,10 @@
def next(self):
while True:
try:
+ utils.get_logger().debug("return super.next")
return super(ReconnectingTweetStream, self).next()
except tweetstream.ConnectionError, e:
- logging.debug("connection error :" + str(e))
+ utils.get_logger().debug("connection error :" + str(e))
self._reconnects += 1
if self._reconnects > self.max_reconnects:
raise tweetstream.ConnectionError("Too many retries")
@@ -80,38 +104,43 @@
+
class SourceProcess(Process):
- def __init__(self, session_maker, queue, auth, track, debug, reconnects, token_filename, stop_event):
+ def __init__(self, session_maker, queue, options, access_token, stop_event):
self.session_maker = session_maker
self.queue = queue
- self.auth = auth
- self.track = track
- self.debug = debug
- self.reconnects = reconnects
- self.token_filename = token_filename
+ self.track = options.track
+ self.reconnects = options.reconnects
+ self.token_filename = options.token_filename
self.stop_event = stop_event
+ self.options = options
+ self.access_token = access_token
super(SourceProcess, self).__init__()
-# self.stop_event =
def run(self):
+ #import pydevd
+ #pydevd.settrace(suspend=False)
+
+ set_logging(self.options)
+ self.auth = get_auth(self.options, self.access_token)
- get_logger().debug("SourceProcess : run")
+ utils.get_logger().debug("SourceProcess : run")
track_list = self.track # or raw_input('Keywords to track (comma separated): ').strip()
track_list = [k for k in track_list.split(',')]
- get_logger().debug("SourceProcess : before connecting to stream " + repr(track_list))
+ utils.get_logger().debug("SourceProcess : before connecting to stream " + repr(track_list))
stream = ReconnectingTweetStream(self.auth, track_list, reconnects=self.reconnects, as_text=True)
- get_logger().debug("SourceProcess : after connecting to stream")
+ utils.get_logger().debug("SourceProcess : after connecting to stream")
stream.muststop = lambda: self.stop_event.is_set()
session = self.session_maker()
try:
for tweet in stream:
- get_logger().debug("tweet " + repr(tweet))
+ utils.get_logger().debug("tweet " + repr(tweet))
source = TweetSource(original_json=tweet)
- get_logger().debug("source created")
+ utils.get_logger().debug("source created")
add_retries = 0
while add_retries < 10:
try:
@@ -121,21 +150,18 @@
break
except OperationalError as e:
session.rollback()
- get_logger().debug("Operational Error %s nb %d" % (repr(e), add_retries))
- if add_retries==10:
+ utils.get_logger().debug("Operational Error %s nb %d" % (repr(e), add_retries))
+ if add_retries == 10:
raise e
source_id = source.id
- get_logger().debug("before queue + source id " + repr(source_id))
- self.queue.put((source_id, tweet), False)
- #process_tweet(tweet, source_id, self.session, self.debug, self.token_filename)
- get_logger().info("Tweet count: %d - current rate : %.2f - running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime)))
+ utils.get_logger().debug("before queue + source id " + repr(source_id))
+ utils.get_logger().info("Tweet count: %d - current rate : %.2f - running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime)))
session.commit()
-# if duration >= 0 and datetime.datetime.utcnow() >= end_ts:
-# print "Stop recording after %d seconds." % (duration)
-# break
+ self.queue.put((source_id, tweet), False)
+
except Exception as e:
- get_logger().error("Error when processing tweet " + repr(e))
+ utils.get_logger().error("Error when processing tweet " + repr(e))
finally:
session.rollback()
stream.close()
@@ -144,19 +170,19 @@
self.stop_event.set()
-def process_tweet(tweet, source_id, session, token_filename):
+def process_tweet(tweet, source_id, session, access_token):
try:
tweet_obj = anyjson.deserialize(tweet)
screen_name = ""
if 'user' in tweet_obj and 'screen_name' in tweet_obj['user']:
screen_name = tweet_obj['user']['screen_name']
- get_logger().info(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text']))
- get_logger().debug(u"Process_tweet :" + repr(tweet))
- processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, token_filename)
+ utils.get_logger().debug(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text']))
+ utils.get_logger().debug(u"Process_tweet :" + repr(tweet))
+ processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, access_token, None)
processor.process()
except Exception as e:
message = u"Error %s processing tweet %s" % (repr(e), tweet)
- get_logger().error(message)
+ utils.get_logger().error(message)
output = StringIO.StringIO()
traceback.print_exception(Exception, e, None, None, output)
error_stack = output.getvalue()
@@ -170,42 +196,49 @@
class TweetProcess(Process):
- def __init__(self, session_maker, queue, debug, token_filename, stop_event):
+ def __init__(self, session_maker, queue, options, access_token, stop_event):
self.session_maker = session_maker
self.queue = queue
- self.debug = debug
self.stop_event = stop_event
- self.token_filename = token_filename
+ self.options = options
+ self.access_token = access_token
super(TweetProcess, self).__init__()
def run(self):
+ set_logging(self.options)
session = self.session_maker()
try:
while not self.stop_event.is_set():
try:
- source_id, tweet_txt = queue.get(True, 10)
- get_logger().debug("Processing source id " + repr(source_id))
+ source_id, tweet_txt = queue.get(True, 3)
+ utils.get_logger().debug("Processing source id " + repr(source_id))
except Exception as e:
- get_logger().debug('Process tweet exception in loop : ' + repr(e))
+ utils.get_logger().debug('Process tweet exception in loop : ' + repr(e))
continue
- process_tweet(tweet_txt, source_id, session, self.token_filename)
+ process_tweet(tweet_txt, source_id, session, self.access_token)
session.commit()
- except:
- raise
finally:
session.rollback()
self.stop_event.set()
session.close()
+
+
+def get_sessionmaker(conn_str):
+ engine, metadata = models.setup_database(conn_str, echo=False, create_all=False)
+ Session = scoped_session(sessionmaker(bind=engine, autocommit=False))
+ return Session, engine, metadata
+
-def process_leftovers(session, token_filename):
+def process_leftovers(session, access_token):
sources = session.query(TweetSource).outerjoin(TweetLog).filter(TweetLog.id == None)
for src in sources:
tweet_txt = src.original_json
- process_tweet(tweet_txt, src.id, session, token_filename)
+ process_tweet(tweet_txt, src.id, session, access_token)
+ session.commit()
@@ -215,8 +248,8 @@
def get_options():
parser = OptionParser()
- parser.add_option("-f", "--file", dest="filename",
- help="write tweet to FILE", metavar="FILE", default="enmi2010_twitter.db")
+ parser.add_option("-f", "--file", dest="conn_str",
+ help="write tweet to DATABASE. This is a connection string", metavar="CONNECTION_STR", default="enmi2010_twitter.db")
parser.add_option("-u", "--user", dest="username",
help="Twitter user", metavar="USER", default=None)
parser.add_option("-w", "--password", dest="password",
@@ -231,8 +264,8 @@
help="Token file name")
parser.add_option("-d", "--duration", dest="duration",
help="Duration of recording in seconds", metavar="DURATION", default= -1, type='int')
- parser.add_option("-N", "--consumer", dest="consumer_nb",
- help="number of consumer", metavar="CONSUMER", default=1, type='int')
+ parser.add_option("-N", "--nb-process", dest="process_nb",
+                      help="number of processes.\nIf 0, only the leftovers of the database are processed.\nIf 1, no postprocessing is done on the tweets.", metavar="PROCESS_NB", default=2, type='int')
@@ -245,53 +278,81 @@
(options, args) = get_options()
- utils.set_logging(options, get_logger())
+ set_logging(options)
if options.debug:
print "OPTIONS : "
print repr(options)
- if options.new and os.path.exists(options.filename):
- i = 1
- basename, extension = os.path.splitext(options.filename)
- new_path = '%s.%d%s' % (basename, i, extension)
- while i < 1000000 and os.path.exists(new_path):
- i += 1
+
+ conn_str = options.conn_str.strip()
+    if not re.match(r"^\w+://.+", conn_str):
+ conn_str = 'sqlite://' + options.conn_str
+
+ if conn_str.startswith("sqlite") and options.new:
+        filepath = conn_str[conn_str.find("://") + 3:]
+ if os.path.exists(filepath):
+ i = 1
+ basename, extension = os.path.splitext(filepath)
new_path = '%s.%d%s' % (basename, i, extension)
- if i >= 1000000:
- raise Exception("Unable to find new filename for " + options.filename)
- else:
- shutil.move(options.filename, new_path)
+ while i < 1000000 and os.path.exists(new_path):
+ i += 1
+ new_path = '%s.%d%s' % (basename, i, extension)
+ if i >= 1000000:
+ raise Exception("Unable to find new filename for " + filepath)
+ else:
+ shutil.move(filepath, new_path)
+ Session, engine, metadata = get_sessionmaker(conn_str)
+ if options.new:
+ check_metadata = sqlalchemy.schema.MetaData(bind=engine, reflect=True)
+ if len(check_metadata.sorted_tables) > 0:
+ message = "Database %s not empty exiting" % conn_str
+ utils.get_logger().error(message)
+ sys.exit(message)
+
+ metadata.create_all(engine)
+
+ access_token = None
+ if not options.username or not options.password:
+ access_token = utils.get_oauth_token(options.token_filename)
+
+ session = Session()
+ try:
+ process_leftovers(session, access_token)
+ session.commit()
+ finally:
+ session.rollback()
+ session.close()
+
+ if options.process_nb <= 0:
+ utils.get_logger().debug("Leftovers processed. Exiting.")
+ sys.exit()
+
queue = JoinableQueue()
stop_event = Event()
-
- if options.username and options.password:
- auth = tweepy.auth.BasicAuthHandler(options.username, options.password)
- else:
- consumer_key = models.CONSUMER_KEY
- consumer_secret = models.CONSUMER_SECRET
- auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=False)
- auth.set_access_token(*(utils.get_oauth_token(options.token_filename)))
-
-
- engine, metadata = models.setup_database('sqlite:///' + options.filename, echo=(options.debug >= 2))
- Session = sessionmaker(bind=engine)
- session = Session()
- process_leftovers(session, options.token_filename)
- session.commit()
- session.close()
-
- sprocess = SourceProcess(Session, queue, auth, options.track, options.debug, options.reconnects, options.token_filename, stop_event)
+ #workaround for bug on using urllib2 and multiprocessing
+ req = urllib2.Request('http://localhost')
+ conn = None
+ try:
+ conn = urllib2.urlopen(req)
+ except:
+ pass
+ #donothing
+ finally:
+ if conn is not None:
+ conn.close()
+
+
+ sprocess = SourceProcess(Session, queue, options, access_token, stop_event)
tweet_processes = []
- for i in range(options.consumer_nb):
- engine, metadata = models.setup_database('sqlite:///' + options.filename, echo=(options.debug >= 2))
- Session = sessionmaker(bind=engine)
- cprocess = TweetProcess(Session, queue, options.debug, options.token_filename, stop_event)
+ for i in range(options.process_nb - 1):
+ Session, engine, metadata = get_sessionmaker(conn_str)
+ cprocess = TweetProcess(Session, queue, options, access_token, stop_event)
tweet_processes.append(cprocess)
def interupt_handler(signum, frame):
@@ -311,23 +372,36 @@
stop_event.set()
break
if sprocess.is_alive():
- time.sleep(0.1)
+ time.sleep(1)
else:
+ stop_event.set()
break
- get_logger().debug("Joining Source Process")
- sprocess.join()
- get_logger().debug("Joining Queue")
+ utils.get_logger().debug("Joining Source Process")
+ try:
+ sprocess.join(10)
+ except:
+ utils.get_logger().debug("Pb joining Source Process - terminating")
+ sprocess.terminate()
+
+ utils.get_logger().debug("Joining Queue")
#queue.join()
- for i,cprocess in enumerate(tweet_processes):
- get_logger().debug("Joining consumer process Nb %d" % (i+1))
- cprocess.join()
+ for i, cprocess in enumerate(tweet_processes):
+ utils.get_logger().debug("Joining consumer process Nb %d" % (i + 1))
+ try:
+ cprocess.join(3)
+ except:
+ utils.get_logger().debug("Pb joining consumer process Nb %d - terminating" % (i + 1))
+ cprocess.terminate()
- get_logger().debug("Processing leftovers")
+ utils.get_logger().debug("Processing leftovers")
session = Session()
- process_leftovers(session, options.token_filename)
- session.commit()
- session.close()
+ try:
+ process_leftovers(session, access_token)
+ session.commit()
+ finally:
+ session.rollback()
+ session.close()
- get_logger().debug("Done. Exiting.")
-
\ No newline at end of file
+ utils.get_logger().debug("Done. Exiting.")
+