script/lib/iri_tweet/utils.py
changeset 244 d4b7d6e2633f
parent 243 9213a63fa34a
child 253 e9335ee3cf71
equal deleted inserted replaced
243:9213a63fa34a 244:d4b7d6e2633f
    76             return value
    76             return value
    77     return dict([(str(k),adapt_one_field(k,v)) for k,v in fields_dict.items()])    
    77     return dict([(str(k),adapt_one_field(k,v)) for k,v in fields_dict.items()])    
    78 
    78 
    79 
    79 
    80 class ObjectBufferProxy(object):
    80 class ObjectBufferProxy(object):
    81     def __init__(self, klass, args, kwargs, must_flush):
    81     def __init__(self, klass, args, kwargs, must_flush, instance=None):
    82         self.klass= klass
    82         self.klass= klass
    83         self.args = args
    83         self.args = args
    84         self.kwargs = kwargs
    84         self.kwargs = kwargs
    85         self.must_flush = must_flush
    85         self.must_flush = must_flush
    86         self.instance = None
    86         self.instance = instance
    87         
    87         
    88     def persists(self, session):
    88     def persists(self, session):
    89         new_args = [arg() if callable(arg) else arg for arg in self.args] if self.args is not None else []
    89         new_args = [arg() if callable(arg) else arg for arg in self.args] if self.args is not None else []
    90         new_kwargs = dict([(k,v()) if callable(v) else (k,v) for k,v in self.kwargs.items()]) if self.kwargs is not None else {}
    90         new_kwargs = dict([(k,v()) if callable(v) else (k,v) for k,v in self.kwargs.items()]) if self.kwargs is not None else {}
    91         
    91         
   112     def add_object(self, klass, args, kwargs, must_flush):
   112     def add_object(self, klass, args, kwargs, must_flush):
   113         new_proxy = ObjectBufferProxy(klass, args, kwargs, must_flush)
   113         new_proxy = ObjectBufferProxy(klass, args, kwargs, must_flush)
   114         self.__bufferlist.append(new_proxy)
   114         self.__bufferlist.append(new_proxy)
   115         return new_proxy 
   115         return new_proxy 
   116     
   116     
       
   117     def get(self, klass, **kwargs):
       
   118         for proxy in self.__bufferlist:
       
   119             if proxy.kwargs is None or len(proxy.kwargs) == 0 or proxy.klass != klass:
       
   120                 continue
       
   121             found = True
       
   122             for k,v in kwargs.items():
       
   123                 if (k not in proxy.kwargs) or v != proxy.kwargs[k]:
       
   124                     found = False
       
   125                     break
       
   126             if found:
       
   127                 return proxy
       
   128         
       
   129         return None
       
   130                 
       
   131                     
   117         
   132         
   118 
   133 
   119 
   134 
   120 class TwitterProcessorException(Exception):
   135 class TwitterProcessorException(Exception):
   121     pass
   136     pass
   143         self.source_id = source_id
   158         self.source_id = source_id
   144         self.session = session
   159         self.session = session
   145         self.token_filename = token_filename
   160         self.token_filename = token_filename
   146         self.obj_buffer = ObjectsBuffer()
   161         self.obj_buffer = ObjectsBuffer()
   147 
   162 
       
   163 
   148     def __get_user(self, user_dict):
   164     def __get_user(self, user_dict):
   149         logger.debug("Get user : " + repr(user_dict)) #@UndefinedVariable
   165         logger.debug("Get user : " + repr(user_dict)) #@UndefinedVariable
   150     
   166     
   151         user_id = user_dict.get("id",None)    
   167         user_id = user_dict.get("id",None)    
   152         user_name = user_dict.get("screen_name", user_dict.get("name", None))
   168         user_name = user_dict.get("screen_name", user_dict.get("name", None))
   153         
   169         
   154         if user_id is None and user_name is None:
   170         if user_id is None and user_name is None:
   155             return None
   171             return None
   156     
   172 
       
   173         user = None
   157         if user_id:
   174         if user_id:
   158             user = self.session.query(User).filter(User.id == user_id).first()
   175             user = self.obj_buffer.get(User, id=user_id)
   159         else:
   176         else:
   160             user = self.session.query(User).filter(User.screen_name == user_name).first()
   177             user = self.obj_buffer.get(User, screen_name=user_name)
   161     
   178             
   162         if user is not None:
   179         if user is not None:
       
   180             return user
       
   181 
       
   182         #todo : add methods to objectbuffer to get buffer user
       
   183         user_obj = None
       
   184         if user_id:
       
   185             user_obj = self.session.query(User).filter(User.id == user_id).first()
       
   186         else:
       
   187             user_obj = self.session.query(User).filter(User.screen_name == user_name).first()
       
   188     
       
   189         if user_obj is not None:
       
   190             user = ObjectBufferProxy(User, None, None, False, user_obj)
   163             return user
   191             return user
   164     
   192     
   165         user_created_at = user_dict.get("created_at", None)
   193         user_created_at = user_dict.get("created_at", None)
   166         
   194         
   167         if user_created_at is None:
   195         if user_created_at is None:
   178     
   206     
   179         user_dict = adapt_fields(user_dict, fields_adapter["stream"]["user"])
   207         user_dict = adapt_fields(user_dict, fields_adapter["stream"]["user"])
   180         if "id" not in user_dict:
   208         if "id" not in user_dict:
   181             return None
   209             return None
   182         
   210         
   183         user = User(**user_dict)
   211         user = self.obj_buffer.add_object(User, None, user_dict, True)
   184         
   212         
   185         self.session.add(user)
   213         return user
   186         self.session.flush()
   214 
   187         
       
   188         return user 
       
   189 
   215 
   190     def __process_entity(self, ind, ind_type):
   216     def __process_entity(self, ind, ind_type):
   191         logger.debug("Process_entity : " + repr(ind) + " : " + repr(ind_type)) #@UndefinedVariable
   217         logger.debug("Process_entity : " + repr(ind) + " : " + repr(ind_type)) #@UndefinedVariable
   192         
   218         
   193         ind = clean_keys(ind)
   219         ind = clean_keys(ind)
   194         
   220         
   195         entity_dict = {
   221         entity_dict = {
   196            "indice_start": ind["indices"][0],
   222            "indice_start": ind["indices"][0],
   197            "indice_end"  : ind["indices"][1],
   223            "indice_end"  : ind["indices"][1],
   198            "tweet_id"    : self.tweet.id,
   224            "tweet_id"    : self.tweet.id,
   199            "tweet"       : self.tweet
       
   200         }
   225         }
   201     
   226     
   202         def process_hashtags():
   227         def process_hashtags():
   203             text = ind.get("text", ind.get("hashtag", None))
   228             text = ind.get("text", ind.get("hashtag", None))
   204             if text is None:
   229             if text is None:
   205                 return None 
   230                 return None
   206             hashtag = self.session.query(Hashtag).filter(Hashtag.text == text).first()
   231             hashtag = self.obj_buffer.get(Hashtag, text=text)
       
   232             if hashtag is None: 
       
   233                 hashtag_obj = self.session.query(Hashtag).filter(Hashtag.text == text).first()
       
   234                 if hashtag_obj is not None:
       
   235                     hashtag = ObjectBufferProxy(Hashtag, None, None, False, hashtag_obj)
       
   236                     
   207             if not hashtag:
   237             if not hashtag:
   208                 ind["text"] = text
   238                 ind["text"] = text
   209                 hashtag = Hashtag(**ind)
   239                 hashtag = self.obj_buffer.add_object(Hashtag, None, ind, True)
   210                 self.session.add(hashtag)
       
   211                 self.session.flush()
       
   212             entity_dict['hashtag'] = hashtag
       
   213             entity_dict['hashtag_id'] = hashtag.id
   240             entity_dict['hashtag_id'] = hashtag.id
   214             entity = EntityHashtag(**entity_dict)
   241             return EntityHashtag, entity_dict             
   215             return entity
       
   216         
   242         
   217         def process_user_mentions():
   243         def process_user_mentions():
   218             user_mention = self.__get_user(ind)
   244             user_mention = self.__get_user(ind)
   219             if user_mention is None:
   245             if user_mention is None:
   220                 entity_dict['user'] = None
       
   221                 entity_dict['user_id'] = None
   246                 entity_dict['user_id'] = None
   222             else:
   247             else:
   223                 entity_dict['user'] = user_mention
       
   224                 entity_dict['user_id'] = user_mention.id
   248                 entity_dict['user_id'] = user_mention.id
   225             entity = EntityUser(**entity_dict)
   249             return EntityUser, entity_dict
   226             return entity
       
   227         
   250         
   228         def process_urls():
   251         def process_urls():
   229             url = self.session.query(Url).filter(Url.url == ind["url"]).first()
   252             url = self.obj_buffer.get(Url, url=ind["url"])
   230             if url is None:
   253             if url is None:
   231                 url = Url(**ind)
   254                 url_obj = self.session.query(Url).filter(Url.url == ind["url"]).first()
   232                 self.session.add(url)
   255                 if url_obj is not None:
   233                 self.session.flush()
   256                     url = ObjectBufferProxy(Url, None, None, False, url_obj)
   234             entity_dict['url'] = url
   257             if url is None:
       
   258                 url = self.obj_buffer.add_object(Url, None, ind, True)
   235             entity_dict['url_id'] = url.id
   259             entity_dict['url_id'] = url.id
   236             entity = EntityUrl(**entity_dict)
   260             return EntityUrl, entity_dict
   237             return entity
       
   238         
   261         
   239         #{'': lambda }
   262         #{'': lambda }
   240         entity =  { 
   263         entity_klass, entity_dict =  { 
   241             'hashtags': process_hashtags,
   264             'hashtags': process_hashtags,
   242             'user_mentions' : process_user_mentions,
   265             'user_mentions' : process_user_mentions,
   243             'urls' : process_urls
   266             'urls' : process_urls
   244             }[ind_type]()
   267             }[ind_type]()
   245             
   268             
   246         logger.debug("Process_entity entity_dict: " + repr(entity_dict)) #@UndefinedVariable
   269         logger.debug("Process_entity entity_dict: " + repr(entity_dict)) #@UndefinedVariable
   247         if entity:
   270         if entity_klass:
   248             self.session.add(entity)
   271             self.obj_buffer.add_object(entity_klass, None, entity_dict, False)
   249             self.session.flush()
       
   250 
   272 
   251 
   273 
   252     def __process_twitter_stream(self):
   274     def __process_twitter_stream(self):
   253         
   275         
   254         tweet_nb = self.session.query(Tweet).filter(Tweet.id == self.json_dict["id"]).count()
   276         tweet_nb = self.session.query(Tweet).filter(Tweet.id == self.json_dict["id"]).count()
   259         
   281         
   260         # get or create user
   282         # get or create user
   261         user = self.__get_user(self.json_dict["user"])
   283         user = self.__get_user(self.json_dict["user"])
   262         if user is None:
   284         if user is None:
   263             logger.warning("USER not found " + repr(self.json_dict["user"])) #@UndefinedVariable
   285             logger.warning("USER not found " + repr(self.json_dict["user"])) #@UndefinedVariable
   264             ts_copy["user"] = None
       
   265             ts_copy["user_id"] = None
   286             ts_copy["user_id"] = None
   266         else:
   287         else:
   267             ts_copy["user"] = user
   288             ts_copy["user_id"] = user.id
   268             ts_copy["user_id"] = ts_copy["user"].id
   289             
   269             
   290         del(ts_copy['user'])
   270         ts_copy["tweet_source_id"] = self.source_id
   291         ts_copy["tweet_source_id"] = self.source_id
   271         
   292         
   272         self.tweet = Tweet(**ts_copy)
   293         self.tweet = self.obj_buffer.add_object(Tweet, None, ts_copy, True)
   273         self.session.add(self.tweet)
       
   274         self.session.flush()
       
   275             
   294             
   276         # get entities
   295         # get entities
   277         for ind_type, entity_list in self.json_dict["entities"].items():
   296         for ind_type, entity_list in self.json_dict["entities"].items():
   278             for ind in entity_list:
   297             for ind in entity_list:
   279                 self.__process_entity(ind, ind_type)
   298                 self.__process_entity(ind, ind_type)
   312         if user is None:
   331         if user is None:
   313             logger.warning("USER not found " + repr(user_fields)) #@UndefinedVariable
   332             logger.warning("USER not found " + repr(user_fields)) #@UndefinedVariable
   314             tweet_fields["user"] = None
   333             tweet_fields["user"] = None
   315             tweet_fields["user_id"] = None
   334             tweet_fields["user_id"] = None
   316         else:
   335         else:
   317             tweet_fields["user"] = user
       
   318             tweet_fields["user_id"] = user.id
   336             tweet_fields["user_id"] = user.id
   319         
   337         
   320         tweet_fields = adapt_fields(tweet_fields, fields_adapter["rest"]["tweet"])
   338         tweet_fields = adapt_fields(tweet_fields, fields_adapter["rest"]["tweet"])
   321         self.tweet = Tweet(**tweet_fields)
   339         self.tweet = self.obj_buffer.add_object(Tweet, None, tweet_fields, True)
   322         self.session.add(self.tweet)
       
   323         
   340         
   324         text = self.tweet.text
   341         text = self.tweet.text
   325         
   342         
   326         extractor = twitter_text.Extractor(text)
   343         extractor = twitter_text.Extractor(text)
   327         
   344         
   337 
   354 
   338 
   355 
   339     def process(self):
   356     def process(self):
   340         
   357         
   341         if self.source_id is None:
   358         if self.source_id is None:
   342             tweet_source = TweetSource(original_json=self.json_txt);
   359             tweet_source = self.obj_buffer.add_object(TweetSource, None, {'original_json':self.json_txt}, True)
   343             self.session.add(tweet_source)
       
   344             self.session.flush()
       
   345             self.source_id = tweet_source.id
   360             self.source_id = tweet_source.id
   346         
   361         
   347         if "metadata" in self.json_dict:
   362         if "metadata" in self.json_dict:
   348             self.__process_twitter_rest()
   363             self.__process_twitter_rest()
   349         else:
   364         else:
   350             self.__process_twitter_stream()
   365             self.__process_twitter_stream()
   351 
   366 
   352         tweet_log = TweetLog(tweet_source_id = self.source_id, status = TweetLog.TWEET_STATUS['OK'])            
   367         self.obj_buffer.add_object(TweetLog, None, {'tweet_source_id':self.source_id, 'status':TweetLog.TWEET_STATUS['OK']}, False)
   353         self.session.add(tweet_log)
   368         
   354         
   369         self.obj_buffer.persists(self.session)
       
   370 
   355 
   371 
   356 def set_logging(options, plogger=None):
   372 def set_logging(options, plogger=None):
   357     
   373     
   358     logging_config = {
   374     logging_config = {
   359         "format" : '%(asctime)s %(levelname)s:%(name)s:%(message)s',
   375         "format" : '%(asctime)s %(levelname)s:%(name)s:%(message)s',