diff -r 2ebf22c65168 -r e9335ee3cf71 script/lib/iri_tweet/utils.py --- a/script/lib/iri_tweet/utils.py Wed Jul 27 18:32:56 2011 +0200 +++ b/script/lib/iri_tweet/utils.py Tue Aug 09 13:07:23 2011 +0200 @@ -1,6 +1,6 @@ -from models import Tweet, User, Hashtag, EntityHashtag, EntityUser, Url, \ - EntityUrl, CONSUMER_KEY, CONSUMER_SECRET, APPLICATION_NAME, ACCESS_TOKEN_KEY, \ - ACCESS_TOKEN_SECRET, adapt_date, adapt_json +from models import (Tweet, User, Hashtag, EntityHashtag, EntityUser, Url, + EntityUrl, CONSUMER_KEY, CONSUMER_SECRET, APPLICATION_NAME, ACCESS_TOKEN_KEY, + ACCESS_TOKEN_SECRET, adapt_date, adapt_json, TweetSource, TweetLog) from sqlalchemy.sql import select, or_ #@UnresolvedImport import anyjson #@UnresolvedImport import datetime @@ -77,13 +77,67 @@ return dict([(str(k),adapt_one_field(k,v)) for k,v in fields_dict.items()]) +class ObjectBufferProxy(object): + def __init__(self, klass, args, kwargs, must_flush, instance=None): + self.klass= klass + self.args = args + self.kwargs = kwargs + self.must_flush = must_flush + self.instance = instance + + def persists(self, session): + new_args = [arg() if callable(arg) else arg for arg in self.args] if self.args is not None else [] + new_kwargs = dict([(k,v()) if callable(v) else (k,v) for k,v in self.kwargs.items()]) if self.kwargs is not None else {} + + self.instance = self.klass(*new_args, **new_kwargs) + session.add(self.instance) + if self.must_flush: + session.flush() + + def __getattr__(self, name): + return lambda : getattr(self.instance, name) if self.instance else None + + + + +class ObjectsBuffer(object): + + def __init__(self): + self.__bufferlist = [] + + def persists(self, session): + for object_proxy in self.__bufferlist: + object_proxy.persists(session) + + def add_object(self, klass, args, kwargs, must_flush): + new_proxy = ObjectBufferProxy(klass, args, kwargs, must_flush) + self.__bufferlist.append(new_proxy) + return new_proxy + + def get(self, klass, **kwargs): + for proxy in self.__bufferlist: + if proxy.kwargs is None or len(proxy.kwargs) == 0 or proxy.klass != klass: + continue + found = True + for k,v in kwargs.items(): + if (k not in proxy.kwargs) or v != proxy.kwargs[k]: + found = False + break + if found: + return proxy + + return None + + + + class TwitterProcessorException(Exception): pass class TwitterProcessor(object): - def __init__(self, json_dict, json_txt, session, token_filename=None): + def __init__(self, json_dict, json_txt, source_id, session, token_filename=None): if json_dict is None and json_txt is None: raise TwitterProcessorException("No json") @@ -101,24 +155,39 @@ if "id" not in self.json_dict: raise TwitterProcessorException("No id in json") + self.source_id = source_id self.session = session self.token_filename = token_filename + self.obj_buffer = ObjectsBuffer() + def __get_user(self, user_dict): - logging.debug("Get user : " + repr(user_dict)) #@UndefinedVariable + logger.debug("Get user : " + repr(user_dict)) #@UndefinedVariable user_id = user_dict.get("id",None) user_name = user_dict.get("screen_name", user_dict.get("name", None)) if user_id is None and user_name is None: return None - + + user = None if user_id: - user = self.session.query(User).filter(User.id == user_id).first() + user = self.obj_buffer.get(User, id=user_id) else: - user = self.session.query(User).filter(User.screen_name.ilike(user_name)).first() + user = self.obj_buffer.get(User, screen_name=user_name) + + if user is not None: + return user + + #todo : add methpds to objectbuffer to get buffer user + user_obj = None + if user_id: + user_obj = self.session.query(User).filter(User.id == user_id).first() + else: + user_obj = self.session.query(User).filter(User.screen_name.ilike(user_name)).first() - if user is not None: + if user_obj is not None: + user = ObjectBufferProxy(User, None, None, False, user_obj) return user user_created_at = user_dict.get("created_at", None) @@ -132,28 +201,27 @@ else: user_dict = t.users.show(screen_name=user_name) except Exception as e: - logging.info("get_user : TWITTER ERROR : " + repr(e)) #@UndefinedVariable - logging.info("get_user : TWITTER ERROR : " + str(e)) #@UndefinedVariable + logger.info("get_user : TWITTER ERROR : " + repr(e)) #@UndefinedVariable + logger.info("get_user : TWITTER ERROR : " + str(e)) #@UndefinedVariable return None user_dict = adapt_fields(user_dict, fields_adapter["stream"]["user"]) if "id" not in user_dict: return None + #TODO filter get, wrap in proxy user = self.session.query(User).filter(User.id == user_dict["id"]).first() if user is not None: return user - user = User(**user_dict) + user = self.obj_buffer.add_object(User, None, user_dict, True) - self.session.add(user) - self.session.flush() - - return user + return user + def __process_entity(self, ind, ind_type): - logging.debug("Process_entity : " + repr(ind) + " : " + repr(ind_type)) #@UndefinedVariable + logger.debug("Process_entity : " + repr(ind) + " : " + repr(ind_type)) #@UndefinedVariable ind = clean_keys(ind) @@ -161,57 +229,53 @@ "indice_start": ind["indices"][0], "indice_end" : ind["indices"][1], "tweet_id" : self.tweet.id, - "tweet" : self.tweet } def process_hashtags(): text = ind.get("text", ind.get("hashtag", None)) if text is None: - return None - hashtag = self.session.query(Hashtag).filter(Hashtag.text.ilike(text)).first() + return None + hashtag = self.obj_buffer.get(Hashtag, text=text) + if hashtag is None: + hashtag_obj = self.session.query(Hashtag).filter(Hashtag.text.ilike(text)).first() + if hashtag_obj is not None: + hashtag = ObjectBufferProxy(Hashtag, None, None, False, hashtag_obj) + if hashtag is None: ind["text"] = text - hashtag = Hashtag(**ind) - self.session.add(hashtag) - self.session.flush() - entity_dict['hashtag'] = hashtag + hashtag = self.obj_buffer.add_object(Hashtag, None, ind, True) entity_dict['hashtag_id'] = hashtag.id - entity = EntityHashtag(**entity_dict) - return entity + return EntityHashtag, entity_dict def process_user_mentions(): user_mention = self.__get_user(ind) if user_mention is None: - entity_dict['user'] = None entity_dict['user_id'] = None else: - entity_dict['user'] = user_mention entity_dict['user_id'] = user_mention.id - entity = EntityUser(**entity_dict) - return entity + return EntityUser, entity_dict def process_urls(): - url = self.session.query(Url).filter(Url.url == ind["url"]).first() + url = self.obj_buffer.get(Url, url=ind["url"]) if url is None: - url = Url(**ind) - self.session.add(url) - self.session.flush() - entity_dict['url'] = url + url_obj = self.session.query(Url).filter(Url.url == ind["url"]).first() + if url_obj is not None: + url = ObjectBufferProxy(Url, None, None, False, url_obj) + if url is None: + url = self.obj_buffer.add_object(Url, None, ind, True) entity_dict['url_id'] = url.id - entity = EntityUrl(**entity_dict) - return entity + return EntityUrl, entity_dict #{'': lambda } - entity = { + entity_klass, entity_dict = { 'hashtags': process_hashtags, 'user_mentions' : process_user_mentions, 'urls' : process_urls }[ind_type]() - logging.debug("Process_entity entity_dict: " + repr(entity_dict)) #@UndefinedVariable - if entity: - self.session.add(entity) - self.session.flush() + logger.debug("Process_entity entity_dict: " + repr(entity_dict)) #@UndefinedVariable + if entity_klass: + self.obj_buffer.add_object(entity_klass, None, entity_dict, False) def __process_twitter_stream(self): @@ -225,16 +289,15 @@ # get or create user user = self.__get_user(self.json_dict["user"]) if user is None: - logging.warning("USER not found " + repr(self.json_dict["user"])) #@UndefinedVariable - ts_copy["user"] = None + logger.warning("USER not found " + repr(self.json_dict["user"])) #@UndefinedVariable ts_copy["user_id"] = None else: - ts_copy["user"] = user - ts_copy["user_id"] = ts_copy["user"].id - ts_copy["original_json"] = self.json_txt + ts_copy["user_id"] = user.id + + del(ts_copy['user']) + ts_copy["tweet_source_id"] = self.source_id - self.tweet = Tweet(**ts_copy) - self.session.add(self.tweet) + self.tweet = self.obj_buffer.add_object(Tweet, None, ts_copy, True) # get entities if "entities" in self.json_dict: @@ -260,7 +323,8 @@ tweet_nb = self.session.query(Tweet).filter(Tweet.id == self.json_dict["id"]).count() if tweet_nb > 0: return - + + tweet_fields = { 'created_at': self.json_dict["created_at"], 'favorited': False, @@ -272,8 +336,8 @@ #'place': ts["place"], 'source': self.json_dict["source"], 'text': self.json_dict["text"], - 'truncated': False, - 'original_json' : self.json_txt, + 'truncated': False, + 'tweet_source_id' : self.source_id, } #user @@ -286,16 +350,13 @@ user = self.__get_user(user_fields) if user is None: - logging.warning("USER not found " + repr(user_fields)) #@UndefinedVariable - tweet_fields["user"] = None + logger.warning("USER not found " + repr(user_fields)) #@UndefinedVariable tweet_fields["user_id"] = None else: - tweet_fields["user"] = user tweet_fields["user_id"] = user.id tweet_fields = adapt_fields(tweet_fields, fields_adapter["rest"]["tweet"]) - self.tweet = Tweet(**tweet_fields) - self.session.add(self.tweet) + self.tweet = self.obj_buffer.add_object(Tweet, None, tweet_fields, True) text = self.tweet.text @@ -303,26 +364,37 @@ for ind in extractor.extract_hashtags_with_indices(): self.__process_entity(ind, "hashtags") - - for ind in extractor.extract_mentioned_screen_names_with_indices(): - self.__process_entity(ind, "user_mentions") - + for ind in extractor.extract_urls_with_indices(): self.__process_entity(ind, "urls") - self.session.flush() + for ind in extractor.extract_mentioned_screen_names_with_indices(): + self.__process_entity(ind, "user_mentions") + def process(self): + + if self.source_id is None: + tweet_source = self.obj_buffer.add_object(TweetSource, None, {'original_json':self.json_txt}, True) + self.source_id = tweet_source.id + if "metadata" in self.json_dict: self.__process_twitter_rest() else: self.__process_twitter_stream() + + self.obj_buffer.add_object(TweetLog, None, {'tweet_source_id':self.source_id, 'status':TweetLog.TWEET_STATUS['OK']}, False) + self.obj_buffer.persists(self.session) + -def set_logging(options): +def set_logging(options, plogger=None): - logging_config = {} + logging_config = { + "format" : '%(asctime)s %(levelname)s:%(name)s:%(message)s', + "level" : max(logging.NOTSET, min(logging.CRITICAL, logging.WARNING - 10 * options.verbose + 10 * options.quiet)), #@UndefinedVariable + } if options.logfile == "stdout": logging_config["stream"] = sys.stdout @@ -330,9 +402,27 @@ logging_config["stream"] = sys.stderr else: logging_config["filename"] = options.logfile - - logging_config["level"] = max(logging.NOTSET, min(logging.CRITICAL, logging.WARNING - 10 * options.verbose + 10 * options.quiet)) #@UndefinedVariable - logging.basicConfig(**logging_config) #@UndefinedVariable + + logger = plogger + if logger is None: + logger = logging.getLogger() #@UndefinedVariable + + if len(logger.handlers) == 0: + filename = logging_config.get("filename") + if filename: + mode = logging_config.get("filemode", 'a') + hdlr = logging.FileHandler(filename, mode) #@UndefinedVariable + else: + stream = logging_config.get("stream") + hdlr = logging.StreamHandler(stream) #@UndefinedVariable + fs = logging_config.get("format", logging.BASIC_FORMAT) #@UndefinedVariable + dfs = logging_config.get("datefmt", None) + fmt = logging.Formatter(fs, dfs) #@UndefinedVariable + hdlr.setFormatter(fmt) + logger.addHandler(hdlr) + level = logging_config.get("level") + if level is not None: + logger.setLevel(level) options.debug = (options.verbose-options.quiet > 0) @@ -387,4 +477,4 @@ return query.distinct() - +logger = logging.getLogger() #@UndefinedVariable