script/iri_tweet/utils.py
changeset 9 bb44692e09ee
child 11 54d7f1486ac4
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/script/iri_tweet/utils.py	Tue Jan 11 11:17:17 2011 +0100
@@ -0,0 +1,240 @@
+import email.utils
+import logging
+from models import *
+import datetime
+import twitter
+import twitter_text
+
+
+def parse_date(date_str):
+    """
+    Parse an RFC 2822 style date string into a naive datetime.
+
+    The string is parsed with email.utils.parsedate_tz. Only the calendar
+    fields (year, month, day, hour, minute, second) are kept: the timezone
+    offset returned by the parser is ignored, so the result holds the values
+    exactly as they are written in date_str.
+
+    Raises ValueError when date_str cannot be parsed.
+    """
+    ts = email.utils.parsedate_tz(date_str)
+    if ts is None:
+        # parsedate_tz returns None when it cannot make sense of the string
+        raise ValueError("unable to parse date : " + repr(date_str))
+    # Only the first six items are date fields. Item 6 is a weekday
+    # placeholder and must not be handed over as the microsecond argument.
+    return datetime.datetime(*ts[0:6])
+
+
+# Adapter functions to apply to raw Twitter fields, see adapt_fields.
+# First level  : where the data comes from ("stream" or "rest").
+# Second level : kind of object described ("tweet" or "user").
+# Third level  : field name -> function used to adapt the field value.
+# Note: "original_json" currently has no adapter in either tweet table.
+fields_adapter = {
+    "stream": {
+        "tweet": {
+            "coordinates": adapt_json,
+            "created_at": adapt_date,
+            "geo": adapt_json,
+            "place": adapt_json,
+        },
+        "user": {
+            "created_at": adapt_date,
+        },
+    },
+    "rest": {
+        "tweet": {
+            "created_at": adapt_date,
+            "geo": adapt_json,
+            "place": adapt_json,
+        },
+    },
+}
+
+def adapt_fields(fields_dict, adapter_mapping):
+    """
+    Return a copy of fields_dict with its values adapted.
+
+    Each value whose field name has a non None entry in adapter_mapping is
+    replaced by the result of calling that entry with the value. All other
+    values are copied unchanged. Every key of the returned dict is converted
+    with str(). fields_dict itself is not modified.
+    """
+    def convert(name, raw):
+        # no adapter registered for this field: keep the value as it is
+        if name not in adapter_mapping:
+            return raw
+        adapter = adapter_mapping[name]
+        if adapter is None:
+            return raw
+        return adapter(raw)
+
+    adapted = {}
+    for name, raw in fields_dict.items():
+        adapted[str(name)] = convert(name, raw)
+    return adapted
+
+def get_user(user_dict, session):
+    """
+    Return the User described by user_dict, creating it when needed.
+
+    user_dict must hold an "id" and/or a "screen_name" (with "name" used as
+    a fallback for the screen name). The user is first searched through
+    session, by id when an id is given and by screen name otherwise. When no
+    stored user is found and user_dict has no "created_at" value, the user
+    description is requested from twitter. If that request fails, the error
+    is logged and the fields of user_dict are used as they are.
+
+    Returns the User found or created (the new user is added to session and
+    the session is flushed). Returns None when user_dict has neither id nor
+    name, or when the final description has no "id" field.
+    """
+    logging.debug("Get user : %r", user_dict)
+
+    user_id = user_dict.get("id")
+    user_name = user_dict.get("screen_name", user_dict.get("name"))
+
+    if user_id is None and user_name is None:
+        return None
+
+    # Use the same "is None" test as the guard above, so that an id is only
+    # ignored when it is really missing.
+    by_id = user_id is not None
+
+    if by_id:
+        query = session.query(User).filter(User.id == user_id)
+    else:
+        query = session.query(User).filter(User.screen_name == user_name)
+
+    user = query.first()
+    if user is not None:
+        return user
+
+    # Not stored yet. Without a "created_at" value, ask twitter for the
+    # user description.
+    if user_dict.get("created_at") is None:
+        t = twitter.Twitter(auth=twitter.OAuth(ACCESS_TOKEN_KEY, ACCESS_TOKEN_SECRET, CONSUMER_KEY, CONSUMER_SECRET))
+        try:
+            if by_id:
+                user_dict = t.users.show(user_id=user_id)
+            else:
+                user_dict = t.users.show(screen_name=user_name)
+        except Exception as e:
+            # Best effort: on failure carry on with the fields received.
+            logging.info("get_user : TWITTER ERROR : %r", e)
+            logging.info("get_user : TWITTER ERROR : %s", e)
+
+    user_dict = adapt_fields(user_dict, fields_adapter["stream"]["user"])
+    if "id" not in user_dict:
+        return None
+
+    user = User(**user_dict)
+
+    session.add(user)
+    session.flush()
+
+    return user
+
+def process_entity(ind, ind_type, tweet, session):
+    """
+    Create the entity object linking tweet to one hashtag, user or url.
+
+    ind      : dict describing the entity. ind["indices"] holds the start
+               and end positions. Hashtags are read from "text" (or
+               "hashtag"), urls from "url". User mentions are resolved with
+               get_user.
+    ind_type : "hashtags", "user_mentions" or "urls". Any other value raises
+               KeyError.
+    tweet    : the tweet object the entity belongs to.
+    session  : session used to look up and store the objects.
+
+    The Hashtag or Url the entity points to is created and flushed when it
+    is not stored yet. The resulting entity is added to session. Nothing is
+    returned. ind is not modified.
+    """
+
+    logging.debug("Process_entity : %r : %r", ind, ind_type)
+
+    # fields shared by the three kinds of entity
+    entity_dict = {
+       "indice_start": ind["indices"][0],
+       "indice_end"  : ind["indices"][1],
+       "tweet_id"    : tweet.id,
+       "tweet"       : tweet
+    }
+
+    def process_hashtags():
+        text = ind.get("text", ind.get("hashtag", None))
+        if text is None:
+            return None
+        hashtag = session.query(Hashtag).filter(Hashtag.text == text).first()
+        if not hashtag:
+            # Build the constructor arguments from a copy, so that the
+            # caller's dict is left untouched.
+            hashtag_fields = dict(ind)
+            hashtag_fields["text"] = text
+            hashtag = Hashtag(**hashtag_fields)
+            session.add(hashtag)
+            session.flush()
+        entity_dict['hashtag'] = hashtag
+        entity_dict['hashtag_id'] = hashtag.id
+        return EntityHashtag(**entity_dict)
+
+    def process_user_mentions():
+        user_mention = get_user(ind, session)
+        if user_mention is None:
+            # the entity is kept even when the user cannot be resolved
+            entity_dict['user'] = None
+            entity_dict['user_id'] = None
+        else:
+            entity_dict['user'] = user_mention
+            entity_dict['user_id'] = user_mention.id
+        return EntityUser(**entity_dict)
+
+    def process_urls():
+        url = session.query(Url).filter(Url.url == ind["url"]).first()
+        if url is None:
+            url = Url(**ind)
+            session.add(url)
+            session.flush()
+        entity_dict['url'] = url
+        entity_dict['url_id'] = url.id
+        return EntityUrl(**entity_dict)
+
+    # dispatch on the entity type
+    entity = {
+        'hashtags': process_hashtags,
+        'user_mentions' : process_user_mentions,
+        'urls' : process_urls
+        }[ind_type]()
+
+    logging.debug("Process_entity entity_dict: %r", entity_dict)
+    if entity:
+        session.add(entity)
+
+
+
+def from_twitter_rest(ts, jsontxt, session):
+    """
+    Store a tweet given in the "rest" format, with its user and entities.
+
+    ts      : dict describing the tweet. The keys read are "id", "id_str",
+              "created_at", "to_user_id", "to_user_id_str", "source", "text",
+              "from_user", "from_user_id", "from_user_id_str",
+              "iso_language_code" and "profile_image_url".
+    jsontxt : stored unchanged in the "original_json" field of the tweet.
+    session : session used to look up and store the objects.
+
+    Nothing is done when a tweet with the same id is already stored. The
+    hashtags, user mentions and urls are extracted from the tweet text with
+    twitter_text and handed to process_entity. Nothing is returned.
+    """
+
+    tweet_nb = session.query(Tweet).filter(Tweet.id == ts["id"]).count()
+    if tweet_nb > 0:
+        return
+
+    # "in_reply_to_screen_name" (ts["to_user"]) and "place" are not copied
+    tweet_fields = {
+        'created_at': ts["created_at"],
+        'favorited': False,
+        'id': ts["id"],
+        'id_str': ts["id_str"],
+        'in_reply_to_user_id': ts["to_user_id"],
+        'in_reply_to_user_id_str': ts["to_user_id_str"],
+        'source': ts["source"],
+        'text': ts["text"],
+        'truncated': False,
+        'original_json' : jsontxt,
+    }
+
+    # description of the author, built from the "from_user" fields
+    user_fields = {
+        'id' : ts['from_user_id'],
+        'id_str' : ts['from_user_id_str'],
+        'lang' : ts['iso_language_code'],
+        'profile_image_url' : ts["profile_image_url"],
+        'screen_name' : ts["from_user"],
+    }
+
+    user = get_user(user_fields, session)
+    if user is None:
+        # the logging module is used directly, as in the rest of this file
+        logging.warning("USER not found %r", user_fields)
+        tweet_fields["user"] = None
+        tweet_fields["user_id"] = None
+    else:
+        tweet_fields["user"] = user
+        tweet_fields["user_id"] = user.id
+
+    tweet_fields = adapt_fields(tweet_fields, fields_adapter["rest"]["tweet"])
+    tweet = Tweet(**tweet_fields)
+    session.add(tweet)
+
+    # no entities are read from ts: extract them from the text
+    extractor = twitter_text.Extractor(tweet.text)
+
+    for ind in extractor.extract_hashtags_with_indices():
+        process_entity(ind, "hashtags", tweet, session)
+
+    for ind in extractor.extract_mentioned_screen_names_with_indices():
+        process_entity(ind, "user_mentions", tweet, session)
+
+    for ind in extractor.extract_urls_with_indices():
+        process_entity(ind, "urls", tweet, session)
+    
+    
+    
+
+def from_twitter_stream(ts, jsontxt, session):
+    """
+    Store a tweet given in the "stream" format, with its user and entities.
+
+    ts      : dict describing the tweet. ts["id"], ts["user"] and
+              ts["entities"] are read here. All the fields, once adapted with
+              adapt_fields, are passed to the Tweet constructor.
+    jsontxt : stored unchanged in the "original_json" field of the tweet.
+    session : session used to look up and store the objects.
+
+    Nothing is done when a tweet with the same id is already stored. Each
+    item of each list found in ts["entities"] is handed to process_entity.
+    Nothing is returned.
+    """
+
+    tweet_nb = session.query(Tweet).filter(Tweet.id == ts["id"]).count()
+    if tweet_nb > 0:
+        return
+
+    ts_copy = adapt_fields(ts, fields_adapter["stream"]["tweet"])
+
+    # get or create user
+    user = get_user(ts["user"], session)
+    if user is None:
+        # the logging module is used directly, as in the rest of this file
+        logging.warning("USER not found %r", ts["user"])
+        ts_copy["user"] = None
+        ts_copy["user_id"] = None
+    else:
+        ts_copy["user"] = user
+        ts_copy["user_id"] = user.id
+    ts_copy["original_json"] = jsontxt
+
+    tweet = Tweet(**ts_copy)
+    session.add(tweet)
+    session.flush()
+
+    # get entities
+    for ind_type, entity_list in ts["entities"].items():
+        for ind in entity_list:
+            process_entity(ind, ind_type, tweet, session)