script/lib/iri_tweet/iri_tweet/utils.py
from models import (Tweet, User, Hashtag, EntityHashtag, EntityUser, Url,
    EntityUrl, CONSUMER_KEY, CONSUMER_SECRET, APPLICATION_NAME, ACCESS_TOKEN_KEY,
    ACCESS_TOKEN_SECRET, adapt_date, adapt_json, TweetSource, TweetLog, MediaType,
    Media, EntityMedia, Entity, EntityType)
from sqlalchemy.sql import select, or_ #@UnresolvedImport
import Queue #@UnresolvedImport
import anyjson #@UnresolvedImport
import codecs
import datetime
import email.utils
import logging
import math
import os.path
import sys
import twitter.oauth #@UnresolvedImport
import twitter.oauth_dance #@UnresolvedImport
import twitter_text #@UnresolvedImport


CACHE_ACCESS_TOKEN = {}

def get_oauth_token(token_file_path=None, check_access_token=True, application_name=APPLICATION_NAME, consumer_key=CONSUMER_KEY, consumer_secret=CONSUMER_SECRET):

    global CACHE_ACCESS_TOKEN

    if 'ACCESS_TOKEN_KEY' in globals() and 'ACCESS_TOKEN_SECRET' in globals() and ACCESS_TOKEN_KEY and ACCESS_TOKEN_SECRET:
        return ACCESS_TOKEN_KEY, ACCESS_TOKEN_SECRET

    res = CACHE_ACCESS_TOKEN.get(application_name, None)

    if res is None and token_file_path and os.path.exists(token_file_path):
        get_logger().debug("get_oauth_token : reading token from file %s" % token_file_path) #@UndefinedVariable
        res = twitter.oauth.read_token_file(token_file_path)

    if res is not None and check_access_token:
        get_logger().debug("get_oauth_token : Check oauth tokens") #@UndefinedVariable
        # validate the token with the consumer credentials passed to this function
        t = twitter.Twitter(auth=twitter.OAuth(res[0], res[1], consumer_key, consumer_secret))
        status = None
        try:
            status = t.account.rate_limit_status()
        except Exception as e:
            get_logger().debug("get_oauth_token : error getting rate limit status %s" % repr(e))
            status = None
        get_logger().debug("get_oauth_token : Check oauth tokens : status %s" % repr(status)) #@UndefinedVariable
        if status is None or status['remaining_hits'] == 0:
            get_logger().debug("get_oauth_token : Problem with status %s" % repr(status))
            res = None

    if res is None:
        get_logger().debug("get_oauth_token : doing the oauth dance")
        res = twitter.oauth_dance.oauth_dance(application_name, consumer_key, consumer_secret, token_file_path)

    CACHE_ACCESS_TOKEN[application_name] = res

    return res

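# Example (illustrative sketch): obtaining a token pair and building an
# authenticated client; the token file path here is an arbitrary placeholder.
#
#   token_key, token_secret = get_oauth_token(token_file_path="oauth_token.txt")
#   api = twitter.Twitter(auth=twitter.OAuth(token_key, token_secret,
#                                            CONSUMER_KEY, CONSUMER_SECRET))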
       
def parse_date(date_str):
    # builds a naive datetime; the timezone offset returned by parsedate_tz is discarded
    ts = email.utils.parsedate_tz(date_str) #@UndefinedVariable
    return datetime.datetime(*ts[0:7])

def clean_keys(dict_val):
    return dict([(str(key), value) for key, value in dict_val.items()])

fields_adapter = {
    'stream': {
        "tweet": {
            "created_at"    : adapt_date,
            "coordinates"   : adapt_json,
            "place"         : adapt_json,
            "geo"           : adapt_json,
#            "original_json" : adapt_json,
        },
        "user": {
            "created_at"  : adapt_date,
        },

    },

    'entities' : {
        "medias": {
            "sizes"  : adapt_json,
        },
    },
    'rest': {
        "tweet" : {
            "place"         : adapt_json,
            "geo"           : adapt_json,
            "created_at"    : adapt_date,
#            "original_json" : adapt_json,
        },
    },
}


#
# adapt fields, return a copy of the field_dict with adapted fields
#
def adapt_fields(fields_dict, adapter_mapping):
    def adapt_one_field(field, value):
        if field in adapter_mapping and adapter_mapping[field] is not None:
            return adapter_mapping[field](value)
        else:
            return value
    return dict([(str(k), adapt_one_field(k, v)) for k, v in fields_dict.items()])

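# Example (illustrative sketch): adapting a raw stream tweet dict so that
# "created_at" goes through adapt_date and "geo"/"place" through adapt_json;
# the sample values are made up.
#
#   raw = {"created_at": "Wed Aug 27 13:08:45 +0000 2008", "geo": None, "text": "hello"}
#   adapted = adapt_fields(raw, fields_adapter["stream"]["tweet"])
#   # adapted["created_at"] is whatever adapt_date returns for that string;
#   # keys not listed in the adapter mapping are copied unchanged.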
       
class ObjectBufferProxy(object):
    """Deferred construction of a model instance.

    The constructor arguments are kept as-is; callables among them are only
    resolved when persists() is called, so a proxy attribute (itself a
    callable, see __getattr__) can be used as a foreign key value before the
    referenced row has been flushed.
    """
    def __init__(self, klass, args, kwargs, must_flush, instance=None):
        self.klass = klass
        self.args = args
        self.kwargs = kwargs
        self.must_flush = must_flush
        self.instance = instance

    def persists(self, session):
        # resolve deferred (callable) arguments now that earlier objects have been flushed
        new_args = [arg() if callable(arg) else arg for arg in self.args] if self.args is not None else []
        new_kwargs = dict([(k, v()) if callable(v) else (k, v) for k, v in self.kwargs.items()]) if self.kwargs is not None else {}

        if self.instance is None:
            self.instance = self.klass(*new_args, **new_kwargs)
        else:
            # an instance already exists in the database: build a new object from the
            # given arguments and merge it into the session
            self.instance = self.klass(*new_args, **new_kwargs)
            self.instance = session.merge(self.instance)

        session.add(self.instance)
        if self.must_flush:
            session.flush()

    def __getattr__(self, name):
        # return a zero-argument callable so the attribute is read lazily,
        # after the underlying instance has been created and flushed
        return lambda: getattr(self.instance, name) if self.instance else None


class ObjectsBuffer(object):
    """Accumulates ObjectBufferProxy instances and persists them in insertion order."""

    def __init__(self):
        self.__bufferlist = []
        self.__bufferdict = {}

    def __add_proxy_object(self, proxy):
        proxy_list = self.__bufferdict.get(proxy.klass, None)
        if proxy_list is None:
            proxy_list = []
            self.__bufferdict[proxy.klass] = proxy_list
        proxy_list.append(proxy)
        self.__bufferlist.append(proxy)

    def persists(self, session):
        for object_proxy in self.__bufferlist:
            object_proxy.persists(session)

    def add_object(self, klass, args, kwargs, must_flush, instance=None):
        new_proxy = ObjectBufferProxy(klass, args, kwargs, must_flush, instance)
        self.__add_proxy_object(new_proxy)
        return new_proxy

    def get(self, klass, **kwargs):
        # return the first buffered proxy of the given class whose creation kwargs
        # match all of the given key/value pairs, or None if there is no match
        if klass in self.__bufferdict:
            for proxy in self.__bufferdict[klass]:
                if proxy.kwargs is None or len(proxy.kwargs) == 0 or proxy.klass != klass:
                    continue
                found = True
                for k, v in kwargs.items():
                    if (k not in proxy.kwargs) or v != proxy.kwargs[k]:
                        found = False
                        break
                if found:
                    return proxy
        return None

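# Example (illustrative sketch): buffering a user and a tweet so the tweet's
# user_id is resolved only at persist time; `session` would be an ordinary
# SQLAlchemy session and the field values are made up.
#
#   buf = ObjectsBuffer()
#   user = buf.add_object(User, None, {'id': 42, 'screen_name': 'someone'}, True)
#   buf.add_object(Tweet, None, {'id': 1, 'text': 'hello', 'user_id': user.id}, True)
#   buf.persists(session)   # user.id is a callable, called once the user row is flushed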
       
class TwitterProcessorException(Exception):
    pass

class TwitterProcessor(object):
    """Parse one tweet (stream or REST/search payload) and buffer the resulting database objects."""

    def __init__(self, json_dict, json_txt, source_id, session, access_token=None, token_filename=None, user_query_twitter=False):

        if json_dict is None and json_txt is None:
            raise TwitterProcessorException("No json")

        if json_dict is None:
            self.json_dict = anyjson.deserialize(json_txt)
        else:
            self.json_dict = json_dict

        if not json_txt:
            self.json_txt = anyjson.serialize(json_dict)
        else:
            self.json_txt = json_txt

        if "id" not in self.json_dict:
            raise TwitterProcessorException("No id in json")

        self.source_id = source_id
        self.session = session
        self.token_filename = token_filename
        self.access_token = access_token
        self.obj_buffer = ObjectsBuffer()
        self.user_query_twitter = user_query_twitter


    def __get_user(self, user_dict, do_merge):
        get_logger().debug("Get user : " + repr(user_dict)) #@UndefinedVariable

        user_dict = adapt_fields(user_dict, fields_adapter["stream"]["user"])

        user_id = user_dict.get("id", None)
        user_name = user_dict.get("screen_name", user_dict.get("name", None))

        if user_id is None and user_name is None:
            return None

        user = None
        if user_id:
            user = self.obj_buffer.get(User, id=user_id)
        else:
            user = self.obj_buffer.get(User, screen_name=user_name)

        # update the buffered user if needed
        if user is not None:
            user_created_at = None
            if user.kwargs is not None:
                user_created_at = user.kwargs.get('created_at', None)
            if user_created_at is None and user_dict.get('created_at', None) is not None and do_merge:
                # buffered proxies are built with kwargs only (args is always None),
                # so the richer user_dict is merged into kwargs
                if user.kwargs is None:
                    user.kwargs = user_dict
                else:
                    user.kwargs.update(user_dict)
            return user

        # todo: add methods to ObjectsBuffer to get a buffered user
        user_obj = None
        if user_id:
            user_obj = self.session.query(User).filter(User.id == user_id).first()
        else:
            user_obj = self.session.query(User).filter(User.screen_name.ilike(user_name)).first()

        # update the user if needed
        if user_obj is not None:
            if user_obj.created_at is not None or user_dict.get('created_at', None) is None or not do_merge:
                user = ObjectBufferProxy(User, None, None, False, user_obj)
            else:
                user = self.obj_buffer.add_object(User, None, user_dict, True, user_obj)
            return user

        user_created_at = user_dict.get("created_at", None)

        if user_created_at is None and self.user_query_twitter:

            if self.access_token is not None:
                access_token_key, access_token_secret = self.access_token
            else:
                access_token_key, access_token_secret = get_oauth_token(self.token_filename)
            t = twitter.Twitter(auth=twitter.OAuth(access_token_key, access_token_secret, CONSUMER_KEY, CONSUMER_SECRET))
            try:
                if user_id:
                    user_dict = t.users.show(user_id=user_id)
                else:
                    user_dict = t.users.show(screen_name=user_name)
            except Exception as e:
                get_logger().info("get_user : TWITTER ERROR : " + repr(e)) #@UndefinedVariable
                get_logger().info("get_user : TWITTER ERROR : " + str(e)) #@UndefinedVariable
                return None

        if "id" not in user_dict:
            return None

        #TODO filter get, wrap in proxy
        user_obj = self.session.query(User).filter(User.id == user_dict["id"]).first()

        if user_obj is not None and not do_merge:
            return ObjectBufferProxy(User, None, None, False, user_obj)
        else:
            return self.obj_buffer.add_object(User, None, user_dict, True)

    def __get_or_create_object(self, klass, filter_by_kwargs, filter, creation_kwargs, must_flush, do_merge):
        # look for the object in the buffer, then in the database, and create it if it is not found

        obj_proxy = self.obj_buffer.get(klass, **filter_by_kwargs)
        if obj_proxy is None:
            query = self.session.query(klass)
            if filter is not None:
                query = query.filter(filter)
            else:
                query = query.filter_by(**filter_by_kwargs)
            obj_instance = query.first()
            if obj_instance is not None:
                if not do_merge:
                    obj_proxy = ObjectBufferProxy(klass, None, None, False, obj_instance)
                else:
                    obj_proxy = self.obj_buffer.add_object(klass, None, creation_kwargs, must_flush, obj_instance)
        if obj_proxy is None:
            obj_proxy = self.obj_buffer.add_object(klass, None, creation_kwargs, must_flush)
        return obj_proxy


    def __process_entity(self, ind, ind_type):
        get_logger().debug("Process_entity : " + repr(ind) + " : " + repr(ind_type)) #@UndefinedVariable

        ind = clean_keys(ind)

        entity_type = self.__get_or_create_object(EntityType, {'label':ind_type}, None, {'label':ind_type}, True, False)

        # tweet_id and entity_type_id are proxy attributes, i.e. callables that are
        # resolved when the objects buffer is persisted
        entity_dict = {
           "indice_start"   : ind["indices"][0],
           "indice_end"     : ind["indices"][1],
           "tweet_id"       : self.tweet.id,
           "entity_type_id" : entity_type.id,
           "source"         : adapt_json(ind)
        }

        def process_medias():

            media_id = ind.get('id', None)
            if media_id is None:
                return None, None

            type_str = ind.get("type", "photo")
            media_type = self.__get_or_create_object(MediaType, {'label': type_str}, None, {'label':type_str}, True, False)
            media_ind = adapt_fields(ind, fields_adapter["entities"]["medias"])
            if "type" in media_ind:
                del(media_ind["type"])
            media_ind['type_id'] = media_type.id
            media = self.__get_or_create_object(Media, {'id':media_id}, None, media_ind, True, False)

            entity_dict['media_id'] = media.id
            return EntityMedia, entity_dict

        def process_hashtags():
            text = ind.get("text", ind.get("hashtag", None))
            if text is None:
                return None, None
            ind['text'] = text
            hashtag = self.__get_or_create_object(Hashtag, {'text':text}, Hashtag.text.ilike(text), ind, True, False)
            entity_dict['hashtag_id'] = hashtag.id
            return EntityHashtag, entity_dict

        def process_user_mentions():
            user_mention = self.__get_user(ind, False)
            if user_mention is None:
                entity_dict['user_id'] = None
            else:
                entity_dict['user_id'] = user_mention.id
            return EntityUser, entity_dict

        def process_urls():
            url = self.__get_or_create_object(Url, {'url':ind["url"]}, None, ind, True, False)
            entity_dict['url_id'] = url.id
            return EntityUrl, entity_dict

        # dispatch on the entity type; unknown types fall back to a generic Entity
        entity_klass, entity_dict = {
            'hashtags': process_hashtags,
            'user_mentions' : process_user_mentions,
            'urls' : process_urls,
            'media': process_medias,
            }.get(ind_type, lambda: (Entity, entity_dict))()

        get_logger().debug("Process_entity entity_dict: " + repr(entity_dict)) #@UndefinedVariable
        if entity_klass:
            self.obj_buffer.add_object(entity_klass, None, entity_dict, False)


    def __process_twitter_stream(self):

        tweet_nb = self.session.query(Tweet).filter(Tweet.id == self.json_dict["id"]).count()
        if tweet_nb > 0:
            return

        ts_copy = adapt_fields(self.json_dict, fields_adapter["stream"]["tweet"])

        # get or create user
        user = self.__get_user(self.json_dict["user"], True)
        if user is None:
            get_logger().warning("USER not found " + repr(self.json_dict["user"])) #@UndefinedVariable
            ts_copy["user_id"] = None
        else:
            ts_copy["user_id"] = user.id

        del(ts_copy['user'])
        ts_copy["tweet_source_id"] = self.source_id

        self.tweet = self.obj_buffer.add_object(Tweet, None, ts_copy, True)

        self.__process_entities()


    def __process_entities(self):
        if "entities" in self.json_dict:
            for ind_type, entity_list in self.json_dict["entities"].items():
                for ind in entity_list:
                    self.__process_entity(ind, ind_type)
        else:
            # self.tweet is an ObjectBufferProxy, so its attributes are deferred
            # callables; take the text from the original payload instead
            text = self.json_dict["text"]
            extractor = twitter_text.Extractor(text)
            for ind in extractor.extract_hashtags_with_indices():
                self.__process_entity(ind, "hashtags")

            for ind in extractor.extract_urls_with_indices():
                self.__process_entity(ind, "urls")

            for ind in extractor.extract_mentioned_screen_names_with_indices():
                self.__process_entity(ind, "user_mentions")

    def __process_twitter_rest(self):
        tweet_nb = self.session.query(Tweet).filter(Tweet.id == self.json_dict["id"]).count()
        if tweet_nb > 0:
            return

        tweet_fields = {
            'created_at': self.json_dict["created_at"],
            'favorited': False,
            'id': self.json_dict["id"],
            'id_str': self.json_dict["id_str"],
            #'in_reply_to_screen_name': ts["to_user"],
            'in_reply_to_user_id': self.json_dict["to_user_id"],
            'in_reply_to_user_id_str': self.json_dict["to_user_id_str"],
            #'place': ts["place"],
            'source': self.json_dict["source"],
            'text': self.json_dict["text"],
            'truncated': False,
            'tweet_source_id' : self.source_id,
        }

        # user

        user_fields = {
            'lang' : self.json_dict.get('iso_language_code', None),
            'profile_image_url' : self.json_dict["profile_image_url"],
            'screen_name' : self.json_dict["from_user"],
            'id' : self.json_dict["from_user_id"],
            'id_str' : self.json_dict["from_user_id_str"],
            'name' : self.json_dict['from_user_name'],
        }

        user = self.__get_user(user_fields, do_merge=False)
        if user is None:
            get_logger().warning("USER not found " + repr(user_fields)) #@UndefinedVariable
            tweet_fields["user_id"] = None
        else:
            tweet_fields["user_id"] = user.id

        tweet_fields = adapt_fields(tweet_fields, fields_adapter["rest"]["tweet"])
        self.tweet = self.obj_buffer.add_object(Tweet, None, tweet_fields, True)

        self.__process_entities()


    def process(self):

        if self.source_id is None:
            tweet_source = self.obj_buffer.add_object(TweetSource, None, {'original_json': self.json_txt}, True)
            # tweet_source.id is a deferred callable, resolved when the buffer is persisted
            self.source_id = tweet_source.id

        if "metadata" in self.json_dict:
            self.__process_twitter_rest()
        else:
            self.__process_twitter_stream()

        self.obj_buffer.add_object(TweetLog, None, {'tweet_source_id': self.source_id, 'status': TweetLog.TWEET_STATUS['OK']}, True)

        self.obj_buffer.persists(self.session)

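# Example (illustrative sketch): processing one raw tweet from the streaming API;
# `session` would be an ordinary SQLAlchemy session and `line` a JSON-encoded tweet.
#
#   processor = TwitterProcessor(None, line, None, session, token_filename="oauth_token.txt")
#   processor.process()
#   session.commit()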
       
def set_logging(options, plogger=None, queue=None):

    logging_config = {
        "format" : '%(asctime)s %(levelname)s:%(name)s:%(message)s',
        "level" : max(logging.NOTSET, min(logging.CRITICAL, logging.WARNING - 10 * options.verbose + 10 * options.quiet)), #@UndefinedVariable
    }

    if options.logfile == "stdout":
        logging_config["stream"] = sys.stdout
    elif options.logfile == "stderr":
        logging_config["stream"] = sys.stderr
    else:
        logging_config["filename"] = options.logfile

    logger = plogger
    if logger is None:
        logger = get_logger() #@UndefinedVariable

    if len(logger.handlers) == 0:
        filename = logging_config.get("filename")
        if queue is not None:
            hdlr = QueueHandler(queue, True)
        elif filename:
            mode = logging_config.get("filemode", 'a')
            hdlr = logging.FileHandler(filename, mode) #@UndefinedVariable
        else:
            stream = logging_config.get("stream")
            hdlr = logging.StreamHandler(stream) #@UndefinedVariable

        fs = logging_config.get("format", logging.BASIC_FORMAT) #@UndefinedVariable
        dfs = logging_config.get("datefmt", None)
        fmt = logging.Formatter(fs, dfs) #@UndefinedVariable
        hdlr.setFormatter(fmt)
        logger.addHandler(hdlr)
        level = logging_config.get("level")
        if level is not None:
            logger.setLevel(level)

    options.debug = (options.verbose - options.quiet > 0)
    return logger

def set_logging_options(parser):
    parser.add_option("-l", "--log", dest="logfile",
                      help="log to file", metavar="LOG", default="stderr")
    parser.add_option("-v", dest="verbose", action="count",
                      help="verbose", metavar="VERBOSE", default=0)
    parser.add_option("-q", dest="quiet", action="count",
                      help="quiet", metavar="QUIET", default=0)

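# Example (illustrative sketch): wiring the logging options into an optparse
# parser for a command-line script.
#
#   from optparse import OptionParser
#   parser = OptionParser()
#   set_logging_options(parser)
#   options, args = parser.parse_args()
#   logger = set_logging(options)   # -v / -q adjust the level, -l picks the destination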
       
def get_base_query(session, query, start_date, end_date, hashtags, tweet_exclude_table, user_whitelist):

    query = query.join(EntityHashtag).join(Hashtag)

    if tweet_exclude_table is not None:
        query = query.filter(~Tweet.id.in_(select([tweet_exclude_table.c.id]))) #@UndefinedVariable

    if start_date:
        query = query.filter(Tweet.created_at >= start_date)
    if end_date:
        query = query.filter(Tweet.created_at <= end_date)

    if user_whitelist:
        query = query.join(User).filter(User.screen_name.in_(user_whitelist))

    if hashtags:
        # hashtags is a list of comma-separated strings; flatten it into a single list
        def merge_hash(l, h):
            l.extend(h.split(","))
            return l
        htags = reduce(merge_hash, hashtags, [])

        query = query.filter(or_(*map(lambda h: Hashtag.text.contains(h), htags))) #@UndefinedVariable

    return query


def get_filter_query(session, start_date, end_date, hashtags, tweet_exclude_table, user_whitelist):

    query = session.query(Tweet)
    query = get_base_query(session, query, start_date, end_date, hashtags, tweet_exclude_table, user_whitelist)
    return query.order_by(Tweet.created_at)


def get_user_query(session, start_date, end_date, hashtags, tweet_exclude_table):

    query = session.query(User).join(Tweet)

    query = get_base_query(session, query, start_date, end_date, hashtags, tweet_exclude_table, None)

    return query.distinct()

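# Example (illustrative sketch): selecting the tweets carrying a given hashtag
# over a date range; `session` would be an ordinary SQLAlchemy session and the
# dates/hashtags are made up. Hashtag arguments may be comma-separated strings.
#
#   start = datetime.datetime(2011, 1, 1)
#   end = datetime.datetime(2011, 2, 1)
#   tweets = get_filter_query(session, start, end, ["iri,museum"], None, None).all()
#   users = get_user_query(session, start, end, ["iri,museum"], None).all()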
       
logger_name = "iri.tweet"

def get_logger():
    global logger_name
    return logging.getLogger(logger_name) #@UndefinedVariable


class QueueHandler(logging.Handler): #@UndefinedVariable
    """
    This is a logging handler which sends events to a multiprocessing queue.
    """

    def __init__(self, queue, ignore_full):
        """
        Initialise an instance, using the passed queue.
        """
        logging.Handler.__init__(self) #@UndefinedVariable
        self.queue = queue
        self.ignore_full = ignore_full

    def emit(self, record):
        """
        Emit a record.

        Writes the LogRecord to the queue.
        """
        try:
            ei = record.exc_info
            if ei:
                dummy = self.format(record) # just to get traceback text into record.exc_text
                record.exc_info = None  # not needed any more
            if not self.ignore_full or not self.queue.full():
                self.queue.put_nowait(record)
        except Queue.Full:
            if self.ignore_full:
                pass
            else:
                raise
        except (KeyboardInterrupt, SystemExit):
            raise
        except:
            self.handleError(record)

def show_progress(current_line, total_line, label, width, writer=None):

    if writer is None:
        writer = sys.stdout
        if sys.stdout.encoding is not None:
            writer = codecs.getwriter(sys.stdout.encoding)(sys.stdout)

    percent = (float(current_line) / float(total_line)) * 100.0

    marks = math.floor(width * (percent / 100.0))
    spaces = math.floor(width - marks)

    loader = u'[' + (u'=' * int(marks)) + (u' ' * int(spaces)) + u']'

    s = u"%s %3d%% %*d/%d - %*s\r" % (loader, percent, len(str(total_line)), current_line, total_line, width, label[:width])

    writer.write(s) # takes the header into account
    if percent >= 100:
        writer.write("\n")
    writer.flush()

    return writer