script/lib/iri_tweet/utils.py
changeset 254 2209e66bb50b
parent 253 e9335ee3cf71
child 255 500cd0405c7a
equal deleted inserted replaced
253:e9335ee3cf71 254:2209e66bb50b
     1 from models import (Tweet, User, Hashtag, EntityHashtag, EntityUser, Url, 
     1 from models import (Tweet, User, Hashtag, EntityHashtag, EntityUser, Url, 
     2     EntityUrl, CONSUMER_KEY, CONSUMER_SECRET, APPLICATION_NAME, ACCESS_TOKEN_KEY, 
     2     EntityUrl, CONSUMER_KEY, CONSUMER_SECRET, APPLICATION_NAME, ACCESS_TOKEN_KEY, 
     3     ACCESS_TOKEN_SECRET, adapt_date, adapt_json, TweetSource, TweetLog)
     3     ACCESS_TOKEN_SECRET, adapt_date, adapt_json, TweetSource, TweetLog, MediaType, 
       
     4     Media, EntityMedia, Entity, EntityType)
     4 from sqlalchemy.sql import select, or_ #@UnresolvedImport
     5 from sqlalchemy.sql import select, or_ #@UnresolvedImport
     5 import anyjson #@UnresolvedImport
     6 import anyjson #@UnresolvedImport
     6 import datetime
     7 import datetime
     7 import email.utils
     8 import email.utils
     8 import logging #@UnresolvedImport
     9 import logging #@UnresolvedImport
    14 
    15 
    15 
    16 
    16 
    17 
    17 CACHE_ACCESS_TOKEN = {}
    18 CACHE_ACCESS_TOKEN = {}
    18 
    19 
    19 def get_oauth_token(token_file_path=None, application_name=APPLICATION_NAME, consumer_key=CONSUMER_KEY, consumer_secret=CONSUMER_SECRET):
    20 def get_oauth_token(token_file_path=None, check_access_token=True, application_name=APPLICATION_NAME, consumer_key=CONSUMER_KEY, consumer_secret=CONSUMER_SECRET):
    20     
    21     
    21     global CACHE_ACCESS_TOKEN
    22     global CACHE_ACCESS_TOKEN
    22     
    23 
    23     if CACHE_ACCESS_TOKEN is not None and application_name in CACHE_ACCESS_TOKEN:
    24     if 'ACCESS_TOKEN_KEY' in globals() and 'ACCESS_TOKEN_SECRET' in globals() and ACCESS_TOKEN_KEY and ACCESS_TOKEN_SECRET:
    24         return CACHE_ACCESS_TOKEN[application_name]
       
    25     
       
    26     if token_file_path and os.path.exists(token_file_path):
       
    27         logging.debug("reading token from file %s" % token_file_path) #@UndefinedVariable
       
    28         CACHE_ACCESS_TOKEN[application_name] = twitter.oauth.read_token_file(token_file_path)
       
    29         return CACHE_ACCESS_TOKEN[application_name]
       
    30         #read access token info from path
       
    31     
       
    32     if 'ACCESS_TOKEN_KEY' in dict() and 'ACCESS_TOKEN_SECRET' in dict() and ACCESS_TOKEN_KEY and ACCESS_TOKEN_SECRET:
       
    33         return ACCESS_TOKEN_KEY,ACCESS_TOKEN_SECRET
    25         return ACCESS_TOKEN_KEY,ACCESS_TOKEN_SECRET
    34     
    26     
    35     CACHE_ACCESS_TOKEN[application_name] = twitter.oauth_dance.oauth_dance(application_name, consumer_key, consumer_secret, token_file_path)
    27     res = CACHE_ACCESS_TOKEN.get(application_name, None)
    36     return CACHE_ACCESS_TOKEN[application_name]
    28     
       
    29     if res is None and token_file_path and os.path.exists(token_file_path):
       
    30         get_logger().debug("get_oauth_token : reading token from file %s" % token_file_path) #@UndefinedVariable
       
    31         res = twitter.oauth.read_token_file(token_file_path)
       
    32     
       
    33     if res is not None and check_access_token:
       
    34         get_logger().debug("get_oauth_token : Check oauth tokens") #@UndefinedVariable
       
    35         t = twitter.Twitter(auth=twitter.OAuth(res[0], res[1], CONSUMER_KEY, CONSUMER_SECRET))
       
    36         status = None
       
    37         try:
       
    38             status = t.account.rate_limit_status()
       
    39         except Exception as e:
       
    40             get_logger().debug("get_oauth_token : error getting rate limit status %s" % repr(e))
       
    41             status = None
       
    42         get_logger().debug("get_oauth_token : Check oauth tokens : status %s" % repr(status)) #@UndefinedVariable
       
    43         if status is None or status['remaining_hits'] == 0:
       
    44             get_logger().debug("get_oauth_token : Problem with status %s" % repr(status))
       
    45             res = None
       
    46 
       
    47     if res is None:
       
    48         get_logger().debug("get_oauth_token : doing the oauth dance")
       
    49         res = twitter.oauth_dance.oauth_dance(application_name, consumer_key, consumer_secret, token_file_path)
       
    50     
       
    51     CACHE_ACCESS_TOKEN[application_name] = res
       
    52     
       
    53     return res
    37 
    54 
    38 def parse_date(date_str):
    55 def parse_date(date_str):
    39     ts = email.utils.parsedate_tz(date_str) #@UndefinedVariable
    56     ts = email.utils.parsedate_tz(date_str) #@UndefinedVariable
    40     return datetime.datetime(*ts[0:7])
    57     return datetime.datetime(*ts[0:7])
    41 
    58 
    52 #            "original_json" : adapt_json,
    69 #            "original_json" : adapt_json,
    53         },
    70         },
    54         "user": {
    71         "user": {
    55             "created_at"  : adapt_date,
    72             "created_at"  : adapt_date,
    56         },
    73         },
       
    74 
       
    75     },
       
    76                   
       
    77     'entities' : {
       
    78         "medias": {
       
    79             "sizes"  : adapt_json,
       
    80         },                  
    57     },
    81     },
    58     'rest': {
    82     'rest': {
    59         "tweet" : {
    83         "tweet" : {
    60             "place"         : adapt_json,
    84             "place"         : adapt_json,
    61             "geo"           : adapt_json,
    85             "geo"           : adapt_json,
    87         
   111         
    88     def persists(self, session):
   112     def persists(self, session):
    89         new_args = [arg() if callable(arg) else arg for arg in self.args] if self.args is not None else []
   113         new_args = [arg() if callable(arg) else arg for arg in self.args] if self.args is not None else []
    90         new_kwargs = dict([(k,v()) if callable(v) else (k,v) for k,v in self.kwargs.items()]) if self.kwargs is not None else {}
   114         new_kwargs = dict([(k,v()) if callable(v) else (k,v) for k,v in self.kwargs.items()]) if self.kwargs is not None else {}
    91         
   115         
    92         self.instance = self.klass(*new_args, **new_kwargs)
   116         if self.instance is None:
       
   117             self.instance = self.klass(*new_args, **new_kwargs)
       
   118         else:
       
   119             self.instance = self.klass(*new_args, **new_kwargs)
       
   120             self.instance = session.merge(self.instance)
       
   121 
    93         session.add(self.instance)
   122         session.add(self.instance)
    94         if self.must_flush:
   123         if self.must_flush:
    95             session.flush()
   124             session.flush()
    96             
   125             
    97     def __getattr__(self, name):
   126     def __getattr__(self, name):
   102 
   131 
   103 class ObjectsBuffer(object):
   132 class ObjectsBuffer(object):
   104 
   133 
   105     def __init__(self):
   134     def __init__(self):
   106         self.__bufferlist = []
   135         self.__bufferlist = []
       
   136         self.__bufferdict = {}
       
   137     
       
   138     def __add_proxy_object(self, proxy):
       
   139         proxy_list =  self.__bufferdict.get(proxy.klass, None)
       
   140         if proxy_list is None:
       
   141             proxy_list = []
       
   142             self.__bufferdict[proxy.klass] = proxy_list
       
   143         proxy_list.append(proxy)
       
   144         self.__bufferlist.append(proxy)
   107         
   145         
   108     def persists(self, session):
   146     def persists(self, session):
   109         for object_proxy in self.__bufferlist:
   147         for object_proxy in self.__bufferlist:
   110             object_proxy.persists(session)
   148             object_proxy.persists(session)
   111             
   149                 
   112     def add_object(self, klass, args, kwargs, must_flush):
   150     def add_object(self, klass, args, kwargs, must_flush, instance=None):
   113         new_proxy = ObjectBufferProxy(klass, args, kwargs, must_flush)
   151         new_proxy = ObjectBufferProxy(klass, args, kwargs, must_flush, instance)
   114         self.__bufferlist.append(new_proxy)
   152         self.__add_proxy_object(new_proxy)
   115         return new_proxy 
   153         return new_proxy 
   116     
   154     
   117     def get(self, klass, **kwargs):
   155     def get(self, klass, **kwargs):
   118         for proxy in self.__bufferlist:
   156         if klass in self.__bufferdict:
   119             if proxy.kwargs is None or len(proxy.kwargs) == 0 or proxy.klass != klass:
   157             for proxy in self.__bufferdict[klass]:
   120                 continue
   158                 if proxy.kwargs is None or len(proxy.kwargs) == 0 or proxy.klass != klass:
   121             found = True
   159                     continue
   122             for k,v in kwargs.items():
   160                 found = True
   123                 if (k not in proxy.kwargs) or v != proxy.kwargs[k]:
   161                 for k,v in kwargs.items():
   124                     found = False
   162                     if (k not in proxy.kwargs) or v != proxy.kwargs[k]:
   125                     break
   163                         found = False
   126             if found:
   164                         break
   127                 return proxy
   165                 if found:
   128         
   166                     return proxy        
   129         return None
   167         return None
   130                 
   168                 
   131                     
       
   132         
       
   133 
       
   134 
       
   135 class TwitterProcessorException(Exception):
   169 class TwitterProcessorException(Exception):
   136     pass
   170     pass
   137 
   171 
   138 class TwitterProcessor(object):
   172 class TwitterProcessor(object):
   139     
   173     
   140     def __init__(self, json_dict, json_txt, source_id, session, token_filename=None):
   174     def __init__(self, json_dict, json_txt, source_id, session, access_token=None, token_filename=None):
   141 
   175 
   142         if json_dict is None and json_txt is None:
   176         if json_dict is None and json_txt is None:
   143             raise TwitterProcessorException("No json")
   177             raise TwitterProcessorException("No json")
   144         
   178         
   145         if json_dict is None:
   179         if json_dict is None:
   156             raise TwitterProcessorException("No id in json")
   190             raise TwitterProcessorException("No id in json")
   157         
   191         
   158         self.source_id = source_id
   192         self.source_id = source_id
   159         self.session = session
   193         self.session = session
   160         self.token_filename = token_filename
   194         self.token_filename = token_filename
       
   195         self.access_token = access_token
   161         self.obj_buffer = ObjectsBuffer()
   196         self.obj_buffer = ObjectsBuffer()
   162 
   197         
   163 
   198 
   164     def __get_user(self, user_dict):
   199 
   165         logger.debug("Get user : " + repr(user_dict)) #@UndefinedVariable
   200     def __get_user(self, user_dict, do_merge, query_twitter = False):
       
   201         get_logger().debug("Get user : " + repr(user_dict)) #@UndefinedVariable
   166     
   202     
   167         user_id = user_dict.get("id",None)    
   203         user_id = user_dict.get("id",None)    
   168         user_name = user_dict.get("screen_name", user_dict.get("name", None))
   204         user_name = user_dict.get("screen_name", user_dict.get("name", None))
   169         
   205         
   170         if user_id is None and user_name is None:
   206         if user_id is None and user_name is None:
   190             user = ObjectBufferProxy(User, None, None, False, user_obj)
   226             user = ObjectBufferProxy(User, None, None, False, user_obj)
   191             return user
   227             return user
   192     
   228     
   193         user_created_at = user_dict.get("created_at", None)
   229         user_created_at = user_dict.get("created_at", None)
   194         
   230         
   195         if user_created_at is None:
   231         if user_created_at is None and query_twitter:
   196             acess_token_key, access_token_secret = get_oauth_token(self.token_filename)
   232             
       
   233             if self.access_token is not None:
       
   234                 acess_token_key, access_token_secret = self.access_token
       
   235             else:
       
   236                 acess_token_key, access_token_secret = get_oauth_token(self.token_filename)
   197             t = twitter.Twitter(auth=twitter.OAuth(acess_token_key, access_token_secret, CONSUMER_KEY, CONSUMER_SECRET))
   237             t = twitter.Twitter(auth=twitter.OAuth(acess_token_key, access_token_secret, CONSUMER_KEY, CONSUMER_SECRET))
   198             try:
   238             try:
   199                 if user_id:
   239                 if user_id:
   200                     user_dict = t.users.show(user_id=user_id)
   240                     user_dict = t.users.show(user_id=user_id)
   201                 else:
   241                 else:
   202                     user_dict = t.users.show(screen_name=user_name)            
   242                     user_dict = t.users.show(screen_name=user_name)            
   203             except Exception as e:
   243             except Exception as e:
   204                 logger.info("get_user : TWITTER ERROR : " + repr(e)) #@UndefinedVariable
   244                 get_logger().info("get_user : TWITTER ERROR : " + repr(e)) #@UndefinedVariable
   205                 logger.info("get_user : TWITTER ERROR : " + str(e)) #@UndefinedVariable
   245                 get_logger().info("get_user : TWITTER ERROR : " + str(e)) #@UndefinedVariable
   206                 return None
   246                 return None
   207     
   247     
   208         user_dict = adapt_fields(user_dict, fields_adapter["stream"]["user"])
   248         user_dict = adapt_fields(user_dict, fields_adapter["stream"]["user"])
   209         if "id" not in user_dict:
   249         if "id" not in user_dict:
   210             return None
   250             return None
   211         
   251         
   212         #TODO filter get, wrap in proxy
   252         #TODO filter get, wrap in proxy
   213         user = self.session.query(User).filter(User.id == user_dict["id"]).first()
   253         user_obj = self.session.query(User).filter(User.id == user_dict["id"]).first()
   214         
   254         
   215         if user is not None:
   255         if user_obj is not None:
   216             return user
   256             if not do_merge:
       
   257                 return ObjectBufferProxy(User, None, None, False, user_obj)
   217         
   258         
   218         user = self.obj_buffer.add_object(User, None, user_dict, True)
   259         user = self.obj_buffer.add_object(User, None, user_dict, True)
   219         
   260         
   220         return user
   261         return user
   221 
   262 
       
   263     def __get_or_create_object(self, klass, filter_by_kwargs, filter, creation_kwargs, must_flush, do_merge):
       
   264         
       
   265         obj_proxy = self.obj_buffer.get(klass, **filter_by_kwargs)
       
   266         if obj_proxy is None:
       
   267             query = self.session.query(klass)
       
   268             if filter is not None:
       
   269                 query = query.filter(filter)
       
   270             else:
       
   271                 query = query.filter_by(**filter_by_kwargs)
       
   272             obj_instance = query.first()
       
   273             if obj_instance is not None:
       
   274                 if not do_merge:
       
   275                     obj_proxy = ObjectBufferProxy(klass, None, None, False, obj_instance)
       
   276                 else:
       
   277                     obj_proxy = self.obj_buffer.add_object(klass, None, creation_kwargs, must_flush, obj_instance)
       
   278         if obj_proxy is None:
       
   279             obj_proxy = self.obj_buffer.add_object(klass, None, creation_kwargs, must_flush)
       
   280         return obj_proxy
       
   281 
   222 
   282 
   223     def __process_entity(self, ind, ind_type):
   283     def __process_entity(self, ind, ind_type):
   224         logger.debug("Process_entity : " + repr(ind) + " : " + repr(ind_type)) #@UndefinedVariable
   284         get_logger().debug("Process_entity : " + repr(ind) + " : " + repr(ind_type)) #@UndefinedVariable
   225         
   285         
   226         ind = clean_keys(ind)
   286         ind = clean_keys(ind)
   227         
   287         
       
   288         entity_type = self.__get_or_create_object(EntityType, {'label':ind_type}, None, {'label':ind_type}, True, False)
       
   289         
   228         entity_dict = {
   290         entity_dict = {
   229            "indice_start": ind["indices"][0],
   291            "indice_start"   : ind["indices"][0],
   230            "indice_end"  : ind["indices"][1],
   292            "indice_end"     : ind["indices"][1],
   231            "tweet_id"    : self.tweet.id,
   293            "tweet_id"       : self.tweet.id,
       
   294            "entity_type_id" : entity_type.id,
       
   295            "source"         : adapt_json(ind)
   232         }
   296         }
   233     
   297 
       
   298         def process_medias():
       
   299             
       
   300             media_id = ind.get('id', None)
       
   301             if media_id is None:
       
   302                 return None, None
       
   303             
       
   304             type_str = ind.get("type", "photo")
       
   305             media_type = self.__get_or_create_object(MediaType, {'label': type_str}, None, {'label':type_str}, True, False)
       
   306             media_ind = adapt_fields(ind, fields_adapter["entities"]["medias"])
       
   307             if "type" in media_ind:
       
   308                 del(media_ind["type"])
       
   309             media_ind['type_id'] = media_type.id            
       
   310             media = self.__get_or_create_object(Media, {'id':media_id}, None, media_ind, True, False)
       
   311             
       
   312             entity_dict['media_id'] = media.id
       
   313             return EntityMedia, entity_dict
       
   314 
   234         def process_hashtags():
   315         def process_hashtags():
   235             text = ind.get("text", ind.get("hashtag", None))
   316             text = ind.get("text", ind.get("hashtag", None))
   236             if text is None:
   317             if text is None:
   237                 return None
   318                 return None, None
   238             hashtag = self.obj_buffer.get(Hashtag, text=text)
   319             ind['text'] = text
   239             if hashtag is None: 
   320             hashtag = self.__get_or_create_object(Hashtag, {'text':text}, Hashtag.text.ilike(text), ind, True, False)
   240                 hashtag_obj = self.session.query(Hashtag).filter(Hashtag.text.ilike(text)).first()
       
   241                 if hashtag_obj is not None:
       
   242                     hashtag = ObjectBufferProxy(Hashtag, None, None, False, hashtag_obj)
       
   243                     
       
   244             if hashtag is None:
       
   245                 ind["text"] = text
       
   246                 hashtag = self.obj_buffer.add_object(Hashtag, None, ind, True)
       
   247             entity_dict['hashtag_id'] = hashtag.id
   321             entity_dict['hashtag_id'] = hashtag.id
   248             return EntityHashtag, entity_dict             
   322             return EntityHashtag, entity_dict             
   249         
   323         
   250         def process_user_mentions():
   324         def process_user_mentions():
   251             user_mention = self.__get_user(ind)
   325             user_mention = self.__get_user(ind, False, False)
   252             if user_mention is None:
   326             if user_mention is None:
   253                 entity_dict['user_id'] = None
   327                 entity_dict['user_id'] = None
   254             else:
   328             else:
   255                 entity_dict['user_id'] = user_mention.id
   329                 entity_dict['user_id'] = user_mention.id
   256             return EntityUser, entity_dict
   330             return EntityUser, entity_dict
   257         
   331         
   258         def process_urls():
   332         def process_urls():
   259             url = self.obj_buffer.get(Url, url=ind["url"])
   333             url = self.__get_or_create_object(Url, {'url':ind["url"]}, None, ind, True, False)
   260             if url is None:
       
   261                 url_obj = self.session.query(Url).filter(Url.url == ind["url"]).first()
       
   262                 if url_obj is not None:
       
   263                     url = ObjectBufferProxy(Url, None, None, False, url_obj)
       
   264             if url is None:
       
   265                 url = self.obj_buffer.add_object(Url, None, ind, True)
       
   266             entity_dict['url_id'] = url.id
   334             entity_dict['url_id'] = url.id
   267             return EntityUrl, entity_dict
   335             return EntityUrl, entity_dict
   268         
   336                 
   269         #{'': lambda }
   337         #{'': lambda }
   270         entity_klass, entity_dict =  { 
   338         entity_klass, entity_dict =  { 
   271             'hashtags': process_hashtags,
   339             'hashtags': process_hashtags,
   272             'user_mentions' : process_user_mentions,
   340             'user_mentions' : process_user_mentions,
   273             'urls' : process_urls
   341             'urls' : process_urls,
   274             }[ind_type]()
   342             'media': process_medias,
   275             
   343             }.get(ind_type, lambda: (Entity, entity_dict))()
   276         logger.debug("Process_entity entity_dict: " + repr(entity_dict)) #@UndefinedVariable
   344             
       
   345         get_logger().debug("Process_entity entity_dict: " + repr(entity_dict)) #@UndefinedVariable
   277         if entity_klass:
   346         if entity_klass:
   278             self.obj_buffer.add_object(entity_klass, None, entity_dict, False)
   347             self.obj_buffer.add_object(entity_klass, None, entity_dict, False)
   279 
   348 
   280 
   349 
   281     def __process_twitter_stream(self):
   350     def __process_twitter_stream(self):
   285             return
   354             return
   286         
   355         
   287         ts_copy = adapt_fields(self.json_dict, fields_adapter["stream"]["tweet"])
   356         ts_copy = adapt_fields(self.json_dict, fields_adapter["stream"]["tweet"])
   288         
   357         
   289         # get or create user
   358         # get or create user
   290         user = self.__get_user(self.json_dict["user"])
   359         user = self.__get_user(self.json_dict["user"], True)
   291         if user is None:
   360         if user is None:
   292             logger.warning("USER not found " + repr(self.json_dict["user"])) #@UndefinedVariable
   361             get_logger().warning("USER not found " + repr(self.json_dict["user"])) #@UndefinedVariable
   293             ts_copy["user_id"] = None
   362             ts_copy["user_id"] = None
   294         else:
   363         else:
   295             ts_copy["user_id"] = user.id
   364             ts_copy["user_id"] = user.id
   296             
   365             
   297         del(ts_copy['user'])
   366         del(ts_copy['user'])
   298         ts_copy["tweet_source_id"] = self.source_id
   367         ts_copy["tweet_source_id"] = self.source_id
   299         
   368         
   300         self.tweet = self.obj_buffer.add_object(Tweet, None, ts_copy, True)
   369         self.tweet = self.obj_buffer.add_object(Tweet, None, ts_copy, True)
   301             
   370             
   302         # get entities
   371         self.__process_entities()
       
   372 
       
   373 
       
   374     def __process_entities(self):
   303         if "entities" in self.json_dict:
   375         if "entities" in self.json_dict:
   304             for ind_type, entity_list in self.json_dict["entities"].items():
   376             for ind_type, entity_list in self.json_dict["entities"].items():
   305                 for ind in entity_list:
   377                 for ind in entity_list:
   306                     self.__process_entity(ind, ind_type)
   378                     self.__process_entity(ind, ind_type)
   307         else:
   379         else:
   308             extractor = twitter_text.Extractor(self.tweet.text)
   380             
   309     
   381             text = self.tweet.text
       
   382             extractor = twitter_text.Extractor(text)
   310             for ind in extractor.extract_hashtags_with_indices():
   383             for ind in extractor.extract_hashtags_with_indices():
   311                 self.__process_entity(ind, "hashtags")
   384                 self.__process_entity(ind, "hashtags")
   312     
   385             
       
   386             for ind in extractor.extract_urls_with_indices():
       
   387                 self.__process_entity(ind, "urls")
       
   388             
   313             for ind in extractor.extract_mentioned_screen_names_with_indices():
   389             for ind in extractor.extract_mentioned_screen_names_with_indices():
   314                 self.__process_entity(ind, "user_mentions")
   390                 self.__process_entity(ind, "user_mentions")
   315     
       
   316             for ind in extractor.extract_urls_with_indices():
       
   317                 self.__process_entity(ind, "urls")
       
   318 
       
   319         self.session.flush()
       
   320 
       
   321 
   391 
   322     def __process_twitter_rest(self):
   392     def __process_twitter_rest(self):
   323         tweet_nb = self.session.query(Tweet).filter(Tweet.id == self.json_dict["id"]).count()
   393         tweet_nb = self.session.query(Tweet).filter(Tweet.id == self.json_dict["id"]).count()
   324         if tweet_nb > 0:
   394         if tweet_nb > 0:
   325             return
   395             return
   346             'lang' : self.json_dict.get('iso_language_code',None),
   416             'lang' : self.json_dict.get('iso_language_code',None),
   347             'profile_image_url' : self.json_dict["profile_image_url"],
   417             'profile_image_url' : self.json_dict["profile_image_url"],
   348             'screen_name' : self.json_dict["from_user"],                   
   418             'screen_name' : self.json_dict["from_user"],                   
   349         }
   419         }
   350         
   420         
   351         user = self.__get_user(user_fields)
   421         user = self.__get_user(user_fields, do_merge=False)
   352         if user is None:
   422         if user is None:
   353             logger.warning("USER not found " + repr(user_fields)) #@UndefinedVariable
   423             get_logger().warning("USER not found " + repr(user_fields)) #@UndefinedVariable
   354             tweet_fields["user_id"] = None
   424             tweet_fields["user_id"] = None
   355         else:
   425         else:
   356             tweet_fields["user_id"] = user.id
   426             tweet_fields["user_id"] = user.id
   357         
   427         
   358         tweet_fields = adapt_fields(tweet_fields, fields_adapter["rest"]["tweet"])
   428         tweet_fields = adapt_fields(tweet_fields, fields_adapter["rest"]["tweet"])
   359         self.tweet = self.obj_buffer.add_object(Tweet, None, tweet_fields, True)
   429         self.tweet = self.obj_buffer.add_object(Tweet, None, tweet_fields, True)
   360         
   430                 
   361         text = self.tweet.text
   431         self.__process_entities()
   362         
       
   363         extractor = twitter_text.Extractor(text)
       
   364         
       
   365         for ind in extractor.extract_hashtags_with_indices():
       
   366             self.__process_entity(ind, "hashtags")
       
   367                     
       
   368         for ind in extractor.extract_urls_with_indices():
       
   369             self.__process_entity(ind, "urls")
       
   370         
       
   371         for ind in extractor.extract_mentioned_screen_names_with_indices():
       
   372             self.__process_entity(ind, "user_mentions")
       
   373 
   432 
   374 
   433 
   375 
   434 
   376     def process(self):
   435     def process(self):
   377         
   436         
   382         if "metadata" in self.json_dict:
   441         if "metadata" in self.json_dict:
   383             self.__process_twitter_rest()
   442             self.__process_twitter_rest()
   384         else:
   443         else:
   385             self.__process_twitter_stream()
   444             self.__process_twitter_stream()
   386 
   445 
   387         self.obj_buffer.add_object(TweetLog, None, {'tweet_source_id':self.source_id, 'status':TweetLog.TWEET_STATUS['OK']}, False)
   446         self.obj_buffer.add_object(TweetLog, None, {'tweet_source_id':self.source_id, 'status':TweetLog.TWEET_STATUS['OK']}, True)
   388         
   447         
   389         self.obj_buffer.persists(self.session)
   448         self.obj_buffer.persists(self.session)
   390 
   449 
   391 
   450 
   392 def set_logging(options, plogger=None):
   451 def set_logging(options, plogger=None):
   403     else:
   462     else:
   404         logging_config["filename"] = options.logfile
   463         logging_config["filename"] = options.logfile
   405             
   464             
   406     logger = plogger
   465     logger = plogger
   407     if logger is None:
   466     if logger is None:
   408         logger = logging.getLogger() #@UndefinedVariable
   467         logger = get_logger() #@UndefinedVariable
   409     
   468     
   410     if len(logger.handlers) == 0:    
   469     if len(logger.handlers) == 0:    
   411         filename = logging_config.get("filename")
   470         filename = logging_config.get("filename")
   412         if filename:
   471         if filename:
   413             mode = logging_config.get("filemode", 'a')
   472             mode = logging_config.get("filemode", 'a')
   475         
   534         
   476         query = query.filter(or_(*map(lambda h: Hashtag.text.contains(h), htags))) #@UndefinedVariable
   535         query = query.filter(or_(*map(lambda h: Hashtag.text.contains(h), htags))) #@UndefinedVariable
   477     
   536     
   478     return query.distinct()
   537     return query.distinct()
   479 
   538 
   480 logger = logging.getLogger() #@UndefinedVariable
   539 def get_logger():
       
   540     return logging.getLogger("iri_tweet") #@UndefinedVariable