script/lib/iri_tweet/utils.py
changeset 243 9213a63fa34a
parent 242 cdd7d3c0549c
child 244 d4b7d6e2633f
equal deleted inserted replaced
242:cdd7d3c0549c 243:9213a63fa34a
    75         else:
    75         else:
    76             return value
    76             return value
    77     return dict([(str(k),adapt_one_field(k,v)) for k,v in fields_dict.items()])    
    77     return dict([(str(k),adapt_one_field(k,v)) for k,v in fields_dict.items()])    
    78 
    78 
    79 
    79 
       
    class ObjectBufferProxy(object):
        """Deferred construction of an object to be added to a session.

        The class and its constructor arguments are stored; the instance is
        only built, added to the session and optionally flushed when
        ``persists`` is called. Any positional or keyword argument that is
        callable is invoked (with no arguments) at persist time and its result
        is passed to the constructor instead.
        """

        def __init__(self, klass, args, kwargs, must_flush):
            # Class (or factory) called to build the instance.
            self.klass = klass
            # Positional constructor arguments; may be None.
            self.args = args
            # Keyword constructor arguments; may be None.
            self.kwargs = kwargs
            # When true, the session is flushed right after the add.
            self.must_flush = must_flush
            # The built object; None until persists() has run.
            self.instance = None

        def persists(self, session):
            """Build the instance, add it to *session* and flush if requested.

            Callable arguments are resolved here, just before construction.
            """
            if self.args is not None:
                new_args = [arg() if callable(arg) else arg for arg in self.args]
            else:
                new_args = []
            if self.kwargs is not None:
                new_kwargs = dict((k, v() if callable(v) else v)
                                  for k, v in self.kwargs.items())
            else:
                new_kwargs = {}

            self.instance = self.klass(*new_args, **new_kwargs)
            session.add(self.instance)
            if self.must_flush:
                session.flush()

        def __getattr__(self, name):
            """Return a zero-argument callable yielding ``instance.<name>``.

            Only reached for attributes not found normally. The callable
            returns None while the instance has not been persisted yet.
            Special (dunder) names raise AttributeError so that protocol
            probes (hasattr, copy, pickle) do not receive a bogus callable.
            """
            if name.startswith("__") and name.endswith("__"):
                raise AttributeError(name)

            def deferred():
                instance = self.instance
                # Compare against None: a persisted but falsy object (e.g. one
                # defining __len__ == 0) must still be dereferenced.
                if instance is None:
                    return None
                return getattr(instance, name)

            return deferred
       
   class ObjectsBuffer(object):
       """Ordered collection of ObjectBufferProxy objects persisted together."""

       def __init__(self):
           # Proxies in insertion order; persists() walks them in this order so
           # a later proxy may depend on values of an earlier one.
           self.__bufferlist = []

       def persists(self, session):
           """Persist every buffered proxy into *session*, in insertion order.

           The buffer is not cleared afterwards: calling this again builds and
           adds every object a second time.
           """
           for object_proxy in self.__bufferlist:
               object_proxy.persists(session)

       def add_object(self, klass, args=None, kwargs=None, must_flush=False):
           """Queue a deferred ``klass(*args, **kwargs)`` and return its proxy.

           :param klass: class (or factory) to instantiate at persist time.
           :param args: positional arguments, or None for no arguments.
           :param kwargs: keyword arguments, or None for no arguments.
           :param must_flush: flush the session right after adding the object.
           :returns: the ObjectBufferProxy standing in for the future object.
           """
           new_proxy = ObjectBufferProxy(klass, args, kwargs, must_flush)
           self.__bufferlist.append(new_proxy)
           return new_proxy
    80 
   119 
    class TwitterProcessorException(Exception):
        """Error raised by TwitterProcessor for unusable input (e.g. JSON without an id)."""
    83 
   122 
    84 class TwitterProcessor(object):
   123 class TwitterProcessor(object):
   102             raise TwitterProcessorException("No id in json")
   141             raise TwitterProcessorException("No id in json")
   103         
   142         
   104         self.source_id = source_id
   143         self.source_id = source_id
   105         self.session = session
   144         self.session = session
   106         self.token_filename = token_filename
   145         self.token_filename = token_filename
       
   146         self.obj_buffer = ObjectsBuffer()
   107 
   147 
   108     def __get_user(self, user_dict):
   148     def __get_user(self, user_dict):
   109         logging.debug("Get user : " + repr(user_dict)) #@UndefinedVariable
   149         logger.debug("Get user : " + repr(user_dict)) #@UndefinedVariable
   110     
   150     
   111         user_id = user_dict.get("id",None)    
   151         user_id = user_dict.get("id",None)    
   112         user_name = user_dict.get("screen_name", user_dict.get("name", None))
   152         user_name = user_dict.get("screen_name", user_dict.get("name", None))
   113         
   153         
   114         if user_id is None and user_name is None:
   154         if user_id is None and user_name is None:
   131                 if user_id:
   171                 if user_id:
   132                     user_dict = t.users.show(user_id=user_id)
   172                     user_dict = t.users.show(user_id=user_id)
   133                 else:
   173                 else:
   134                     user_dict = t.users.show(screen_name=user_name)            
   174                     user_dict = t.users.show(screen_name=user_name)            
   135             except Exception as e:
   175             except Exception as e:
   136                 logging.info("get_user : TWITTER ERROR : " + repr(e)) #@UndefinedVariable
   176                 logger.info("get_user : TWITTER ERROR : " + repr(e)) #@UndefinedVariable
   137                 logging.info("get_user : TWITTER ERROR : " + str(e)) #@UndefinedVariable
   177                 logger.info("get_user : TWITTER ERROR : " + str(e)) #@UndefinedVariable
   138     
   178     
   139         user_dict = adapt_fields(user_dict, fields_adapter["stream"]["user"])
   179         user_dict = adapt_fields(user_dict, fields_adapter["stream"]["user"])
   140         if "id" not in user_dict:
   180         if "id" not in user_dict:
   141             return None
   181             return None
   142         
   182         
   146         self.session.flush()
   186         self.session.flush()
   147         
   187         
   148         return user 
   188         return user 
   149 
   189 
   150     def __process_entity(self, ind, ind_type):
   190     def __process_entity(self, ind, ind_type):
   151         logging.debug("Process_entity : " + repr(ind) + " : " + repr(ind_type)) #@UndefinedVariable
   191         logger.debug("Process_entity : " + repr(ind) + " : " + repr(ind_type)) #@UndefinedVariable
   152         
   192         
   153         ind = clean_keys(ind)
   193         ind = clean_keys(ind)
   154         
   194         
   155         entity_dict = {
   195         entity_dict = {
   156            "indice_start": ind["indices"][0],
   196            "indice_start": ind["indices"][0],
   201             'hashtags': process_hashtags,
   241             'hashtags': process_hashtags,
   202             'user_mentions' : process_user_mentions,
   242             'user_mentions' : process_user_mentions,
   203             'urls' : process_urls
   243             'urls' : process_urls
   204             }[ind_type]()
   244             }[ind_type]()
   205             
   245             
   206         logging.debug("Process_entity entity_dict: " + repr(entity_dict)) #@UndefinedVariable
   246         logger.debug("Process_entity entity_dict: " + repr(entity_dict)) #@UndefinedVariable
   207         if entity:
   247         if entity:
   208             self.session.add(entity)
   248             self.session.add(entity)
   209             self.session.flush()
   249             self.session.flush()
   210 
   250 
   211 
   251 
   218         ts_copy = adapt_fields(self.json_dict, fields_adapter["stream"]["tweet"])
   258         ts_copy = adapt_fields(self.json_dict, fields_adapter["stream"]["tweet"])
   219         
   259         
   220         # get or create user
   260         # get or create user
   221         user = self.__get_user(self.json_dict["user"])
   261         user = self.__get_user(self.json_dict["user"])
   222         if user is None:
   262         if user is None:
   223             logging.warning("USER not found " + repr(self.json_dict["user"])) #@UndefinedVariable
   263             logger.warning("USER not found " + repr(self.json_dict["user"])) #@UndefinedVariable
   224             ts_copy["user"] = None
   264             ts_copy["user"] = None
   225             ts_copy["user_id"] = None
   265             ts_copy["user_id"] = None
   226         else:
   266         else:
   227             ts_copy["user"] = user
   267             ts_copy["user"] = user
   228             ts_copy["user_id"] = ts_copy["user"].id
   268             ts_copy["user_id"] = ts_copy["user"].id
   268             'screen_name' : self.json_dict["from_user"],                   
   308             'screen_name' : self.json_dict["from_user"],                   
   269         }
   309         }
   270         
   310         
   271         user = self.__get_user(user_fields)
   311         user = self.__get_user(user_fields)
   272         if user is None:
   312         if user is None:
   273             logging.warning("USER not found " + repr(user_fields)) #@UndefinedVariable
   313             logger.warning("USER not found " + repr(user_fields)) #@UndefinedVariable
   274             tweet_fields["user"] = None
   314             tweet_fields["user"] = None
   275             tweet_fields["user_id"] = None
   315             tweet_fields["user_id"] = None
   276         else:
   316         else:
   277             tweet_fields["user"] = user
   317             tweet_fields["user"] = user
   278             tweet_fields["user_id"] = user.id
   318             tweet_fields["user_id"] = user.id
   285         
   325         
   286         extractor = twitter_text.Extractor(text)
   326         extractor = twitter_text.Extractor(text)
   287         
   327         
   288         for ind in extractor.extract_hashtags_with_indices():
   328         for ind in extractor.extract_hashtags_with_indices():
   289             self.__process_entity(ind, "hashtags")
   329             self.__process_entity(ind, "hashtags")
   290             
   330                     
       
   331         for ind in extractor.extract_urls_with_indices():
       
   332             self.__process_entity(ind, "urls")
       
   333         
   291         for ind in extractor.extract_mentioned_screen_names_with_indices():
   334         for ind in extractor.extract_mentioned_screen_names_with_indices():
   292             self.__process_entity(ind, "user_mentions")
   335             self.__process_entity(ind, "user_mentions")
   293         
   336 
   294         for ind in extractor.extract_urls_with_indices():
       
   295             self.__process_entity(ind, "urls")
       
   296         
       
   297         self.session.flush()
       
   298 
   337 
   299 
   338 
   300     def process(self):
   339     def process(self):
   301         
   340         
   302         if self.source_id is None:
   341         if self.source_id is None:
   303             tweet_source = TweetSource(original_json=self.json_txt);
   342             tweet_source = TweetSource(original_json=self.json_txt);
   304             self.session.add(tweet_source)
   343             self.session.add(tweet_source)
   305             self.session.flush()
   344             self.session.flush()
   306             self.source_id = tweet_source.id
   345             self.source_id = tweet_source.id
   307         
   346         
   308         try:
   347         if "metadata" in self.json_dict:
   309             if "metadata" in self.json_dict:
   348             self.__process_twitter_rest()
   310                 self.__process_twitter_rest()
   349         else:
   311             else:
   350             self.__process_twitter_stream()
   312                 self.__process_twitter_stream()
   351 
   313                 
   352         tweet_log = TweetLog(tweet_source_id = self.source_id, status = TweetLog.TWEET_STATUS['OK'])            
   314             tweet_log = TweetLog(tweet_source_id = self.source_id, status = TweetLog.TWEET_STATUS['OK'])
   353         self.session.add(tweet_log)
   315         except:
   354         
   316             
   355 
   317             raise
   def set_logging(options, plogger=None):
       """Configure a logger from command-line style *options*.

       :param options: object exposing ``logfile`` ("stdout", "stderr" or a
           file path), ``verbose`` and ``quiet`` counters. ``options.debug`` is
           set as a side effect (True when verbose exceeds quiet).
       :param plogger: logger to configure; the root logger when None.
       :returns: the configured logger (earlier revisions returned None).

       The level starts at WARNING and moves down 10 per ``verbose`` and up 10
       per ``quiet``, clamped to [NOTSET, CRITICAL]. The handler, format and
       level are installed only if the logger has no handler yet, so repeated
       calls do not duplicate output.
       """
       level = max(logging.NOTSET, #@UndefinedVariable
                   min(logging.CRITICAL, #@UndefinedVariable
                       logging.WARNING - 10 * options.verbose + 10 * options.quiet)) #@UndefinedVariable

       logger = plogger if plogger is not None else logging.getLogger() #@UndefinedVariable

       if not logger.handlers:
           if options.logfile == "stdout":
               handler = logging.StreamHandler(sys.stdout) #@UndefinedVariable
           elif options.logfile == "stderr":
               handler = logging.StreamHandler(sys.stderr) #@UndefinedVariable
           elif options.logfile:
               handler = logging.FileHandler(options.logfile, 'a') #@UndefinedVariable
           else:
               # No log file name: StreamHandler's default stream (stderr).
               handler = logging.StreamHandler() #@UndefinedVariable
           handler.setFormatter(logging.Formatter('%(asctime)s %(levelname)s:%(name)s:%(message)s', None)) #@UndefinedVariable
           logger.addHandler(handler)
           logger.setLevel(level)

       options.debug = (options.verbose - options.quiet > 0)
       return logger
   335 
   392 
   336 def set_logging_options(parser):
   393 def set_logging_options(parser):
   337     parser.add_option("-l", "--log", dest="logfile",
   394     parser.add_option("-l", "--log", dest="logfile",
   382         
   439         
   383         query = query.filter(or_(*map(lambda h: Hashtag.text.contains(h), htags))) #@UndefinedVariable
   440         query = query.filter(or_(*map(lambda h: Hashtag.text.contains(h), htags))) #@UndefinedVariable
   384     
   441     
   385     return query.distinct()
   442     return query.distinct()
   386 
   443 
   387     
   # Module-wide logger used by the helpers of this module. It is the root
   # logger, i.e. the same object set_logging() configures when no explicit
   # logger is passed to it.
   logger = logging.getLogger(None) #@UndefinedVariable