script/iri_tweet/utils.py
changeset 11 54d7f1486ac4
parent 9 bb44692e09ee
equal deleted inserted replaced
10:eb885a117aa0 11:54d7f1486ac4
     1 import email.utils
       
     2 import logging
       
     3 from models import *
     1 from models import *
     4 import datetime
     2 import datetime
       
     3 import email.utils
       
     4 import json
       
     5 import logging
       
     6 import sys
     5 import twitter
     7 import twitter
     6 import twitter_text
     8 import twitter_text
     7 
     9 import os.path
       
    10 import twitter.oauth
       
    11 
       
    12 
       
    13 def get_oauth_token(token_file_path=None):
       
    14     
       
    15     if token_file_path and os.path.file_exists(token_file_path):
       
    16         logging.debug("reading token from file %s" % token_file_path)
       
    17         return twitter.oauth.read_token_file(token_file_path)
       
    18         #read access token info from path
       
    19     
       
    20     if 'ACCESS_TOKEN_KEY' in dict() and 'ACCESS_TOKEN_SECRET' in dict() and ACCESS_TOKEN_KEY and ACCESS_TOKEN_SECRET:
       
    21         return ACCESS_TOKEN_KEY,ACCESS_TOKEN_SECRET
       
    22     
       
    23     return twitter.oauth_dance.oauth_dance(APPLICATION_NAME, CONSUMER_KEY, CONSUMER_SECRET, token_filename)
     8 
    24 
     9 def parse_date(date_str):
    25 def parse_date(date_str):
    10     ts = email.utils.parsedate_tz(date_str)
    26     ts = email.utils.parsedate_tz(date_str)
    11     return datetime.datetime(*ts[0:7])
    27     return datetime.datetime(*ts[0:7])
    12 
    28 
    43             return adapter_mapping[field](value)
    59             return adapter_mapping[field](value)
    44         else:
    60         else:
    45             return value
    61             return value
    46     return dict([(str(k),adapt_one_field(k,v)) for k,v in fields_dict.items()])    
    62     return dict([(str(k),adapt_one_field(k,v)) for k,v in fields_dict.items()])    
    47 
    63 
    48 def get_user(user_dict, session):
    64 
    49     
    65 
    50     logging.debug("Get user : " + repr(user_dict))
    66 class TwitterProcessorException(Exception):
    51     
    67     pass
    52     user_id = user_dict.get("id",None)    
    68 
    53     user_name = user_dict.get("screen_name", user_dict.get("name", None))
    69 class TwitterProcessor(object):
    54     
    70     
    55     if user_id is None and user_name is None:
    71     def __init__(self, json_dict, json_txt, session):
    56         return None
    72 
    57 
    73         if json_dict is None and json_txt is None:
    58     if user_id:
    74             raise TwitterProcessorException("No json")
    59         user = session.query(User).filter(User.id == user_id).first()
    75         
       
    76         if json_dict is None:
       
    77             self.json_dict = json.loads(json_txt)
       
    78         else:
       
    79             self.json_dict = json_dict
       
    80         
       
    81         if not json_txt:
       
    82             self.json_txt = json.dumps(json_dict)
       
    83         else:
       
    84             self.json_txt = json_txt
       
    85         
       
    86         if "id" not in self.json_dict:
       
    87             raise TwitterProcessorException("No id in json")
       
    88         
       
    89         self.session = session
       
    90 
       
    def __get_user(self, user_dict):
        """Return the User matching ``user_dict``, creating it when absent.

        The user is identified by ``user_dict["id"]`` or, failing that, by
        ``"screen_name"`` / ``"name"``. Returns None when neither is present,
        or when the adapted fields end up without an ``"id"``.

        When the user is not stored yet and ``user_dict`` has no
        ``"created_at"``, the description is fetched from Twitter with
        ``users.show``. A failure of that call is only logged, and the
        fields already in ``user_dict`` are used as they are.
        """
        logging.debug("Get user : " + repr(user_dict))

        user_id = user_dict.get("id", None)
        user_name = user_dict.get("screen_name", user_dict.get("name", None))

        if user_id is None and user_name is None:
            return None

        if user_id:
            user = self.session.query(User).filter(User.id == user_id).first()
        else:
            user = self.session.query(User).filter(User.screen_name == user_name).first()

        if user is not None:
            return user

        user_created_at = user_dict.get("created_at", None)

        if user_created_at is None:
            # Bind the token pair to the names used on the next line; the
            # previous spelling left them undefined and raised NameError.
            token_key, token_secret = get_oauth_token()
            t = twitter.Twitter(auth=twitter.OAuth(token_key, token_secret, CONSUMER_KEY, CONSUMER_SECRET))
            try:
                if user_id:
                    user_dict = t.users.show(user_id=user_id)
                else:
                    user_dict = t.users.show(screen_name=user_name)
            except Exception as e:
                # Best effort: keep going with the partial user_dict.
                logging.info("get_user : TWITTER ERROR : " + repr(e))
                logging.info("get_user : TWITTER ERROR : " + str(e))

        user_dict = adapt_fields(user_dict, fields_adapter["stream"]["user"])
        if "id" not in user_dict:
            return None

        user = User(**user_dict)

        self.session.add(user)
        self.session.flush()

        return user
       
   132 
       
   133     def __process_entity(self, ind, ind_type):
       
   134         logging.debug("Process_entity : " + repr(ind) + " : " + repr(ind_type))
       
   135     
       
   136         entity_dict = {
       
   137            "indice_start": ind["indices"][0],
       
   138            "indice_end"  : ind["indices"][1],
       
   139            "tweet_id"    : self.tweet.id,
       
   140            "tweet"       : self.tweet
       
   141         }
       
   142     
       
   143         def process_hashtags():
       
   144             text = ind.get("text", ind.get("hashtag", None))
       
   145             if text is None:
       
   146                 return None 
       
   147             hashtag = self.session.query(Hashtag).filter(Hashtag.text == text).first()
       
   148             if not hashtag:
       
   149                 ind["text"] = text
       
   150                 hashtag = Hashtag(**ind)
       
   151                 self.session.add(hashtag)
       
   152                 self.session.flush()
       
   153             entity_dict['hashtag'] = hashtag
       
   154             entity_dict['hashtag_id'] = hashtag.id
       
   155             entity = EntityHashtag(**entity_dict)
       
   156             return entity
       
   157         
       
   158         def process_user_mentions():
       
   159             user_mention = self.__get_user(ind)
       
   160             if user_mention is None:
       
   161                 entity_dict['user'] = None
       
   162                 entity_dict['user_id'] = None
       
   163             else:
       
   164                 entity_dict['user'] = user_mention
       
   165                 entity_dict['user_id'] = user_mention.id
       
   166             entity = EntityUser(**entity_dict)
       
   167             return entity
       
   168         
       
   169         def process_urls():
       
   170             url = self.session.query(Url).filter(Url.url == ind["url"]).first()
       
   171             if url is None:
       
   172                 url = Url(**ind)
       
   173                 self.session.add(url)
       
   174                 self.session.flush()
       
   175             entity_dict['url'] = url
       
   176             entity_dict['url_id'] = url.id
       
   177             entity = EntityUrl(**entity_dict)
       
   178             return entity
       
   179         
       
   180         #{'': lambda }
       
   181         entity =  { 
       
   182             'hashtags': process_hashtags,
       
   183             'user_mentions' : process_user_mentions,
       
   184             'urls' : process_urls
       
   185             }[ind_type]()
       
   186             
       
   187         logging.debug("Process_entity entity_dict: " + repr(entity_dict))
       
   188         if entity:
       
   189             self.session.add(entity)
       
   190             self.session.flush()
       
   191 
       
   192 
       
   193     def __process_twitter_stream(self):
       
   194         
       
   195         tweet_nb = self.session.query(Tweet).filter(Tweet.id == self.json_dict["id"]).count()
       
   196         if tweet_nb > 0:
       
   197             return
       
   198         
       
   199         ts_copy = adapt_fields(self.json_dict, fields_adapter["stream"]["tweet"])
       
   200         
       
   201         # get or create user
       
   202         user = self.__get_user(self.json_dict["user"])
       
   203         if user is None:
       
   204             log.warning("USER not found " + repr(ts["user"]))
       
   205             ts_copy["user"] = None
       
   206             ts_copy["user_id"] = None
       
   207         else:
       
   208             ts_copy["user"] = user
       
   209             ts_copy["user_id"] = ts_copy["user"].id
       
   210         ts_copy["original_json"] = self.json_txt
       
   211         
       
   212         self.tweet = Tweet(**ts_copy)
       
   213         self.session.add(self.tweet)
       
   214         self.session.flush()
       
   215             
       
   216         # get entities
       
   217         for ind_type, entity_list in self.json_dict["entities"].items():
       
   218             for ind in entity_list:
       
   219                 self.__process_entity(ind, ind_type)
       
   220 
       
   221 
       
   222     def __process_twitter_rest(self):
       
   223         tweet_nb = self.session.query(Tweet).filter(Tweet.id == self.json_dict["id"]).count()
       
   224         if tweet_nb > 0:
       
   225             return
       
   226             
       
   227         tweet_fields = {
       
   228             'created_at': self.json_dict["created_at"], 
       
   229             'favorited': False,
       
   230             'id': self.json_dict["id"],
       
   231             'id_str': self.json_dict["id_str"],
       
   232             #'in_reply_to_screen_name': ts["to_user"], 
       
   233             'in_reply_to_user_id': self.json_dict["to_user_id"],
       
   234             'in_reply_to_user_id_str': self.json_dict["to_user_id_str"],
       
   235             #'place': ts["place"],
       
   236             'source': self.json_dict["source"],
       
   237             'text': self.json_dict["text"],
       
   238             'truncated': False,
       
   239             'original_json' : self.json_txt,
       
   240         }
       
   241         
       
   242         #user
       
   243     
       
   244         user_fields = {
       
   245             'id' : self.json_dict['from_user_id'],
       
   246             'id_str' : self.json_dict['from_user_id_str'],
       
   247             'lang' : self.json_dict['iso_language_code'],
       
   248             'profile_image_url' : self.json_dict["profile_image_url"],
       
   249             'screen_name' : self.json_dict["from_user"],                   
       
   250         }
       
   251         
       
   252         user = self.__get_user(user_fields)
       
   253         if user is None:
       
   254             log.warning("USER not found " + repr(user_fields))
       
   255             tweet_fields["user"] = None
       
   256             tweet_fields["user_id"] = None
       
   257         else:
       
   258             tweet_fields["user"] = user
       
   259             tweet_fields["user_id"] = user.id
       
   260         
       
   261         tweet_fields = adapt_fields(tweet_fields, fields_adapter["rest"]["tweet"])
       
   262         self.tweet = Tweet(**tweet_fields)
       
   263         session.add(self.tweet)
       
   264         
       
   265         text = self.tweet.text
       
   266         
       
   267         extractor = twitter_text.Extractor(text)
       
   268         
       
   269         for ind in extractor.extract_hashtags_with_indices():
       
   270             self.__process_entity(ind, "hashtags")
       
   271             
       
   272         for ind in extractor.extract_mentioned_screen_names_with_indices():
       
   273             self.__process_entity(ind, "user_mentions")
       
   274         
       
   275         for ind in extractor.extract_urls_with_indices():
       
   276             self.__process_entity(ind, "urls")
       
   277         
       
   278         self.session.flush()
       
   279 
       
   280 
       
   281     def process(self):
       
   282         if "metadata" in self.json_dict:
       
   283             self.__process_twitter_rest()
       
   284         else:
       
   285             self.__process_twitter_stream()
       
   286         
       
   287 
       
   288 def set_logging(options):
       
   289     
       
   290     logging_config = {}
       
   291     
       
   292     if options.logfile == "stdout":
       
   293         logging_config["stream"] = sys.stdout
       
   294     elif options.logfile == "stderr":
       
   295         logging_config["stream"] = sys.stderr
    60     else:
   296     else:
    61         user = session.query(User).filter(User.screen_name == user_name).first()
   297         logging_config["filename"] = options.logfile
    62 
   298         
    63     if user is not None:
   299     logging_config["level"] = max(logging.NOTSET, min(logging.CRITICAL, logging.WARNING - 10 * options.verbose + 10 * options.quiet))
    64         return user
   300     logging.basicConfig(**logging_config)
    65 
   301     
    66     user_created_at = user_dict.get("created_at", None)
   302     options.debug = (options.verbose-options.quiet > 0)
    67     
   303 
    68     if user_created_at is None:
   304 def set_logging_options(parser):
    69         t = twitter.Twitter(auth=twitter.OAuth(ACCESS_TOKEN_KEY, ACCESS_TOKEN_SECRET, CONSUMER_KEY, CONSUMER_SECRET))
   305     parser.add_option("-l", "--log", dest="logfile",
    70         try:
   306                       help="log to file", metavar="LOG", default="stderr")
    71             if user_id:
   307     parser.add_option("-v", dest="verbose", action="count",
    72                 user_dict = t.users.show(user_id=user_id)
   308                       help="verbose", metavar="VERBOSE", default=0)
    73             else:
   309     parser.add_option("-q", dest="quiet", action="count",
    74                 user_dict = t.users.show(screen_name=user_name)            
   310                       help="quiet", metavar="QUIET", default=0)
    75         except Exception as e:
       
    76             logging.info("get_user : TWITTER ERROR : " + repr(e))
       
    77             logging.info("get_user : TWITTER ERROR : " + str(e))
       
    78 
       
    79     user_dict = adapt_fields(user_dict, fields_adapter["stream"]["user"])
       
    80     if "id" not in user_dict:
       
    81         return None
       
    82     
       
    83     user = User(**user_dict)
       
    84     
       
    85     session.add(user)
       
    86     session.flush()
       
    87     
       
    88     return user 
       
    89     # if not, if needed get info from twitter
       
    90     # create user
       
    91     # return it
       
    92 
       
    93 def process_entity(ind, ind_type, tweet, session):
       
    94 
       
    95     logging.debug("Process_entity : " + repr(ind) + " : " + repr(ind_type))
       
    96 
       
    97     entity_dict = {
       
    98        "indice_start": ind["indices"][0],
       
    99        "indice_end"  : ind["indices"][1],
       
   100        "tweet_id"    : tweet.id,
       
   101        "tweet"       : tweet
       
   102     }
       
   103 
       
   104     def process_hashtags():
       
   105         text = ind.get("text", ind.get("hashtag", None))
       
   106         if text is None:
       
   107             return None 
       
   108         hashtag = session.query(Hashtag).filter(Hashtag.text == text).first()
       
   109         if not hashtag:
       
   110             ind["text"] = text
       
   111             hashtag = Hashtag(**ind)
       
   112             session.add(hashtag)
       
   113             session.flush()
       
   114         entity_dict['hashtag'] = hashtag
       
   115         entity_dict['hashtag_id'] = hashtag.id
       
   116         entity = EntityHashtag(**entity_dict)
       
   117         return entity
       
   118     
       
   119     def process_user_mentions():
       
   120         user_mention = get_user(ind, session)
       
   121         if user_mention is None:
       
   122             entity_dict['user'] = None
       
   123             entity_dict['user_id'] = None
       
   124         else:
       
   125             entity_dict['user'] = user_mention
       
   126             entity_dict['user_id'] = user_mention.id
       
   127         entity = EntityUser(**entity_dict)
       
   128         return entity
       
   129     
       
   130     def process_urls():
       
   131         url = session.query(Url).filter(Url.url == ind["url"]).first()
       
   132         if url is None:
       
   133             url = Url(**ind)
       
   134             session.add(url)
       
   135             session.flush()
       
   136         entity_dict['url'] = url
       
   137         entity_dict['url_id'] = url.id
       
   138         entity = EntityUrl(**entity_dict)
       
   139         return entity
       
   140     
       
   141     #{'': lambda }
       
   142     entity =  { 
       
   143         'hashtags': process_hashtags,
       
   144         'user_mentions' : process_user_mentions,
       
   145         'urls' : process_urls
       
   146         }[ind_type]()
       
   147         
       
   148     logging.debug("Process_entity entity_dict: " + repr(entity_dict))
       
   149     if entity:
       
   150         session.add(entity)
       
   151 
       
   152 
       
   153 
       
   154 def from_twitter_rest(ts, jsontxt, session):
       
   155     
       
   156     tweet_nb = session.query(Tweet).filter(Tweet.id == ts["id"]).count()
       
   157     if tweet_nb > 0:
       
   158         return
       
   159         
       
   160     tweet_fields = {
       
   161         'created_at': ts["created_at"], 
       
   162         'favorited': False,
       
   163         'id': ts["id"],
       
   164         'id_str': ts["id_str"],
       
   165         #'in_reply_to_screen_name': ts["to_user"], 
       
   166         'in_reply_to_user_id': ts["to_user_id"],
       
   167         'in_reply_to_user_id_str': ts["to_user_id_str"],
       
   168         #'place': ts["place"],
       
   169         'source': ts["source"],
       
   170         'text': ts["text"],
       
   171         'truncated': False,
       
   172         'original_json' : jsontxt,
       
   173     }
       
   174     
       
   175     #user
       
   176 
       
   177     user_fields = {
       
   178         'id' : ts['from_user_id'],
       
   179         'id_str' : ts['from_user_id_str'],
       
   180         'lang' : ts['iso_language_code'],
       
   181         'profile_image_url' : ts["profile_image_url"],
       
   182         'screen_name' : ts["from_user"],                   
       
   183     }
       
   184     
       
   185     user = get_user(user_fields, session)
       
   186     if user is None:
       
   187         log.warning("USER not found " + repr(user_fields))
       
   188         tweet_fields["user"] = None
       
   189         tweet_fields["user_id"] = None
       
   190     else:
       
   191         tweet_fields["user"] = user
       
   192         tweet_fields["user_id"] = user.id
       
   193     
       
   194     tweet_fields = adapt_fields(tweet_fields, fields_adapter["rest"]["tweet"])
       
   195     tweet = Tweet(**tweet_fields)
       
   196     session.add(tweet)
       
   197     
       
   198     text = tweet.text
       
   199     
       
   200     extractor = twitter_text.Extractor(text)
       
   201     
       
   202     for ind in extractor.extract_hashtags_with_indices():
       
   203         process_entity(ind, "hashtags", tweet, session)
       
   204         
       
   205     for ind in extractor.extract_mentioned_screen_names_with_indices():
       
   206         process_entity(ind, "user_mentions", tweet, session)
       
   207     
       
   208     for ind in extractor.extract_urls_with_indices():
       
   209         process_entity(ind, "urls", tweet, session)
       
   210     
       
   211     
       
   212     
       
   213 
       
   214 def from_twitter_stream(ts, jsontxt, session):
       
   215     
       
   216     tweet_nb = session.query(Tweet).filter(Tweet.id == ts["id"]).count()
       
   217     if tweet_nb > 0:
       
   218         return
       
   219     
       
   220     ts_copy = adapt_fields(ts, fields_adapter["stream"]["tweet"])
       
   221     
       
   222     # get or create user
       
   223     user = get_user(ts["user"], session)
       
   224     if user is None:
       
   225         log.warning("USER not found " + repr(ts["user"]))
       
   226         ts_copy["user"] = None
       
   227         ts_copy["user_id"] = None
       
   228     else:
       
   229         ts_copy["user"] = user
       
   230         ts_copy["user_id"] = ts_copy["user"].id
       
   231     ts_copy["original_json"] = jsontxt
       
   232     
       
   233     tweet = Tweet(**ts_copy)
       
   234     session.add(tweet)
       
   235     session.flush()
       
   236         
       
   237     # get entities
       
   238     for ind_type, entity_list in ts["entities"].items():
       
   239         for ind in entity_list:
       
   240             process_entity(ind, ind_type, tweet, session)