script/lib/iri_tweet/iri_tweet/utils.py
changeset 888 6fc6637d8403
parent 886 1e110b03ae96
child 889 c774bdf7d3dd
equal deleted inserted replaced
887:503f9a7b7d6c 888:6fc6637d8403
     1 from models import (Tweet, User, Hashtag, EntityHashtag, EntityUser, Url, #@UnresolvedImport
     1 from models import (Tweet, User, Hashtag, EntityHashtag, APPLICATION_NAME, ACCESS_TOKEN_SECRET, adapt_date, adapt_json, 
     2     EntityUrl, CONSUMER_KEY, CONSUMER_SECRET, APPLICATION_NAME, ACCESS_TOKEN_KEY,#@UnresolvedImport
     2     ACCESS_TOKEN_KEY)
     3     ACCESS_TOKEN_SECRET, adapt_date, adapt_json, TweetSource, TweetLog, MediaType, #@UnresolvedImport 
     3 from sqlalchemy.sql import select, or_
     4     Media, EntityMedia, Entity, EntityType) #@UnresolvedImport
     4 import Queue
     5 from sqlalchemy.sql import select, or_ #@UnresolvedImport
       
     6 import Queue #@UnresolvedImport
       
     7 import anyjson #@UnresolvedImport
       
     8 import codecs
     5 import codecs
     9 import datetime
     6 import datetime
    10 import email.utils
     7 import email.utils
    11 import logging
     8 import logging
    12 import math
     9 import math
    13 import os.path
    10 import os.path
       
    11 import socket
    14 import sys
    12 import sys
    15 import twitter.oauth #@UnresolvedImport
    13 import twitter.oauth
    16 import twitter.oauth_dance #@UnresolvedImport
    14 import twitter.oauth_dance
    17 import twitter_text #@UnresolvedImport
       
    18 
    15 
    19 
    16 
    20 CACHE_ACCESS_TOKEN = {}
    17 CACHE_ACCESS_TOKEN = {}
    21 
    18 
    22 def get_oauth_token(token_file_path=None, check_access_token=True, application_name=APPLICATION_NAME, consumer_key=CONSUMER_KEY, consumer_secret=CONSUMER_SECRET):
    19 def get_oauth_token(consumer_key, consumer_secret, token_file_path=None, check_access_token=True, application_name=APPLICATION_NAME):
    23     
    20     
    24     global CACHE_ACCESS_TOKEN
    21     global CACHE_ACCESS_TOKEN
    25 
    22 
    26     if 'ACCESS_TOKEN_KEY' in globals() and 'ACCESS_TOKEN_SECRET' in globals() and ACCESS_TOKEN_KEY and ACCESS_TOKEN_SECRET:
    23     if 'ACCESS_TOKEN_KEY' in globals() and 'ACCESS_TOKEN_SECRET' in globals() and ACCESS_TOKEN_KEY and ACCESS_TOKEN_SECRET:
    27         return ACCESS_TOKEN_KEY,ACCESS_TOKEN_SECRET
    24         return ACCESS_TOKEN_KEY,ACCESS_TOKEN_SECRET
    32         get_logger().debug("get_oauth_token : reading token from file %s" % token_file_path) #@UndefinedVariable
    29         get_logger().debug("get_oauth_token : reading token from file %s" % token_file_path) #@UndefinedVariable
    33         res = twitter.oauth.read_token_file(token_file_path)
    30         res = twitter.oauth.read_token_file(token_file_path)
    34     
    31     
    35     if res is not None and check_access_token:
    32     if res is not None and check_access_token:
    36         get_logger().debug("get_oauth_token : Check oauth tokens") #@UndefinedVariable
    33         get_logger().debug("get_oauth_token : Check oauth tokens") #@UndefinedVariable
    37         t = twitter.Twitter(auth=twitter.OAuth(res[0], res[1], CONSUMER_KEY, CONSUMER_SECRET))
    34         t = twitter.Twitter(auth=twitter.OAuth(res[0], res[1], consumer_key, consumer_secret))
    38         status = None
    35         status = None
    39         try:
    36         try:
    40             status = t.account.rate_limit_status()
    37             status = t.application.rate_limit_status(resources="account")
    41         except Exception as e:
    38         except Exception as e:
    42             get_logger().debug("get_oauth_token : error getting rate limit status %s " % repr(e))            
    39             get_logger().debug("get_oauth_token : error getting rate limit status %s " % repr(e))            
    43             get_logger().debug("get_oauth_token : error getting rate limit status %s " % str(e))
    40             get_logger().debug("get_oauth_token : error getting rate limit status %s " % str(e))
    44             status = None
    41             status = None
    45         get_logger().debug("get_oauth_token : Check oauth tokens : status %s" % repr(status)) #@UndefinedVariable
    42         get_logger().debug("get_oauth_token : Check oauth tokens : status %s" % repr(status)) #@UndefinedVariable
    46         if status is None or status['remaining_hits'] == 0:
    43         if status is None or status.get("resources",{}).get("account",{}).get('/account/verify_credentials',{}).get('remaining',0) == 0:
    47             get_logger().debug("get_oauth_token : Problem with status %s" % repr(status))
    44             get_logger().debug("get_oauth_token : Problem with status %s" % repr(status))
    48             res = None
    45             res = None
    49 
    46 
    50     if res is None:
    47     if res is None:
    51         get_logger().debug("get_oauth_token : doing the oauth dance")
    48         get_logger().debug("get_oauth_token : doing the oauth dance")
    52         res = twitter.oauth_dance(application_name, consumer_key, consumer_secret, token_file_path)
    49         res = twitter.oauth_dance(application_name, consumer_key, consumer_secret, token_file_path)
       
    50         
    53     
    51     
    54     CACHE_ACCESS_TOKEN[application_name] = res
    52     CACHE_ACCESS_TOKEN[application_name] = res
    55     
    53     
       
    54     get_logger().debug("get_oauth_token : done got %s" % repr(res))
    56     return res
    55     return res
    57 
    56 
    58 def parse_date(date_str):
    57 def parse_date(date_str):
    59     ts = email.utils.parsedate_tz(date_str) #@UndefinedVariable
    58     ts = email.utils.parsedate_tz(date_str) #@UndefinedVariable
    60     return datetime.datetime(*ts[0:7])
    59     return datetime.datetime(*ts[0:7])
   167                         break
   166                         break
   168                 if found:
   167                 if found:
   169                     return proxy        
   168                     return proxy        
   170         return None
   169         return None
   171                 
   170                 
   172 class TwitterProcessorException(Exception):
       
   173     pass
       
   174 
       
class TwitterProcessor(object):
    """Turn one tweet JSON payload into buffered model objects and persist them.

    Objects (tweet, user, entities, log entry) are accumulated in an
    ``ObjectsBuffer`` and written through the given session when
    ``process()`` calls ``obj_buffer.persists``.
    """
    
    def __init__(self, json_dict, json_txt, source_id, session, access_token=None, token_filename=None, user_query_twitter=False):
        """Store the payload and the collaborators needed to process it.

        :param json_dict: decoded tweet; may be None if ``json_txt`` is given.
        :param json_txt: raw JSON text; may be None/empty if ``json_dict`` is given.
        :param source_id: id stored as ``tweet_source_id``; when None,
            ``process()`` creates a ``TweetSource`` from ``json_txt``.
        :param session: object used for ``session.query(...)`` lookups and
            passed to ``obj_buffer.persists``.
        :param access_token: optional ``(key, secret)`` pair used when
            querying Twitter for a user.
        :param token_filename: passed to ``get_oauth_token`` when
            ``access_token`` is None.
        :param user_query_twitter: when true, ``__get_user`` asks Twitter for
            users whose dict has no ``created_at``.
        :raises TwitterProcessorException: if both payload forms are None or
            the decoded payload has no ``"id"`` key.
        """

        if json_dict is None and json_txt is None:
            raise TwitterProcessorException("No json")
        
        # Keep both representations available: decode or encode whichever
        # one the caller did not supply.
        if json_dict is None:
            self.json_dict = anyjson.deserialize(json_txt)
        else:
            self.json_dict = json_dict
        
        if not json_txt:
            self.json_txt = anyjson.serialize(json_dict)
        else:
            self.json_txt = json_txt
        
        if "id" not in self.json_dict:
            raise TwitterProcessorException("No id in json")
        
        self.source_id = source_id
        self.session = session
        self.token_filename = token_filename
        self.access_token = access_token
        self.obj_buffer = ObjectsBuffer()
        self.user_query_twitter = user_query_twitter  
        


    def __get_user(self, user_dict, do_merge):
        """Resolve a user from the buffer, the database, or optionally Twitter.

        Lookup order: pending objects in ``self.obj_buffer``, then the
        database via ``self.session``, then (only if
        ``self.user_query_twitter`` is set and the dict has no
        ``created_at``) the Twitter ``users.show`` endpoint.

        :param user_dict: user fields; identified by ``id`` or else by
            ``screen_name`` / ``name``.
        :param do_merge: when true, an already known user lacking
            ``created_at`` is updated with the fields of ``user_dict``.
        :returns: an object exposing the user's ``id`` (callers read
            ``.id``), or None when the user cannot be identified or the
            Twitter lookup fails.
        """
        get_logger().debug("Get user : " + repr(user_dict)) #@UndefinedVariable
        
        # NOTE(review): adapt_fields/fields_adapter are defined outside this
        # class; presumably they normalise the dict keys/values - confirm.
        user_dict = adapt_fields(user_dict, fields_adapter["stream"]["user"])
    
        user_id = user_dict.get("id",None)    
        user_name = user_dict.get("screen_name", user_dict.get("name", None))
        
        if user_id is None and user_name is None:
            return None

        # 1) Look among the objects already buffered.
        user = None
        if user_id:
            user = self.obj_buffer.get(User, id=user_id)
        else:
            user = self.obj_buffer.get(User, screen_name=user_name)

        # TODO: update the user id if needed
        if user is not None:
            user_created_at = None
            if user.args is not None:
                user_created_at = user.args.get('created_at', None)
            # Only enrich the buffered user when it has no created_at yet and
            # the incoming dict does provide one.
            if user_created_at is None and user_dict.get('created_at', None) is not None and do_merge:
                if user.args is None:
                    user.args = user_dict
                else:
                    user.args.update(user_dict)
            return user

        # TODO: add methods to the object buffer to get a buffered user
        # 2) Look in the database (screen name match is case-insensitive).
        user_obj = None
        if user_id:
            user_obj = self.session.query(User).filter(User.id == user_id).first()
        else:
            user_obj = self.session.query(User).filter(User.screen_name.ilike(user_name)).first()
    
        # TODO: update the user if needed
        if user_obj is not None:            
            # Nothing to merge: just wrap the existing row. Otherwise register
            # the row together with the new fields in the buffer.
            if user_obj.created_at is not None or user_dict.get('created_at', None) is None or not do_merge :
                user = ObjectBufferProxy(User, None, None, False, user_obj)
            else:
                user = self.obj_buffer.add_object(User, None, user_dict, True, user_obj)
            return user
    
        user_created_at = user_dict.get("created_at", None)
        
        # 3) Unknown user with an incomplete dict: optionally ask Twitter.
        if user_created_at is None and self.user_query_twitter:
            
            if self.access_token is not None:
                acess_token_key, access_token_secret = self.access_token
            else:
                acess_token_key, access_token_secret = get_oauth_token(self.token_filename)
            t = twitter.Twitter(auth=twitter.OAuth(acess_token_key, access_token_secret, CONSUMER_KEY, CONSUMER_SECRET))
            try:
                if user_id:
                    user_dict = t.users.show(user_id=user_id)
                else:
                    user_dict = t.users.show(screen_name=user_name)            
            except Exception as e:
                # Best effort: any failure of the remote call is logged and
                # the user is reported as not found.
                get_logger().info("get_user : TWITTER ERROR : " + repr(e)) #@UndefinedVariable
                get_logger().info("get_user : TWITTER ERROR : " + str(e)) #@UndefinedVariable
                return None
            
        if "id" not in user_dict:
            return None
        
        #TODO filter get, wrap in proxy
        # The id may only be known now (e.g. after the Twitter lookup), so
        # check the database once more before creating a new user.
        user_obj = self.session.query(User).filter(User.id == user_dict["id"]).first()
        
        if user_obj is not None and not do_merge:
            return ObjectBufferProxy(User, None, None, False, user_obj)
        else:        
            return self.obj_buffer.add_object(User, None, user_dict, True)        

    def __get_or_create_object(self, klass, filter_by_kwargs, filter_arg, creation_kwargs, must_flush, do_merge):
        """Return a proxy for an existing ``klass`` object or buffer a new one.

        :param klass: model class to look up / create.
        :param filter_by_kwargs: keyword criteria used for the buffer lookup
            and, when ``filter_arg`` is None, for ``query.filter_by``.
        :param filter_arg: optional expression passed to ``query.filter``
            instead of ``filter_by_kwargs`` for the database lookup.
        :param creation_kwargs: fields handed to ``obj_buffer.add_object``.
        :param must_flush: forwarded to ``obj_buffer.add_object``.
        :param do_merge: when true, an existing database row is registered in
            the buffer together with ``creation_kwargs``; otherwise it is
            only wrapped in an ``ObjectBufferProxy``.
        :returns: the proxy found or created.
        """
        
        obj_proxy = self.obj_buffer.get(klass, **filter_by_kwargs)
        if obj_proxy is None:
            query = self.session.query(klass)
            if filter_arg is not None:
                query = query.filter(filter_arg)
            else:
                query = query.filter_by(**filter_by_kwargs)
            obj_instance = query.first()
            if obj_instance is not None:
                if not do_merge:
                    obj_proxy = ObjectBufferProxy(klass, None, None, False, obj_instance)
                else:
                    obj_proxy = self.obj_buffer.add_object(klass, None, creation_kwargs, must_flush, obj_instance)
        # Neither buffered nor stored: schedule a brand new object.
        if obj_proxy is None:
            obj_proxy = self.obj_buffer.add_object(klass, None, creation_kwargs, must_flush)
        return obj_proxy


    def __process_entity(self, ind, ind_type):
        """Buffer one tweet entity (hashtag, mention, url, media, ...).

        Requires ``self.tweet`` to be set, since its ``id`` is stored on the
        entity.

        :param ind: entity dict; must contain ``"indices"`` (start, end).
        :param ind_type: entity kind, e.g. ``"hashtags"``, ``"user_mentions"``,
            ``"urls"`` or ``"media"``; any other value yields a generic
            ``Entity``.
        """
        get_logger().debug("Process_entity : " + repr(ind) + " : " + repr(ind_type)) #@UndefinedVariable
        
        # NOTE(review): clean_keys is defined elsewhere; presumably it
        # normalises the dict keys - confirm.
        ind = clean_keys(ind)
        
        entity_type = self.__get_or_create_object(EntityType, {'label':ind_type}, None, {'label':ind_type}, True, False)
        
        # Fields common to every entity kind; the helpers below add the
        # kind-specific foreign key.
        entity_dict = {
           "indice_start"   : ind["indices"][0],
           "indice_end"     : ind["indices"][1],
           "tweet_id"       : self.tweet.id,
           "entity_type_id" : entity_type.id,
           "source"         : adapt_json(ind)
        }

        # Each helper returns (entity class, entity fields), or (None, None)
        # when the entity cannot be built and must be skipped.
        def process_medias():
            
            media_id = ind.get('id', None)
            if media_id is None:
                return None, None
            
            type_str = ind.get("type", "photo")
            media_type = self.__get_or_create_object(MediaType, {'label': type_str}, None, {'label':type_str}, True, False)
            media_ind = adapt_fields(ind, fields_adapter["entities"]["medias"])
            # The textual type is replaced by a reference to the MediaType row.
            if "type" in media_ind:
                del(media_ind["type"])
            media_ind['type_id'] = media_type.id            
            media = self.__get_or_create_object(Media, {'id':media_id}, None, media_ind, True, False)
            
            entity_dict['media_id'] = media.id
            return EntityMedia, entity_dict

        def process_hashtags():
            # The tag text is read from "text", falling back to "hashtag".
            text = ind.get("text", ind.get("hashtag", None))
            if text is None:
                return None, None
            ind['text'] = text
            # Database lookup is case-insensitive (ilike).
            hashtag = self.__get_or_create_object(Hashtag, {'text':text}, Hashtag.text.ilike(text), ind, True, False)
            entity_dict['hashtag_id'] = hashtag.id
            return EntityHashtag, entity_dict             
        
        def process_user_mentions():
            # A mention whose user cannot be resolved is still recorded, with
            # a null user_id.
            user_mention = self.__get_user(ind, False)
            if user_mention is None:
                entity_dict['user_id'] = None
            else:
                entity_dict['user_id'] = user_mention.id
            return EntityUser, entity_dict
        
        def process_urls():
            url = self.__get_or_create_object(Url, {'url':ind["url"]}, None, ind, True, False)
            entity_dict['url_id'] = url.id
            return EntityUrl, entity_dict
                
        # Dispatch on the entity kind; unknown kinds fall back to a plain
        # Entity built from the common fields only.
        entity_klass, entity_dict =  { 
            'hashtags': process_hashtags,
            'user_mentions' : process_user_mentions,
            'urls' : process_urls,
            'media': process_medias,
            }.get(ind_type, lambda: (Entity, entity_dict))()
            
        get_logger().debug("Process_entity entity_dict: " + repr(entity_dict)) #@UndefinedVariable
        if entity_klass:
            self.obj_buffer.add_object(entity_klass, None, entity_dict, False)


    def __process_twitter_stream(self):
        """Buffer the tweet, its user and its entities for a payload without "metadata".

        Does nothing when a ``Tweet`` with the same id already exists in the
        database.
        """
        
        tweet_nb = self.session.query(Tweet).filter(Tweet.id == self.json_dict["id"]).count()
        if tweet_nb > 0:
            return
        
        ts_copy = adapt_fields(self.json_dict, fields_adapter["stream"]["tweet"])
        
        # get or create user
        user = self.__get_user(self.json_dict["user"], True)
        if user is None:
            get_logger().warning("USER not found " + repr(self.json_dict["user"])) #@UndefinedVariable
            ts_copy["user_id"] = None
        else:
            ts_copy["user_id"] = user.id
            
        # The nested user dict is replaced by the user_id reference above.
        del(ts_copy['user'])
        ts_copy["tweet_source_id"] = self.source_id
        
        self.tweet = self.obj_buffer.add_object(Tweet, None, ts_copy, True)
            
        self.__process_entities()


    def __process_entities(self):
        """Buffer every entity of the current tweet.

        Uses the ``"entities"`` section of the payload when present;
        otherwise hashtags, urls and mentioned screen names are extracted
        from ``self.tweet.text`` with ``twitter_text.Extractor``.
        """
        if "entities" in self.json_dict:
            for ind_type, entity_list in self.json_dict["entities"].items():
                for ind in entity_list:
                    self.__process_entity(ind, ind_type)
        else:
            
            text = self.tweet.text
            extractor = twitter_text.Extractor(text)
            for ind in extractor.extract_hashtags_with_indices():
                self.__process_entity(ind, "hashtags")
            
            for ind in extractor.extract_urls_with_indices():
                self.__process_entity(ind, "urls")
            
            for ind in extractor.extract_mentioned_screen_names_with_indices():
                self.__process_entity(ind, "user_mentions")

    def __process_twitter_rest(self):
        """Buffer the tweet, its user and its entities for a payload with "metadata".

        The user is described by flat ``from_user*`` keys of the payload
        rather than by a nested ``"user"`` dict. Does nothing when a
        ``Tweet`` with the same id already exists in the database.
        """
        tweet_nb = self.session.query(Tweet).filter(Tweet.id == self.json_dict["id"]).count()
        if tweet_nb > 0:
            return
        
        
        # 'favorited' and 'truncated' are not read from the payload; they are
        # set to False.
        tweet_fields = {
            'created_at': self.json_dict["created_at"], 
            'favorited': False,
            'id': self.json_dict["id"],
            'id_str': self.json_dict["id_str"],
            #'in_reply_to_screen_name': ts["to_user"], 
            'in_reply_to_user_id': self.json_dict.get("in_reply_to_user_id",None),
            'in_reply_to_user_id_str': self.json_dict.get("in_reply_to_user_id_str", None),
            #'place': ts["place"],
            'source': self.json_dict["source"],
            'text': self.json_dict["text"],
            'truncated': False,            
            'tweet_source_id' : self.source_id,
        }
        
        # Build a user dict from the flat from_user* keys.
    
        user_fields = {
            'lang' : self.json_dict.get('iso_language_code',None),
            'profile_image_url' : self.json_dict["profile_image_url"],
            'screen_name' : self.json_dict["from_user"],
            'id' : self.json_dict["from_user_id"],
            'id_str' : self.json_dict["from_user_id_str"],
            'name' : self.json_dict['from_user_name'],
        }
        
        user = self.__get_user(user_fields, do_merge=False)
        if user is None:
            get_logger().warning("USER not found " + repr(user_fields)) #@UndefinedVariable
            tweet_fields["user_id"] = None
        else:
            tweet_fields["user_id"] = user.id
        
        tweet_fields = adapt_fields(tweet_fields, fields_adapter["rest"]["tweet"])
        self.tweet = self.obj_buffer.add_object(Tweet, None, tweet_fields, True)
                
        self.__process_entities()



    def process(self):
        """Process the payload and persist everything that was buffered.

        Creates a ``TweetSource`` from the raw JSON when no ``source_id`` was
        given, dispatches on the presence of a ``"metadata"`` key, records a
        ``TweetLog`` with status OK and finally calls
        ``obj_buffer.persists(self.session)``. The log entry is added even
        when the tweet already existed and was therefore skipped.
        """
        
        if self.source_id is None:
            tweet_source = self.obj_buffer.add_object(TweetSource, None, {'original_json':self.json_txt}, True)
            self.source_id = tweet_source.id
        
        # NOTE(review): payloads with "metadata" presumably come from the
        # Twitter search/REST API, the others from the streaming API - confirm.
        if "metadata" in self.json_dict:
            self.__process_twitter_rest()
        else:
            self.__process_twitter_stream()

        self.obj_buffer.add_object(TweetLog, None, {'tweet_source_id':self.source_id, 'status':TweetLog.TWEET_STATUS['OK']}, True)
        
        self.obj_buffer.persists(self.session)
       
   467 
   171 
   468 
   172 
   469 def set_logging(options, plogger=None, queue=None):
   173 def set_logging(options, plogger=None, queue=None):
   470     
   174     
   471     logging_config = {
   175     logging_config = {
   506     
   210     
   507     options.debug = (options.verbose-options.quiet > 0)
   211     options.debug = (options.verbose-options.quiet > 0)
   508     return logger
   212     return logger
   509 
   213 
   510 def set_logging_options(parser):
   214 def set_logging_options(parser):
   511     parser.add_option("-l", "--log", dest="logfile",
   215     parser.add_argument("-l", "--log", dest="logfile",
   512                       help="log to file", metavar="LOG", default="stderr")
   216                       help="log to file", metavar="LOG", default="stderr")
   513     parser.add_option("-v", dest="verbose", action="count",
   217     parser.add_argument("-v", dest="verbose", action="count",
   514                       help="verbose", metavar="VERBOSE", default=0)
   218                       help="verbose", default=0)
   515     parser.add_option("-q", dest="quiet", action="count",
   219     parser.add_argument("-q", dest="quiet", action="count",
   516                       help="quiet", metavar="QUIET", default=0)
   220                       help="quiet", default=0)
   517 
   221 
   518 def get_base_query(session, query, start_date, end_date, hashtags, tweet_exclude_table, user_whitelist):
   222 def get_base_query(session, query, start_date, end_date, hashtags, tweet_exclude_table, user_whitelist):
   519     
   223     
   520     query = query.join(EntityHashtag).join(Hashtag)
   224     query = query.join(EntityHashtag).join(Hashtag)
   521     
   225     
   589         try:
   293         try:
   590             ei = record.exc_info
   294             ei = record.exc_info
   591             if ei:
   295             if ei:
   592                 dummy = self.format(record) # just to get traceback text into record.exc_text
   296                 dummy = self.format(record) # just to get traceback text into record.exc_text
   593                 record.exc_info = None  # not needed any more
   297                 record.exc_info = None  # not needed any more
   594             if not self.ignore_full or not self.queue.full():
   298             if not self.ignore_full or (not self.queue.full()):
   595                 self.queue.put_nowait(record)
   299                 self.queue.put_nowait(record)
       
   300         except AssertionError:
       
   301             pass
   596         except Queue.Full:
   302         except Queue.Full:
   597             if self.ignore_full:
   303             if self.ignore_full:
   598                 pass
   304                 pass
   599             else:
   305             else:
   600                 raise
   306                 raise
   624         writer.write("\n")
   330         writer.write("\n")
   625     writer.flush()
   331     writer.flush()
   626     
   332     
   627     return writer
   333     return writer
   628 
   334 
       
   335 def get_unused_port():
       
   336     s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
       
   337     s.bind(('localhost', 0))
       
   338     _, port = s.getsockname()
       
   339     s.close()
       
   340     return port
       
   341