script/lib/iri_tweet/utils.py
changeset 12 4daf47fcf792
parent 11 54d7f1486ac4
child 15 5d552b6a0e55
equal deleted inserted replaced
11:54d7f1486ac4 12:4daf47fcf792
       
     1 from models import *
       
     2 import datetime
       
     3 import email.utils
       
     4 import json
       
     5 import logging
       
     6 import sys
       
     7 import twitter
       
     8 import twitter_text
       
     9 import os.path
       
    10 import twitter.oauth
       
    11 
       
    12 
       
    13 def get_oauth_token(token_file_path=None):
       
    14     
       
    15     if token_file_path and os.path.file_exists(token_file_path):
       
    16         logging.debug("reading token from file %s" % token_file_path)
       
    17         return twitter.oauth.read_token_file(token_file_path)
       
    18         #read access token info from path
       
    19     
       
    20     if 'ACCESS_TOKEN_KEY' in dict() and 'ACCESS_TOKEN_SECRET' in dict() and ACCESS_TOKEN_KEY and ACCESS_TOKEN_SECRET:
       
    21         return ACCESS_TOKEN_KEY,ACCESS_TOKEN_SECRET
       
    22     
       
    23     return twitter.oauth_dance.oauth_dance(APPLICATION_NAME, CONSUMER_KEY, CONSUMER_SECRET, token_filename)
       
    24 
       
    25 def parse_date(date_str):
       
    26     ts = email.utils.parsedate_tz(date_str)
       
    27     return datetime.datetime(*ts[0:7])
       
    28 
       
    29 
       
    30 fields_adapter = {
       
    31     'stream': {
       
    32         "tweet": {
       
    33             "created_at"    : adapt_date,
       
    34             "coordinates"   : adapt_json,
       
    35             "place"         : adapt_json,
       
    36             "geo"           : adapt_json,
       
    37 #            "original_json" : adapt_json,
       
    38         },
       
    39         "user": {
       
    40             "created_at"  : adapt_date,
       
    41         },
       
    42     },
       
    43     'rest': {
       
    44         "tweet" : {
       
    45             "place"         : adapt_json,
       
    46             "geo"           : adapt_json,
       
    47             "created_at"    : adapt_date,
       
    48 #            "original_json" : adapt_json,
       
    49         }, 
       
    50     },
       
    51 }
       
    52 
       
    53 #
       
    54 # adapt fields, return a copy of the field_dict with adapted fields
       
    55 #
       
    56 def adapt_fields(fields_dict, adapter_mapping):
       
    57     def adapt_one_field(field, value):
       
    58         if field in adapter_mapping and adapter_mapping[field] is not None:
       
    59             return adapter_mapping[field](value)
       
    60         else:
       
    61             return value
       
    62     return dict([(str(k),adapt_one_field(k,v)) for k,v in fields_dict.items()])    
       
    63 
       
    64 
       
    65 
       
    66 class TwitterProcessorException(Exception):
       
    67     pass
       
    68 
       
    69 class TwitterProcessor(object):
       
    70     
       
    71     def __init__(self, json_dict, json_txt, session):
       
    72 
       
    73         if json_dict is None and json_txt is None:
       
    74             raise TwitterProcessorException("No json")
       
    75         
       
    76         if json_dict is None:
       
    77             self.json_dict = json.loads(json_txt)
       
    78         else:
       
    79             self.json_dict = json_dict
       
    80         
       
    81         if not json_txt:
       
    82             self.json_txt = json.dumps(json_dict)
       
    83         else:
       
    84             self.json_txt = json_txt
       
    85         
       
    86         if "id" not in self.json_dict:
       
    87             raise TwitterProcessorException("No id in json")
       
    88         
       
    89         self.session = session
       
    90 
       
    91     def __get_user(self, user_dict):
       
    92         logging.debug("Get user : " + repr(user_dict))
       
    93     
       
    94         user_id = user_dict.get("id",None)    
       
    95         user_name = user_dict.get("screen_name", user_dict.get("name", None))
       
    96         
       
    97         if user_id is None and user_name is None:
       
    98             return None
       
    99     
       
   100         if user_id:
       
   101             user = self.session.query(User).filter(User.id == user_id).first()
       
   102         else:
       
   103             user = self.session.query(User).filter(User.screen_name == user_name).first()
       
   104     
       
   105         if user is not None:
       
   106             return user
       
   107     
       
   108         user_created_at = user_dict.get("created_at", None)
       
   109         
       
   110         if user_created_at is None:
       
   111             acess_token_key, access_token_secret = get_oauth_token()
       
   112             t = twitter.Twitter(auth=twitter.OAuth(token_key, token_secret, CONSUMER_KEY, CONSUMER_SECRET))
       
   113             try:
       
   114                 if user_id:
       
   115                     user_dict = t.users.show(user_id=user_id)
       
   116                 else:
       
   117                     user_dict = t.users.show(screen_name=user_name)            
       
   118             except Exception as e:
       
   119                 logging.info("get_user : TWITTER ERROR : " + repr(e))
       
   120                 logging.info("get_user : TWITTER ERROR : " + str(e))
       
   121     
       
   122         user_dict = adapt_fields(user_dict, fields_adapter["stream"]["user"])
       
   123         if "id" not in user_dict:
       
   124             return None
       
   125         
       
   126         user = User(**user_dict)
       
   127         
       
   128         self.session.add(user)
       
   129         self.session.flush()
       
   130         
       
   131         return user 
       
   132 
       
   133     def __process_entity(self, ind, ind_type):
       
   134         logging.debug("Process_entity : " + repr(ind) + " : " + repr(ind_type))
       
   135     
       
   136         entity_dict = {
       
   137            "indice_start": ind["indices"][0],
       
   138            "indice_end"  : ind["indices"][1],
       
   139            "tweet_id"    : self.tweet.id,
       
   140            "tweet"       : self.tweet
       
   141         }
       
   142     
       
   143         def process_hashtags():
       
   144             text = ind.get("text", ind.get("hashtag", None))
       
   145             if text is None:
       
   146                 return None 
       
   147             hashtag = self.session.query(Hashtag).filter(Hashtag.text == text).first()
       
   148             if not hashtag:
       
   149                 ind["text"] = text
       
   150                 hashtag = Hashtag(**ind)
       
   151                 self.session.add(hashtag)
       
   152                 self.session.flush()
       
   153             entity_dict['hashtag'] = hashtag
       
   154             entity_dict['hashtag_id'] = hashtag.id
       
   155             entity = EntityHashtag(**entity_dict)
       
   156             return entity
       
   157         
       
   158         def process_user_mentions():
       
   159             user_mention = self.__get_user(ind)
       
   160             if user_mention is None:
       
   161                 entity_dict['user'] = None
       
   162                 entity_dict['user_id'] = None
       
   163             else:
       
   164                 entity_dict['user'] = user_mention
       
   165                 entity_dict['user_id'] = user_mention.id
       
   166             entity = EntityUser(**entity_dict)
       
   167             return entity
       
   168         
       
   169         def process_urls():
       
   170             url = self.session.query(Url).filter(Url.url == ind["url"]).first()
       
   171             if url is None:
       
   172                 url = Url(**ind)
       
   173                 self.session.add(url)
       
   174                 self.session.flush()
       
   175             entity_dict['url'] = url
       
   176             entity_dict['url_id'] = url.id
       
   177             entity = EntityUrl(**entity_dict)
       
   178             return entity
       
   179         
       
   180         #{'': lambda }
       
   181         entity =  { 
       
   182             'hashtags': process_hashtags,
       
   183             'user_mentions' : process_user_mentions,
       
   184             'urls' : process_urls
       
   185             }[ind_type]()
       
   186             
       
   187         logging.debug("Process_entity entity_dict: " + repr(entity_dict))
       
   188         if entity:
       
   189             self.session.add(entity)
       
   190             self.session.flush()
       
   191 
       
   192 
       
   193     def __process_twitter_stream(self):
       
   194         
       
   195         tweet_nb = self.session.query(Tweet).filter(Tweet.id == self.json_dict["id"]).count()
       
   196         if tweet_nb > 0:
       
   197             return
       
   198         
       
   199         ts_copy = adapt_fields(self.json_dict, fields_adapter["stream"]["tweet"])
       
   200         
       
   201         # get or create user
       
   202         user = self.__get_user(self.json_dict["user"])
       
   203         if user is None:
       
   204             log.warning("USER not found " + repr(ts["user"]))
       
   205             ts_copy["user"] = None
       
   206             ts_copy["user_id"] = None
       
   207         else:
       
   208             ts_copy["user"] = user
       
   209             ts_copy["user_id"] = ts_copy["user"].id
       
   210         ts_copy["original_json"] = self.json_txt
       
   211         
       
   212         self.tweet = Tweet(**ts_copy)
       
   213         self.session.add(self.tweet)
       
   214         self.session.flush()
       
   215             
       
   216         # get entities
       
   217         for ind_type, entity_list in self.json_dict["entities"].items():
       
   218             for ind in entity_list:
       
   219                 self.__process_entity(ind, ind_type)
       
   220 
       
   221 
       
   222     def __process_twitter_rest(self):
       
   223         tweet_nb = self.session.query(Tweet).filter(Tweet.id == self.json_dict["id"]).count()
       
   224         if tweet_nb > 0:
       
   225             return
       
   226             
       
   227         tweet_fields = {
       
   228             'created_at': self.json_dict["created_at"], 
       
   229             'favorited': False,
       
   230             'id': self.json_dict["id"],
       
   231             'id_str': self.json_dict["id_str"],
       
   232             #'in_reply_to_screen_name': ts["to_user"], 
       
   233             'in_reply_to_user_id': self.json_dict["to_user_id"],
       
   234             'in_reply_to_user_id_str': self.json_dict["to_user_id_str"],
       
   235             #'place': ts["place"],
       
   236             'source': self.json_dict["source"],
       
   237             'text': self.json_dict["text"],
       
   238             'truncated': False,
       
   239             'original_json' : self.json_txt,
       
   240         }
       
   241         
       
   242         #user
       
   243     
       
   244         user_fields = {
       
   245             'id' : self.json_dict['from_user_id'],
       
   246             'id_str' : self.json_dict['from_user_id_str'],
       
   247             'lang' : self.json_dict['iso_language_code'],
       
   248             'profile_image_url' : self.json_dict["profile_image_url"],
       
   249             'screen_name' : self.json_dict["from_user"],                   
       
   250         }
       
   251         
       
   252         user = self.__get_user(user_fields)
       
   253         if user is None:
       
   254             log.warning("USER not found " + repr(user_fields))
       
   255             tweet_fields["user"] = None
       
   256             tweet_fields["user_id"] = None
       
   257         else:
       
   258             tweet_fields["user"] = user
       
   259             tweet_fields["user_id"] = user.id
       
   260         
       
   261         tweet_fields = adapt_fields(tweet_fields, fields_adapter["rest"]["tweet"])
       
   262         self.tweet = Tweet(**tweet_fields)
       
   263         session.add(self.tweet)
       
   264         
       
   265         text = self.tweet.text
       
   266         
       
   267         extractor = twitter_text.Extractor(text)
       
   268         
       
   269         for ind in extractor.extract_hashtags_with_indices():
       
   270             self.__process_entity(ind, "hashtags")
       
   271             
       
   272         for ind in extractor.extract_mentioned_screen_names_with_indices():
       
   273             self.__process_entity(ind, "user_mentions")
       
   274         
       
   275         for ind in extractor.extract_urls_with_indices():
       
   276             self.__process_entity(ind, "urls")
       
   277         
       
   278         self.session.flush()
       
   279 
       
   280 
       
   281     def process(self):
       
   282         if "metadata" in self.json_dict:
       
   283             self.__process_twitter_rest()
       
   284         else:
       
   285             self.__process_twitter_stream()
       
   286         
       
   287 
       
   288 def set_logging(options):
       
   289     
       
   290     logging_config = {}
       
   291     
       
   292     if options.logfile == "stdout":
       
   293         logging_config["stream"] = sys.stdout
       
   294     elif options.logfile == "stderr":
       
   295         logging_config["stream"] = sys.stderr
       
   296     else:
       
   297         logging_config["filename"] = options.logfile
       
   298         
       
   299     logging_config["level"] = max(logging.NOTSET, min(logging.CRITICAL, logging.WARNING - 10 * options.verbose + 10 * options.quiet))
       
   300     logging.basicConfig(**logging_config)
       
   301     
       
   302     options.debug = (options.verbose-options.quiet > 0)
       
   303 
       
   304 def set_logging_options(parser):
       
   305     parser.add_option("-l", "--log", dest="logfile",
       
   306                       help="log to file", metavar="LOG", default="stderr")
       
   307     parser.add_option("-v", dest="verbose", action="count",
       
   308                       help="verbose", metavar="VERBOSE", default=0)
       
   309     parser.add_option("-q", dest="quiet", action="count",
       
   310                       help="quiet", metavar="QUIET", default=0)