--- a/script/lib/iri_tweet/utils.py Mon Aug 08 09:01:40 2011 +0200
+++ b/script/lib/iri_tweet/utils.py Tue Aug 09 12:40:39 2011 +0200
@@ -78,12 +78,12 @@
class ObjectBufferProxy(object):
- def __init__(self, klass, args, kwargs, must_flush):
+ def __init__(self, klass, args, kwargs, must_flush, instance=None):
self.klass= klass
self.args = args
self.kwargs = kwargs
self.must_flush = must_flush
- self.instance = None
+ self.instance = instance
def persists(self, session):
new_args = [arg() if callable(arg) else arg for arg in self.args] if self.args is not None else []
@@ -114,6 +114,21 @@
self.__bufferlist.append(new_proxy)
return new_proxy
+ def get(self, klass, **kwargs):
+ for proxy in self.__bufferlist:
+ if proxy.kwargs is None or len(proxy.kwargs) == 0 or proxy.klass != klass:
+ continue
+ found = True
+ for k,v in kwargs.items():
+ if (k not in proxy.kwargs) or v != proxy.kwargs[k]:
+ found = False
+ break
+ if found:
+ return proxy
+
+ return None
+
+
@@ -145,6 +160,7 @@
self.token_filename = token_filename
self.obj_buffer = ObjectsBuffer()
+
def __get_user(self, user_dict):
logger.debug("Get user : " + repr(user_dict)) #@UndefinedVariable
@@ -153,13 +169,25 @@
if user_id is None and user_name is None:
return None
-
+
+ user = None
if user_id:
- user = self.session.query(User).filter(User.id == user_id).first()
+ user = self.obj_buffer.get(User, id=user_id)
else:
- user = self.session.query(User).filter(User.screen_name == user_name).first()
+ user = self.obj_buffer.get(User, screen_name=user_name)
+
+ if user is not None:
+ return user
+
+ #todo : add methpds to objectbuffer to get buffer user
+ user_obj = None
+ if user_id:
+ user_obj = self.session.query(User).filter(User.id == user_id).first()
+ else:
+ user_obj = self.session.query(User).filter(User.screen_name == user_name).first()
- if user is not None:
+ if user_obj is not None:
+ user = ObjectBufferProxy(User, None, None, False, user_obj)
return user
user_created_at = user_dict.get("created_at", None)
@@ -180,12 +208,10 @@
if "id" not in user_dict:
return None
- user = User(**user_dict)
+ user = self.obj_buffer.add_object(User, None, user_dict, True)
- self.session.add(user)
- self.session.flush()
-
- return user
+ return user
+
def __process_entity(self, ind, ind_type):
logger.debug("Process_entity : " + repr(ind) + " : " + repr(ind_type)) #@UndefinedVariable
@@ -196,57 +222,53 @@
"indice_start": ind["indices"][0],
"indice_end" : ind["indices"][1],
"tweet_id" : self.tweet.id,
- "tweet" : self.tweet
}
def process_hashtags():
text = ind.get("text", ind.get("hashtag", None))
if text is None:
- return None
- hashtag = self.session.query(Hashtag).filter(Hashtag.text == text).first()
+ return None
+ hashtag = self.obj_buffer.get(Hashtag, text=text)
+ if hashtag is None:
+ hashtag_obj = self.session.query(Hashtag).filter(Hashtag.text == text).first()
+ if hashtag_obj is not None:
+ hashtag = ObjectBufferProxy(Hashtag, None, None, False, hashtag_obj)
+
if not hashtag:
ind["text"] = text
- hashtag = Hashtag(**ind)
- self.session.add(hashtag)
- self.session.flush()
- entity_dict['hashtag'] = hashtag
+ hashtag = self.obj_buffer.add_object(Hashtag, None, ind, True)
entity_dict['hashtag_id'] = hashtag.id
- entity = EntityHashtag(**entity_dict)
- return entity
+ return EntityHashtag, entity_dict
def process_user_mentions():
user_mention = self.__get_user(ind)
if user_mention is None:
- entity_dict['user'] = None
entity_dict['user_id'] = None
else:
- entity_dict['user'] = user_mention
entity_dict['user_id'] = user_mention.id
- entity = EntityUser(**entity_dict)
- return entity
+ return EntityUser, entity_dict
def process_urls():
- url = self.session.query(Url).filter(Url.url == ind["url"]).first()
+ url = self.obj_buffer.get(Url, url=ind["url"])
if url is None:
- url = Url(**ind)
- self.session.add(url)
- self.session.flush()
- entity_dict['url'] = url
+ url_obj = self.session.query(Url).filter(Url.url == ind["url"]).first()
+ if url_obj is not None:
+ url = ObjectBufferProxy(Url, None, None, False, url_obj)
+ if url is None:
+ url = self.obj_buffer.add_object(Url, None, ind, True)
entity_dict['url_id'] = url.id
- entity = EntityUrl(**entity_dict)
- return entity
+ return EntityUrl, entity_dict
#{'': lambda }
- entity = {
+ entity_klass, entity_dict = {
'hashtags': process_hashtags,
'user_mentions' : process_user_mentions,
'urls' : process_urls
}[ind_type]()
logger.debug("Process_entity entity_dict: " + repr(entity_dict)) #@UndefinedVariable
- if entity:
- self.session.add(entity)
- self.session.flush()
+ if entity_klass:
+ self.obj_buffer.add_object(entity_klass, None, entity_dict, False)
def __process_twitter_stream(self):
@@ -261,17 +283,14 @@
user = self.__get_user(self.json_dict["user"])
if user is None:
logger.warning("USER not found " + repr(self.json_dict["user"])) #@UndefinedVariable
- ts_copy["user"] = None
ts_copy["user_id"] = None
else:
- ts_copy["user"] = user
- ts_copy["user_id"] = ts_copy["user"].id
+ ts_copy["user_id"] = user.id
+ del(ts_copy['user'])
ts_copy["tweet_source_id"] = self.source_id
- self.tweet = Tweet(**ts_copy)
- self.session.add(self.tweet)
- self.session.flush()
+ self.tweet = self.obj_buffer.add_object(Tweet, None, ts_copy, True)
# get entities
for ind_type, entity_list in self.json_dict["entities"].items():
@@ -314,12 +333,10 @@
tweet_fields["user"] = None
tweet_fields["user_id"] = None
else:
- tweet_fields["user"] = user
tweet_fields["user_id"] = user.id
tweet_fields = adapt_fields(tweet_fields, fields_adapter["rest"]["tweet"])
- self.tweet = Tweet(**tweet_fields)
- self.session.add(self.tweet)
+ self.tweet = self.obj_buffer.add_object(Tweet, None, tweet_fields, True)
text = self.tweet.text
@@ -339,9 +356,7 @@
def process(self):
if self.source_id is None:
- tweet_source = TweetSource(original_json=self.json_txt);
- self.session.add(tweet_source)
- self.session.flush()
+ tweet_source = self.obj_buffer.add_object(TweetSource, None, {'original_json':self.json_txt}, True)
self.source_id = tweet_source.id
if "metadata" in self.json_dict:
@@ -349,9 +364,10 @@
else:
self.__process_twitter_stream()
- tweet_log = TweetLog(tweet_source_id = self.source_id, status = TweetLog.TWEET_STATUS['OK'])
- self.session.add(tweet_log)
+ self.obj_buffer.add_object(TweetLog, None, {'tweet_source_id':self.source_id, 'status':TweetLog.TWEET_STATUS['OK']}, False)
+ self.obj_buffer.persists(self.session)
+
def set_logging(options, plogger=None):