script/lib/iri_tweet/utils.py
changeset 253 e9335ee3cf71
parent 250 6334869ab06d
parent 244 d4b7d6e2633f
child 254 2209e66bb50b
equal deleted inserted replaced
252:2ebf22c65168 253:e9335ee3cf71
     1 from models import Tweet, User, Hashtag, EntityHashtag, EntityUser, Url, \
     1 from models import (Tweet, User, Hashtag, EntityHashtag, EntityUser, Url, 
     2     EntityUrl, CONSUMER_KEY, CONSUMER_SECRET, APPLICATION_NAME, ACCESS_TOKEN_KEY, \
     2     EntityUrl, CONSUMER_KEY, CONSUMER_SECRET, APPLICATION_NAME, ACCESS_TOKEN_KEY, 
     3     ACCESS_TOKEN_SECRET, adapt_date, adapt_json
     3     ACCESS_TOKEN_SECRET, adapt_date, adapt_json, TweetSource, TweetLog)
     4 from sqlalchemy.sql import select, or_ #@UnresolvedImport
     4 from sqlalchemy.sql import select, or_ #@UnresolvedImport
     5 import anyjson #@UnresolvedImport
     5 import anyjson #@UnresolvedImport
     6 import datetime
     6 import datetime
     7 import email.utils
     7 import email.utils
     8 import logging #@UnresolvedImport
     8 import logging #@UnresolvedImport
    75         else:
    75         else:
    76             return value
    76             return value
    77     return dict([(str(k),adapt_one_field(k,v)) for k,v in fields_dict.items()])    
    77     return dict([(str(k),adapt_one_field(k,v)) for k,v in fields_dict.items()])    
    78 
    78 
    79 
    79 
       
    80 class ObjectBufferProxy(object):
       
    81     def __init__(self, klass, args, kwargs, must_flush, instance=None):
       
    82         self.klass= klass
       
    83         self.args = args
       
    84         self.kwargs = kwargs
       
    85         self.must_flush = must_flush
       
    86         self.instance = instance
       
    87         
       
    88     def persists(self, session):
       
    89         new_args = [arg() if callable(arg) else arg for arg in self.args] if self.args is not None else []
       
    90         new_kwargs = dict([(k,v()) if callable(v) else (k,v) for k,v in self.kwargs.items()]) if self.kwargs is not None else {}
       
    91         
       
    92         self.instance = self.klass(*new_args, **new_kwargs)
       
    93         session.add(self.instance)
       
    94         if self.must_flush:
       
    95             session.flush()
       
    96             
       
    97     def __getattr__(self, name):
       
    98         return lambda : getattr(self.instance, name) if self.instance else None
       
    99         
       
   100         
       
   101     
       
   102 
       
   103 class ObjectsBuffer(object):
       
   104 
       
   105     def __init__(self):
       
   106         self.__bufferlist = []
       
   107         
       
   108     def persists(self, session):
       
   109         for object_proxy in self.__bufferlist:
       
   110             object_proxy.persists(session)
       
   111             
       
   112     def add_object(self, klass, args, kwargs, must_flush):
       
   113         new_proxy = ObjectBufferProxy(klass, args, kwargs, must_flush)
       
   114         self.__bufferlist.append(new_proxy)
       
   115         return new_proxy 
       
   116     
       
   117     def get(self, klass, **kwargs):
       
   118         for proxy in self.__bufferlist:
       
   119             if proxy.kwargs is None or len(proxy.kwargs) == 0 or proxy.klass != klass:
       
   120                 continue
       
   121             found = True
       
   122             for k,v in kwargs.items():
       
   123                 if (k not in proxy.kwargs) or v != proxy.kwargs[k]:
       
   124                     found = False
       
   125                     break
       
   126             if found:
       
   127                 return proxy
       
   128         
       
   129         return None
       
   130                 
       
   131                     
       
   132         
       
   133 
    80 
   134 
    81 class TwitterProcessorException(Exception):
   135 class TwitterProcessorException(Exception):
    82     pass
   136     pass
    83 
   137 
    84 class TwitterProcessor(object):
   138 class TwitterProcessor(object):
    85     
   139     
    86     def __init__(self, json_dict, json_txt, session, token_filename=None):
   140     def __init__(self, json_dict, json_txt, source_id, session, token_filename=None):
    87 
   141 
    88         if json_dict is None and json_txt is None:
   142         if json_dict is None and json_txt is None:
    89             raise TwitterProcessorException("No json")
   143             raise TwitterProcessorException("No json")
    90         
   144         
    91         if json_dict is None:
   145         if json_dict is None:
    99             self.json_txt = json_txt
   153             self.json_txt = json_txt
   100         
   154         
   101         if "id" not in self.json_dict:
   155         if "id" not in self.json_dict:
   102             raise TwitterProcessorException("No id in json")
   156             raise TwitterProcessorException("No id in json")
   103         
   157         
       
   158         self.source_id = source_id
   104         self.session = session
   159         self.session = session
   105         self.token_filename = token_filename
   160         self.token_filename = token_filename
       
   161         self.obj_buffer = ObjectsBuffer()
       
   162 
   106 
   163 
   107     def __get_user(self, user_dict):
   164     def __get_user(self, user_dict):
   108         logging.debug("Get user : " + repr(user_dict)) #@UndefinedVariable
   165         logger.debug("Get user : " + repr(user_dict)) #@UndefinedVariable
   109     
   166     
   110         user_id = user_dict.get("id",None)    
   167         user_id = user_dict.get("id",None)    
   111         user_name = user_dict.get("screen_name", user_dict.get("name", None))
   168         user_name = user_dict.get("screen_name", user_dict.get("name", None))
   112         
   169         
   113         if user_id is None and user_name is None:
   170         if user_id is None and user_name is None:
   114             return None
   171             return None
   115     
   172 
       
   173         user = None
   116         if user_id:
   174         if user_id:
   117             user = self.session.query(User).filter(User.id == user_id).first()
   175             user = self.obj_buffer.get(User, id=user_id)
   118         else:
   176         else:
   119             user = self.session.query(User).filter(User.screen_name.ilike(user_name)).first()
   177             user = self.obj_buffer.get(User, screen_name=user_name)
   120     
   178             
   121         if user is not None:
   179         if user is not None:
       
   180             return user
       
   181 
       
   182         #todo : add methods to objectbuffer to get buffer user
       
   183         user_obj = None
       
   184         if user_id:
       
   185             user_obj = self.session.query(User).filter(User.id == user_id).first()
       
   186         else:
       
   187             user_obj = self.session.query(User).filter(User.screen_name.ilike(user_name)).first()
       
   188     
       
   189         if user_obj is not None:
       
   190             user = ObjectBufferProxy(User, None, None, False, user_obj)
   122             return user
   191             return user
   123     
   192     
   124         user_created_at = user_dict.get("created_at", None)
   193         user_created_at = user_dict.get("created_at", None)
   125         
   194         
   126         if user_created_at is None:
   195         if user_created_at is None:
   130                 if user_id:
   199                 if user_id:
   131                     user_dict = t.users.show(user_id=user_id)
   200                     user_dict = t.users.show(user_id=user_id)
   132                 else:
   201                 else:
   133                     user_dict = t.users.show(screen_name=user_name)            
   202                     user_dict = t.users.show(screen_name=user_name)            
   134             except Exception as e:
   203             except Exception as e:
   135                 logging.info("get_user : TWITTER ERROR : " + repr(e)) #@UndefinedVariable
   204                 logger.info("get_user : TWITTER ERROR : " + repr(e)) #@UndefinedVariable
   136                 logging.info("get_user : TWITTER ERROR : " + str(e)) #@UndefinedVariable
   205                 logger.info("get_user : TWITTER ERROR : " + str(e)) #@UndefinedVariable
   137                 return None
   206                 return None
   138     
   207     
   139         user_dict = adapt_fields(user_dict, fields_adapter["stream"]["user"])
   208         user_dict = adapt_fields(user_dict, fields_adapter["stream"]["user"])
   140         if "id" not in user_dict:
   209         if "id" not in user_dict:
   141             return None
   210             return None
   142         
   211         
       
   212         #TODO filter get, wrap in proxy
   143         user = self.session.query(User).filter(User.id == user_dict["id"]).first()
   213         user = self.session.query(User).filter(User.id == user_dict["id"]).first()
   144         
   214         
   145         if user is not None:
   215         if user is not None:
   146             return user
   216             return user
   147         
   217         
   148         user = User(**user_dict)
   218         user = self.obj_buffer.add_object(User, None, user_dict, True)
   149         
   219         
   150         self.session.add(user)
   220         return user
   151         self.session.flush()
   221 
   152         
       
   153         return user 
       
   154 
   222 
   155     def __process_entity(self, ind, ind_type):
   223     def __process_entity(self, ind, ind_type):
   156         logging.debug("Process_entity : " + repr(ind) + " : " + repr(ind_type)) #@UndefinedVariable
   224         logger.debug("Process_entity : " + repr(ind) + " : " + repr(ind_type)) #@UndefinedVariable
   157         
   225         
   158         ind = clean_keys(ind)
   226         ind = clean_keys(ind)
   159         
   227         
   160         entity_dict = {
   228         entity_dict = {
   161            "indice_start": ind["indices"][0],
   229            "indice_start": ind["indices"][0],
   162            "indice_end"  : ind["indices"][1],
   230            "indice_end"  : ind["indices"][1],
   163            "tweet_id"    : self.tweet.id,
   231            "tweet_id"    : self.tweet.id,
   164            "tweet"       : self.tweet
       
   165         }
   232         }
   166     
   233     
   167         def process_hashtags():
   234         def process_hashtags():
   168             text = ind.get("text", ind.get("hashtag", None))
   235             text = ind.get("text", ind.get("hashtag", None))
   169             if text is None:
   236             if text is None:
   170                 return None 
   237                 return None
   171             hashtag = self.session.query(Hashtag).filter(Hashtag.text.ilike(text)).first()
   238             hashtag = self.obj_buffer.get(Hashtag, text=text)
       
   239             if hashtag is None: 
       
   240                 hashtag_obj = self.session.query(Hashtag).filter(Hashtag.text.ilike(text)).first()
       
   241                 if hashtag_obj is not None:
       
   242                     hashtag = ObjectBufferProxy(Hashtag, None, None, False, hashtag_obj)
       
   243                     
   172             if hashtag is None:
   244             if hashtag is None:
   173                 ind["text"] = text
   245                 ind["text"] = text
   174                 hashtag = Hashtag(**ind)
   246                 hashtag = self.obj_buffer.add_object(Hashtag, None, ind, True)
   175                 self.session.add(hashtag)
       
   176                 self.session.flush()
       
   177             entity_dict['hashtag'] = hashtag
       
   178             entity_dict['hashtag_id'] = hashtag.id
   247             entity_dict['hashtag_id'] = hashtag.id
   179             entity = EntityHashtag(**entity_dict)
   248             return EntityHashtag, entity_dict             
   180             return entity
       
   181         
   249         
   182         def process_user_mentions():
   250         def process_user_mentions():
   183             user_mention = self.__get_user(ind)
   251             user_mention = self.__get_user(ind)
   184             if user_mention is None:
   252             if user_mention is None:
   185                 entity_dict['user'] = None
       
   186                 entity_dict['user_id'] = None
   253                 entity_dict['user_id'] = None
   187             else:
   254             else:
   188                 entity_dict['user'] = user_mention
       
   189                 entity_dict['user_id'] = user_mention.id
   255                 entity_dict['user_id'] = user_mention.id
   190             entity = EntityUser(**entity_dict)
   256             return EntityUser, entity_dict
   191             return entity
       
   192         
   257         
   193         def process_urls():
   258         def process_urls():
   194             url = self.session.query(Url).filter(Url.url == ind["url"]).first()
   259             url = self.obj_buffer.get(Url, url=ind["url"])
   195             if url is None:
   260             if url is None:
   196                 url = Url(**ind)
   261                 url_obj = self.session.query(Url).filter(Url.url == ind["url"]).first()
   197                 self.session.add(url)
   262                 if url_obj is not None:
   198                 self.session.flush()
   263                     url = ObjectBufferProxy(Url, None, None, False, url_obj)
   199             entity_dict['url'] = url
   264             if url is None:
       
   265                 url = self.obj_buffer.add_object(Url, None, ind, True)
   200             entity_dict['url_id'] = url.id
   266             entity_dict['url_id'] = url.id
   201             entity = EntityUrl(**entity_dict)
   267             return EntityUrl, entity_dict
   202             return entity
       
   203         
   268         
   204         #{'': lambda }
   269         #{'': lambda }
   205         entity =  { 
   270         entity_klass, entity_dict =  { 
   206             'hashtags': process_hashtags,
   271             'hashtags': process_hashtags,
   207             'user_mentions' : process_user_mentions,
   272             'user_mentions' : process_user_mentions,
   208             'urls' : process_urls
   273             'urls' : process_urls
   209             }[ind_type]()
   274             }[ind_type]()
   210             
   275             
   211         logging.debug("Process_entity entity_dict: " + repr(entity_dict)) #@UndefinedVariable
   276         logger.debug("Process_entity entity_dict: " + repr(entity_dict)) #@UndefinedVariable
   212         if entity:
   277         if entity_klass:
   213             self.session.add(entity)
   278             self.obj_buffer.add_object(entity_klass, None, entity_dict, False)
   214             self.session.flush()
       
   215 
   279 
   216 
   280 
   217     def __process_twitter_stream(self):
   281     def __process_twitter_stream(self):
   218         
   282         
   219         tweet_nb = self.session.query(Tweet).filter(Tweet.id == self.json_dict["id"]).count()
   283         tweet_nb = self.session.query(Tweet).filter(Tweet.id == self.json_dict["id"]).count()
   223         ts_copy = adapt_fields(self.json_dict, fields_adapter["stream"]["tweet"])
   287         ts_copy = adapt_fields(self.json_dict, fields_adapter["stream"]["tweet"])
   224         
   288         
   225         # get or create user
   289         # get or create user
   226         user = self.__get_user(self.json_dict["user"])
   290         user = self.__get_user(self.json_dict["user"])
   227         if user is None:
   291         if user is None:
   228             logging.warning("USER not found " + repr(self.json_dict["user"])) #@UndefinedVariable
   292             logger.warning("USER not found " + repr(self.json_dict["user"])) #@UndefinedVariable
   229             ts_copy["user"] = None
       
   230             ts_copy["user_id"] = None
   293             ts_copy["user_id"] = None
   231         else:
   294         else:
   232             ts_copy["user"] = user
   295             ts_copy["user_id"] = user.id
   233             ts_copy["user_id"] = ts_copy["user"].id
   296             
   234         ts_copy["original_json"] = self.json_txt
   297         del(ts_copy['user'])
   235         
   298         ts_copy["tweet_source_id"] = self.source_id
   236         self.tweet = Tweet(**ts_copy)
   299         
   237         self.session.add(self.tweet)
   300         self.tweet = self.obj_buffer.add_object(Tweet, None, ts_copy, True)
   238             
   301             
   239         # get entities
   302         # get entities
   240         if "entities" in self.json_dict:
   303         if "entities" in self.json_dict:
   241             for ind_type, entity_list in self.json_dict["entities"].items():
   304             for ind_type, entity_list in self.json_dict["entities"].items():
   242                 for ind in entity_list:
   305                 for ind in entity_list:
   258 
   321 
   259     def __process_twitter_rest(self):
   322     def __process_twitter_rest(self):
   260         tweet_nb = self.session.query(Tweet).filter(Tweet.id == self.json_dict["id"]).count()
   323         tweet_nb = self.session.query(Tweet).filter(Tweet.id == self.json_dict["id"]).count()
   261         if tweet_nb > 0:
   324         if tweet_nb > 0:
   262             return
   325             return
   263             
   326         
       
   327         
   264         tweet_fields = {
   328         tweet_fields = {
   265             'created_at': self.json_dict["created_at"], 
   329             'created_at': self.json_dict["created_at"], 
   266             'favorited': False,
   330             'favorited': False,
   267             'id': self.json_dict["id"],
   331             'id': self.json_dict["id"],
   268             'id_str': self.json_dict["id_str"],
   332             'id_str': self.json_dict["id_str"],
   270             'in_reply_to_user_id': self.json_dict["to_user_id"],
   334             'in_reply_to_user_id': self.json_dict["to_user_id"],
   271             'in_reply_to_user_id_str': self.json_dict["to_user_id_str"],
   335             'in_reply_to_user_id_str': self.json_dict["to_user_id_str"],
   272             #'place': ts["place"],
   336             #'place': ts["place"],
   273             'source': self.json_dict["source"],
   337             'source': self.json_dict["source"],
   274             'text': self.json_dict["text"],
   338             'text': self.json_dict["text"],
   275             'truncated': False,
   339             'truncated': False,            
   276             'original_json' : self.json_txt,
   340             'tweet_source_id' : self.source_id,
   277         }
   341         }
   278         
   342         
   279         #user
   343         #user
   280     
   344     
   281         user_fields = {
   345         user_fields = {
   284             'screen_name' : self.json_dict["from_user"],                   
   348             'screen_name' : self.json_dict["from_user"],                   
   285         }
   349         }
   286         
   350         
   287         user = self.__get_user(user_fields)
   351         user = self.__get_user(user_fields)
   288         if user is None:
   352         if user is None:
   289             logging.warning("USER not found " + repr(user_fields)) #@UndefinedVariable
   353             logger.warning("USER not found " + repr(user_fields)) #@UndefinedVariable
   290             tweet_fields["user"] = None
       
   291             tweet_fields["user_id"] = None
   354             tweet_fields["user_id"] = None
   292         else:
   355         else:
   293             tweet_fields["user"] = user
       
   294             tweet_fields["user_id"] = user.id
   356             tweet_fields["user_id"] = user.id
   295         
   357         
   296         tweet_fields = adapt_fields(tweet_fields, fields_adapter["rest"]["tweet"])
   358         tweet_fields = adapt_fields(tweet_fields, fields_adapter["rest"]["tweet"])
   297         self.tweet = Tweet(**tweet_fields)
   359         self.tweet = self.obj_buffer.add_object(Tweet, None, tweet_fields, True)
   298         self.session.add(self.tweet)
       
   299         
   360         
   300         text = self.tweet.text
   361         text = self.tweet.text
   301         
   362         
   302         extractor = twitter_text.Extractor(text)
   363         extractor = twitter_text.Extractor(text)
   303         
   364         
   304         for ind in extractor.extract_hashtags_with_indices():
   365         for ind in extractor.extract_hashtags_with_indices():
   305             self.__process_entity(ind, "hashtags")
   366             self.__process_entity(ind, "hashtags")
   306             
   367                     
       
   368         for ind in extractor.extract_urls_with_indices():
       
   369             self.__process_entity(ind, "urls")
       
   370         
   307         for ind in extractor.extract_mentioned_screen_names_with_indices():
   371         for ind in extractor.extract_mentioned_screen_names_with_indices():
   308             self.__process_entity(ind, "user_mentions")
   372             self.__process_entity(ind, "user_mentions")
   309         
   373 
   310         for ind in extractor.extract_urls_with_indices():
       
   311             self.__process_entity(ind, "urls")
       
   312         
       
   313         self.session.flush()
       
   314 
   374 
   315 
   375 
   316     def process(self):
   376     def process(self):
       
   377         
       
   378         if self.source_id is None:
       
   379             tweet_source = self.obj_buffer.add_object(TweetSource, None, {'original_json':self.json_txt}, True)
       
   380             self.source_id = tweet_source.id
       
   381         
   317         if "metadata" in self.json_dict:
   382         if "metadata" in self.json_dict:
   318             self.__process_twitter_rest()
   383             self.__process_twitter_rest()
   319         else:
   384         else:
   320             self.__process_twitter_stream()
   385             self.__process_twitter_stream()
   321         
   386 
   322 
   387         self.obj_buffer.add_object(TweetLog, None, {'tweet_source_id':self.source_id, 'status':TweetLog.TWEET_STATUS['OK']}, False)
   323 def set_logging(options):
   388         
   324     
   389         self.obj_buffer.persists(self.session)
   325     logging_config = {}
   390 
       
   391 
       
   392 def set_logging(options, plogger=None):
       
   393     
       
   394     logging_config = {
       
   395         "format" : '%(asctime)s %(levelname)s:%(name)s:%(message)s',
       
   396         "level" : max(logging.NOTSET, min(logging.CRITICAL, logging.WARNING - 10 * options.verbose + 10 * options.quiet)), #@UndefinedVariable
       
   397     }
   326     
   398     
   327     if options.logfile == "stdout":
   399     if options.logfile == "stdout":
   328         logging_config["stream"] = sys.stdout
   400         logging_config["stream"] = sys.stdout
   329     elif options.logfile == "stderr":
   401     elif options.logfile == "stderr":
   330         logging_config["stream"] = sys.stderr
   402         logging_config["stream"] = sys.stderr
   331     else:
   403     else:
   332         logging_config["filename"] = options.logfile
   404         logging_config["filename"] = options.logfile
   333         
   405             
   334     logging_config["level"] = max(logging.NOTSET, min(logging.CRITICAL, logging.WARNING - 10 * options.verbose + 10 * options.quiet)) #@UndefinedVariable
   406     logger = plogger
   335     logging.basicConfig(**logging_config) #@UndefinedVariable
   407     if logger is None:
       
   408         logger = logging.getLogger() #@UndefinedVariable
       
   409     
       
   410     if len(logger.handlers) == 0:    
       
   411         filename = logging_config.get("filename")
       
   412         if filename:
       
   413             mode = logging_config.get("filemode", 'a')
       
   414             hdlr = logging.FileHandler(filename, mode) #@UndefinedVariable
       
   415         else:
       
   416             stream = logging_config.get("stream")
       
   417             hdlr = logging.StreamHandler(stream) #@UndefinedVariable
       
   418         fs = logging_config.get("format", logging.BASIC_FORMAT) #@UndefinedVariable
       
   419         dfs = logging_config.get("datefmt", None)
       
   420         fmt = logging.Formatter(fs, dfs) #@UndefinedVariable
       
   421         hdlr.setFormatter(fmt)
       
   422         logger.addHandler(hdlr)
       
   423         level = logging_config.get("level")
       
   424         if level is not None:
       
   425             logger.setLevel(level)
   336     
   426     
   337     options.debug = (options.verbose-options.quiet > 0)
   427     options.debug = (options.verbose-options.quiet > 0)
   338 
   428 
   339 def set_logging_options(parser):
   429 def set_logging_options(parser):
   340     parser.add_option("-l", "--log", dest="logfile",
   430     parser.add_option("-l", "--log", dest="logfile",
   385         
   475         
   386         query = query.filter(or_(*map(lambda h: Hashtag.text.contains(h), htags))) #@UndefinedVariable
   476         query = query.filter(or_(*map(lambda h: Hashtag.text.contains(h), htags))) #@UndefinedVariable
   387     
   477     
   388     return query.distinct()
   478     return query.distinct()
   389 
   479 
   390     
    # Module-level logger used by the functions and classes of this module.
    # getLogger() is called without a name, so this is the root logger.
   480 logger = logging.getLogger() #@UndefinedVariable