script/lib/iri_tweet/utils.py
changeset 98 6e8930a1b8f7
parent 82 210dc265c70f
child 122 4c3a15877f80
equal deleted inserted replaced
97:861cae17abda 98:6e8930a1b8f7
     1 from models import *
     1 from models import Tweet, User, Hashtag, EntityHashtag, EntityUser, Url, \
     2 from sqlalchemy.sql import select, or_
     2     EntityUrl, CONSUMER_KEY, CONSUMER_SECRET, APPLICATION_NAME, ACCESS_TOKEN_KEY, \
     3 import anyjson
     3     ACCESS_TOKEN_SECRET, adapt_date, adapt_json
       
     4 from sqlalchemy.sql import select, or_ #@UnresolvedImport
       
     5 import anyjson #@UnresolvedImport
     4 import datetime
     6 import datetime
     5 import email.utils
     7 import email.utils
     6 import logging
     8 import logging #@UnresolvedImport
     7 import os.path
     9 import os.path
     8 import sys
    10 import sys
     9 import twitter
    11 import twitter.oauth #@UnresolvedImport
    10 import twitter.oauth
    12 import twitter.oauth_dance #@UnresolvedImport
    11 import twitter.oauth_dance
    13 import twitter_text #@UnresolvedImport
    12 import twitter_text
    14 
    13 
    15 
    14 
    16 
    15 CACHE_ACCESS_TOKEN = {}
    17 CACHE_ACCESS_TOKEN = {}
    16 
    18 
    17 def get_oauth_token(token_file_path=None, application_name=APPLICATION_NAME, consumer_key=CONSUMER_KEY, consumer_secret=CONSUMER_SECRET):
    19 def get_oauth_token(token_file_path=None, application_name=APPLICATION_NAME, consumer_key=CONSUMER_KEY, consumer_secret=CONSUMER_SECRET):
    20     
    22     
    21     if CACHE_ACCESS_TOKEN is not None and application_name in CACHE_ACCESS_TOKEN:
    23     if CACHE_ACCESS_TOKEN is not None and application_name in CACHE_ACCESS_TOKEN:
    22         return CACHE_ACCESS_TOKEN[application_name]
    24         return CACHE_ACCESS_TOKEN[application_name]
    23     
    25     
    24     if token_file_path and os.path.exists(token_file_path):
    26     if token_file_path and os.path.exists(token_file_path):
    25         logging.debug("reading token from file %s" % token_file_path)
    27         logging.debug("reading token from file %s" % token_file_path) #@UndefinedVariable
    26         CACHE_ACCESS_TOKEN[application_name] = twitter.oauth.read_token_file(token_file_path)
    28         CACHE_ACCESS_TOKEN[application_name] = twitter.oauth.read_token_file(token_file_path)
    27         return CACHE_ACCESS_TOKEN[application_name]
    29         return CACHE_ACCESS_TOKEN[application_name]
    28         #read access token info from path
    30         #read access token info from path
    29     
    31     
    30     if 'ACCESS_TOKEN_KEY' in dict() and 'ACCESS_TOKEN_SECRET' in dict() and ACCESS_TOKEN_KEY and ACCESS_TOKEN_SECRET:
    32     if 'ACCESS_TOKEN_KEY' in dict() and 'ACCESS_TOKEN_SECRET' in dict() and ACCESS_TOKEN_KEY and ACCESS_TOKEN_SECRET:
    32     
    34     
    33     CACHE_ACCESS_TOKEN[application_name] = twitter.oauth_dance.oauth_dance(application_name, consumer_key, consumer_secret, token_file_path)
    35     CACHE_ACCESS_TOKEN[application_name] = twitter.oauth_dance.oauth_dance(application_name, consumer_key, consumer_secret, token_file_path)
    34     return CACHE_ACCESS_TOKEN[application_name]
    36     return CACHE_ACCESS_TOKEN[application_name]
    35 
    37 
    36 def parse_date(date_str):
    38 def parse_date(date_str):
    37     ts = email.utils.parsedate_tz(date_str)
    39     ts = email.utils.parsedate_tz(date_str) #@UndefinedVariable
    38     return datetime.datetime(*ts[0:7])
    40     return datetime.datetime(*ts[0:7])
    39 
    41 
    40 def clean_keys(dict_val):
    42 def clean_keys(dict_val):
    41     return dict([(str(key),value) for key,value in dict_val.items()])
    43     return dict([(str(key),value) for key,value in dict_val.items()])
    42 
    44 
   101         
   103         
   102         self.session = session
   104         self.session = session
   103         self.token_filename = token_filename
   105         self.token_filename = token_filename
   104 
   106 
   105     def __get_user(self, user_dict):
   107     def __get_user(self, user_dict):
   106         logging.debug("Get user : " + repr(user_dict))
   108         logging.debug("Get user : " + repr(user_dict)) #@UndefinedVariable
   107     
   109     
   108         user_id = user_dict.get("id",None)    
   110         user_id = user_dict.get("id",None)    
   109         user_name = user_dict.get("screen_name", user_dict.get("name", None))
   111         user_name = user_dict.get("screen_name", user_dict.get("name", None))
   110         
   112         
   111         if user_id is None and user_name is None:
   113         if user_id is None and user_name is None:
   128                 if user_id:
   130                 if user_id:
   129                     user_dict = t.users.show(user_id=user_id)
   131                     user_dict = t.users.show(user_id=user_id)
   130                 else:
   132                 else:
   131                     user_dict = t.users.show(screen_name=user_name)            
   133                     user_dict = t.users.show(screen_name=user_name)            
   132             except Exception as e:
   134             except Exception as e:
   133                 logging.info("get_user : TWITTER ERROR : " + repr(e))
   135                 logging.info("get_user : TWITTER ERROR : " + repr(e)) #@UndefinedVariable
   134                 logging.info("get_user : TWITTER ERROR : " + str(e))
   136                 logging.info("get_user : TWITTER ERROR : " + str(e)) #@UndefinedVariable
   135     
   137     
   136         user_dict = adapt_fields(user_dict, fields_adapter["stream"]["user"])
   138         user_dict = adapt_fields(user_dict, fields_adapter["stream"]["user"])
   137         if "id" not in user_dict:
   139         if "id" not in user_dict:
   138             return None
   140             return None
   139         
   141         
   143         self.session.flush()
   145         self.session.flush()
   144         
   146         
   145         return user 
   147         return user 
   146 
   148 
   147     def __process_entity(self, ind, ind_type):
   149     def __process_entity(self, ind, ind_type):
   148         logging.debug("Process_entity : " + repr(ind) + " : " + repr(ind_type))
   150         logging.debug("Process_entity : " + repr(ind) + " : " + repr(ind_type)) #@UndefinedVariable
   149         
   151         
   150         ind = clean_keys(ind)
   152         ind = clean_keys(ind)
   151         
   153         
   152         entity_dict = {
   154         entity_dict = {
   153            "indice_start": ind["indices"][0],
   155            "indice_start": ind["indices"][0],
   198             'hashtags': process_hashtags,
   200             'hashtags': process_hashtags,
   199             'user_mentions' : process_user_mentions,
   201             'user_mentions' : process_user_mentions,
   200             'urls' : process_urls
   202             'urls' : process_urls
   201             }[ind_type]()
   203             }[ind_type]()
   202             
   204             
   203         logging.debug("Process_entity entity_dict: " + repr(entity_dict))
   205         logging.debug("Process_entity entity_dict: " + repr(entity_dict)) #@UndefinedVariable
   204         if entity:
   206         if entity:
   205             self.session.add(entity)
   207             self.session.add(entity)
   206             self.session.flush()
   208             self.session.flush()
   207 
   209 
   208 
   210 
   215         ts_copy = adapt_fields(self.json_dict, fields_adapter["stream"]["tweet"])
   217         ts_copy = adapt_fields(self.json_dict, fields_adapter["stream"]["tweet"])
   216         
   218         
   217         # get or create user
   219         # get or create user
   218         user = self.__get_user(self.json_dict["user"])
   220         user = self.__get_user(self.json_dict["user"])
   219         if user is None:
   221         if user is None:
   220             logging.warning("USER not found " + repr(self.json_dict["user"]))
   222             logging.warning("USER not found " + repr(self.json_dict["user"])) #@UndefinedVariable
   221             ts_copy["user"] = None
   223             ts_copy["user"] = None
   222             ts_copy["user_id"] = None
   224             ts_copy["user_id"] = None
   223         else:
   225         else:
   224             ts_copy["user"] = user
   226             ts_copy["user"] = user
   225             ts_copy["user_id"] = ts_copy["user"].id
   227             ts_copy["user_id"] = ts_copy["user"].id
   263             'screen_name' : self.json_dict["from_user"],                   
   265             'screen_name' : self.json_dict["from_user"],                   
   264         }
   266         }
   265         
   267         
   266         user = self.__get_user(user_fields)
   268         user = self.__get_user(user_fields)
   267         if user is None:
   269         if user is None:
   268             logging.warning("USER not found " + repr(user_fields))
   270             logging.warning("USER not found " + repr(user_fields)) #@UndefinedVariable
   269             tweet_fields["user"] = None
   271             tweet_fields["user"] = None
   270             tweet_fields["user_id"] = None
   272             tweet_fields["user_id"] = None
   271         else:
   273         else:
   272             tweet_fields["user"] = user
   274             tweet_fields["user"] = user
   273             tweet_fields["user_id"] = user.id
   275             tweet_fields["user_id"] = user.id
   308     elif options.logfile == "stderr":
   310     elif options.logfile == "stderr":
   309         logging_config["stream"] = sys.stderr
   311         logging_config["stream"] = sys.stderr
   310     else:
   312     else:
   311         logging_config["filename"] = options.logfile
   313         logging_config["filename"] = options.logfile
   312         
   314         
   313     logging_config["level"] = max(logging.NOTSET, min(logging.CRITICAL, logging.WARNING - 10 * options.verbose + 10 * options.quiet))
   315     logging_config["level"] = max(logging.NOTSET, min(logging.CRITICAL, logging.WARNING - 10 * options.verbose + 10 * options.quiet)) #@UndefinedVariable
   314     logging.basicConfig(**logging_config)
   316     logging.basicConfig(**logging_config) #@UndefinedVariable
   315     
   317     
   316     options.debug = (options.verbose-options.quiet > 0)
   318     options.debug = (options.verbose-options.quiet > 0)
   317 
   319 
   318 def set_logging_options(parser):
   320 def set_logging_options(parser):
   319     parser.add_option("-l", "--log", dest="logfile",
   321     parser.add_option("-l", "--log", dest="logfile",
   326     
   328     
   327 def get_filter_query(session, start_date, end_date, hashtags, tweet_exclude_table):
   329 def get_filter_query(session, start_date, end_date, hashtags, tweet_exclude_table):
   328     
   330     
   329     query = session.query(Tweet).join(EntityHashtag).join(Hashtag)
   331     query = session.query(Tweet).join(EntityHashtag).join(Hashtag)
   330     if tweet_exclude_table is not None:
   332     if tweet_exclude_table is not None:
   331         query = query.filter(~Tweet.id.in_(select([tweet_exclude_table.c.id])))
   333         query = query.filter(~Tweet.id.in_(select([tweet_exclude_table.c.id]))) #@UndefinedVariable
   332         
   334         
   333     query = query.filter(Tweet.created_at >=  start_date).filter(Tweet.created_at <=  end_date)
   335     query = query.filter(Tweet.created_at >=  start_date).filter(Tweet.created_at <=  end_date)
   334     
   336     
   335     if hashtags :
   337     if hashtags :
   336         def merge_hash(l,h):
   338         def merge_hash(l,h):
   337             l.extend(h.split(","))
   339             l.extend(h.split(","))
   338             return l
   340             return l
   339         htags = reduce(merge_hash, hashtags, [])
   341         htags = reduce(merge_hash, hashtags, [])
   340         
   342         
   341         query = query.filter(or_(*map(lambda h: Hashtag.text.contains(h), htags)))
   343         query = query.filter(or_(*map(lambda h: Hashtag.text.contains(h), htags))) #@UndefinedVariable
   342     
   344     
   343     return query
   345     return query
   344     
   346     
   345 
   347 
   346 def get_user_query(session, start_date, end_date, hashtags, tweet_exclude_table):
   348 def get_user_query(session, start_date, end_date, hashtags, tweet_exclude_table):
   347     
   349     
   348     query = session.query(User).join(Tweet).join(EntityHashtag).join(Hashtag)
   350     query = session.query(User).join(Tweet).join(EntityHashtag).join(Hashtag)
   349     if tweet_exclude_table is not None:
   351     if tweet_exclude_table is not None:
   350         query = query.filter(~Tweet.id.in_(select([tweet_exclude_table.c.id])))
   352         query = query.filter(~Tweet.id.in_(select([tweet_exclude_table.c.id]))) #@UndefinedVariable
   351         
   353         
   352     query = query.filter(Tweet.created_at >=  start_date).filter(Tweet.created_at <=  end_date)
   354     query = query.filter(Tweet.created_at >=  start_date).filter(Tweet.created_at <=  end_date)
   353     
   355     
   354     if hashtags :
   356     if hashtags :
   355         def merge_hash(l,h):
   357         def merge_hash(l,h):
   356             l.extend(h.split(","))
   358             l.extend(h.split(","))
   357             return l
   359             return l
   358         htags = reduce(merge_hash, hashtags, [])
   360         htags = reduce(merge_hash, hashtags, [])
   359         
   361         
   360         query = query.filter(or_(*map(lambda h: Hashtag.text.contains(h), htags)))
   362         query = query.filter(or_(*map(lambda h: Hashtag.text.contains(h), htags))) #@UndefinedVariable
   361     
   363     
   362     return query.distinct()
   364     return query.distinct()
   363 
   365 
   364     
   366