script/iri_tweet/utils.py
changeset 9 bb44692e09ee
child 11 54d7f1486ac4
equal deleted inserted replaced
8:b7f4b0554ef8 9:bb44692e09ee
       
     1 import email.utils
       
     2 import logging
       
     3 from models import *
       
     4 import datetime
       
     5 import twitter
       
     6 import twitter_text
       
     7 
       
     8 
       
     9 def parse_date(date_str):
       
    10     ts = email.utils.parsedate_tz(date_str)
       
    11     return datetime.datetime(*ts[0:7])
       
    12 
       
    13 
       
    14 fields_adapter = {
       
    15     'stream': {
       
    16         "tweet": {
       
    17             "created_at"    : adapt_date,
       
    18             "coordinates"   : adapt_json,
       
    19             "place"         : adapt_json,
       
    20             "geo"           : adapt_json,
       
    21 #            "original_json" : adapt_json,
       
    22         },
       
    23         "user": {
       
    24             "created_at"  : adapt_date,
       
    25         },
       
    26     },
       
    27     'rest': {
       
    28         "tweet" : {
       
    29             "place"         : adapt_json,
       
    30             "geo"           : adapt_json,
       
    31             "created_at"    : adapt_date,
       
    32 #            "original_json" : adapt_json,
       
    33         }, 
       
    34     },
       
    35 }
       
    36 
       
    37 #
       
    38 # adapt fields, return a copy of the field_dict with adapted fields
       
    39 #
       
    40 def adapt_fields(fields_dict, adapter_mapping):
       
    41     def adapt_one_field(field, value):
       
    42         if field in adapter_mapping and adapter_mapping[field] is not None:
       
    43             return adapter_mapping[field](value)
       
    44         else:
       
    45             return value
       
    46     return dict([(str(k),adapt_one_field(k,v)) for k,v in fields_dict.items()])    
       
    47 
       
    48 def get_user(user_dict, session):
       
    49     
       
    50     logging.debug("Get user : " + repr(user_dict))
       
    51     
       
    52     user_id = user_dict.get("id",None)    
       
    53     user_name = user_dict.get("screen_name", user_dict.get("name", None))
       
    54     
       
    55     if user_id is None and user_name is None:
       
    56         return None
       
    57 
       
    58     if user_id:
       
    59         user = session.query(User).filter(User.id == user_id).first()
       
    60     else:
       
    61         user = session.query(User).filter(User.screen_name == user_name).first()
       
    62 
       
    63     if user is not None:
       
    64         return user
       
    65 
       
    66     user_created_at = user_dict.get("created_at", None)
       
    67     
       
    68     if user_created_at is None:
       
    69         t = twitter.Twitter(auth=twitter.OAuth(ACCESS_TOKEN_KEY, ACCESS_TOKEN_SECRET, CONSUMER_KEY, CONSUMER_SECRET))
       
    70         try:
       
    71             if user_id:
       
    72                 user_dict = t.users.show(user_id=user_id)
       
    73             else:
       
    74                 user_dict = t.users.show(screen_name=user_name)            
       
    75         except Exception as e:
       
    76             logging.info("get_user : TWITTER ERROR : " + repr(e))
       
    77             logging.info("get_user : TWITTER ERROR : " + str(e))
       
    78 
       
    79     user_dict = adapt_fields(user_dict, fields_adapter["stream"]["user"])
       
    80     if "id" not in user_dict:
       
    81         return None
       
    82     
       
    83     user = User(**user_dict)
       
    84     
       
    85     session.add(user)
       
    86     session.flush()
       
    87     
       
    88     return user 
       
    89     # if not, if needed get info from twitter
       
    90     # create user
       
    91     # return it
       
    92 
       
    93 def process_entity(ind, ind_type, tweet, session):
       
    94 
       
    95     logging.debug("Process_entity : " + repr(ind) + " : " + repr(ind_type))
       
    96 
       
    97     entity_dict = {
       
    98        "indice_start": ind["indices"][0],
       
    99        "indice_end"  : ind["indices"][1],
       
   100        "tweet_id"    : tweet.id,
       
   101        "tweet"       : tweet
       
   102     }
       
   103 
       
   104     def process_hashtags():
       
   105         text = ind.get("text", ind.get("hashtag", None))
       
   106         if text is None:
       
   107             return None 
       
   108         hashtag = session.query(Hashtag).filter(Hashtag.text == text).first()
       
   109         if not hashtag:
       
   110             ind["text"] = text
       
   111             hashtag = Hashtag(**ind)
       
   112             session.add(hashtag)
       
   113             session.flush()
       
   114         entity_dict['hashtag'] = hashtag
       
   115         entity_dict['hashtag_id'] = hashtag.id
       
   116         entity = EntityHashtag(**entity_dict)
       
   117         return entity
       
   118     
       
   119     def process_user_mentions():
       
   120         user_mention = get_user(ind, session)
       
   121         if user_mention is None:
       
   122             entity_dict['user'] = None
       
   123             entity_dict['user_id'] = None
       
   124         else:
       
   125             entity_dict['user'] = user_mention
       
   126             entity_dict['user_id'] = user_mention.id
       
   127         entity = EntityUser(**entity_dict)
       
   128         return entity
       
   129     
       
   130     def process_urls():
       
   131         url = session.query(Url).filter(Url.url == ind["url"]).first()
       
   132         if url is None:
       
   133             url = Url(**ind)
       
   134             session.add(url)
       
   135             session.flush()
       
   136         entity_dict['url'] = url
       
   137         entity_dict['url_id'] = url.id
       
   138         entity = EntityUrl(**entity_dict)
       
   139         return entity
       
   140     
       
   141     #{'': lambda }
       
   142     entity =  { 
       
   143         'hashtags': process_hashtags,
       
   144         'user_mentions' : process_user_mentions,
       
   145         'urls' : process_urls
       
   146         }[ind_type]()
       
   147         
       
   148     logging.debug("Process_entity entity_dict: " + repr(entity_dict))
       
   149     if entity:
       
   150         session.add(entity)
       
   151 
       
   152 
       
   153 
       
   154 def from_twitter_rest(ts, jsontxt, session):
       
   155     
       
   156     tweet_nb = session.query(Tweet).filter(Tweet.id == ts["id"]).count()
       
   157     if tweet_nb > 0:
       
   158         return
       
   159         
       
   160     tweet_fields = {
       
   161         'created_at': ts["created_at"], 
       
   162         'favorited': False,
       
   163         'id': ts["id"],
       
   164         'id_str': ts["id_str"],
       
   165         #'in_reply_to_screen_name': ts["to_user"], 
       
   166         'in_reply_to_user_id': ts["to_user_id"],
       
   167         'in_reply_to_user_id_str': ts["to_user_id_str"],
       
   168         #'place': ts["place"],
       
   169         'source': ts["source"],
       
   170         'text': ts["text"],
       
   171         'truncated': False,
       
   172         'original_json' : jsontxt,
       
   173     }
       
   174     
       
   175     #user
       
   176 
       
   177     user_fields = {
       
   178         'id' : ts['from_user_id'],
       
   179         'id_str' : ts['from_user_id_str'],
       
   180         'lang' : ts['iso_language_code'],
       
   181         'profile_image_url' : ts["profile_image_url"],
       
   182         'screen_name' : ts["from_user"],                   
       
   183     }
       
   184     
       
   185     user = get_user(user_fields, session)
       
   186     if user is None:
       
   187         log.warning("USER not found " + repr(user_fields))
       
   188         tweet_fields["user"] = None
       
   189         tweet_fields["user_id"] = None
       
   190     else:
       
   191         tweet_fields["user"] = user
       
   192         tweet_fields["user_id"] = user.id
       
   193     
       
   194     tweet_fields = adapt_fields(tweet_fields, fields_adapter["rest"]["tweet"])
       
   195     tweet = Tweet(**tweet_fields)
       
   196     session.add(tweet)
       
   197     
       
   198     text = tweet.text
       
   199     
       
   200     extractor = twitter_text.Extractor(text)
       
   201     
       
   202     for ind in extractor.extract_hashtags_with_indices():
       
   203         process_entity(ind, "hashtags", tweet, session)
       
   204         
       
   205     for ind in extractor.extract_mentioned_screen_names_with_indices():
       
   206         process_entity(ind, "user_mentions", tweet, session)
       
   207     
       
   208     for ind in extractor.extract_urls_with_indices():
       
   209         process_entity(ind, "urls", tweet, session)
       
   210     
       
   211     
       
   212     
       
   213 
       
   214 def from_twitter_stream(ts, jsontxt, session):
       
   215     
       
   216     tweet_nb = session.query(Tweet).filter(Tweet.id == ts["id"]).count()
       
   217     if tweet_nb > 0:
       
   218         return
       
   219     
       
   220     ts_copy = adapt_fields(ts, fields_adapter["stream"]["tweet"])
       
   221     
       
   222     # get or create user
       
   223     user = get_user(ts["user"], session)
       
   224     if user is None:
       
   225         log.warning("USER not found " + repr(ts["user"]))
       
   226         ts_copy["user"] = None
       
   227         ts_copy["user_id"] = None
       
   228     else:
       
   229         ts_copy["user"] = user
       
   230         ts_copy["user_id"] = ts_copy["user"].id
       
   231     ts_copy["original_json"] = jsontxt
       
   232     
       
   233     tweet = Tweet(**ts_copy)
       
   234     session.add(tweet)
       
   235     session.flush()
       
   236         
       
   237     # get entities
       
   238     for ind_type, entity_list in ts["entities"].items():
       
   239         for ind in entity_list:
       
   240             process_entity(ind, ind_type, tweet, session)