script/iri_tweet/utils.py
author Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
Tue, 11 Jan 2011 11:17:17 +0100
changeset 9 bb44692e09ee
child 11 54d7f1486ac4
permissions -rw-r--r--
script apres traitement enmi

import email.utils
import logging
from models import *
import datetime
import twitter
import twitter_text


def parse_date(date_str):
    """Parse an RFC 2822-style date string into a naive UTC ``datetime``.

    ``email.utils.parsedate_tz`` returns a 10-tuple
    ``(year, month, day, hour, minute, second, wday, yday, isdst, tzoffset)``.
    Only the first six fields are datetime components (the original code
    also passed field 6 as microseconds). The trailing ``tzoffset`` (seconds
    east of UTC, or None when the string has no zone) is applied so that
    dates coming from different zones become comparable.

    :param date_str: e.g. ``"Tue Jan 11 10:17:17 +0000 2011"`` (Twitter
        format) or ``"Tue, 11 Jan 2011 11:17:17 +0100"``.
    :returns: naive ``datetime.datetime`` in UTC when the input carried an
        offset, otherwise the wall-clock time as written.
    :raises ValueError: if ``date_str`` cannot be parsed.
    """
    ts = email.utils.parsedate_tz(date_str)
    if ts is None:
        raise ValueError("unparseable date: %r" % (date_str,))
    parsed = datetime.datetime(*ts[0:6])
    tz_offset = ts[9]
    if tz_offset:
        # local time = UTC + offset, so subtract the offset to get UTC
        parsed -= datetime.timedelta(seconds=tz_offset)
    return parsed


# Field adapters used with adapt_fields(), indexed by payload source
# ("stream" / "rest") and then by object kind ("tweet" / "user"). Each entry
# maps a payload key to the callable that converts its raw value; keys not
# listed are passed through unchanged.
# adapt_date / adapt_json are not defined in this file -- presumably they come
# from the `from models import *` above (confirm).
# "original_json" is intentionally absent (its entries were commented out), so
# it is passed through unchanged.
fields_adapter = dict(
    stream=dict(
        tweet=dict(
            created_at=adapt_date,
            coordinates=adapt_json,
            place=adapt_json,
            geo=adapt_json,
        ),
        user=dict(
            created_at=adapt_date,
        ),
    ),
    rest=dict(
        tweet=dict(
            place=adapt_json,
            geo=adapt_json,
            created_at=adapt_date,
        ),
    ),
)

def adapt_fields(fields_dict, adapter_mapping):
    """Return a new dict with the values of ``fields_dict`` optionally converted.

    Every key is converted with ``str()`` in the result (the callers in this
    file pass the result as ``**kwargs``). A value is replaced by
    ``adapter_mapping[key](value)`` when its original key is present in
    ``adapter_mapping`` with a non-None callable; otherwise it is copied
    unchanged. ``fields_dict`` itself is not modified.
    """
    adapted = {}
    for key, value in fields_dict.items():
        adapter = adapter_mapping[key] if key in adapter_mapping else None
        if adapter is not None:
            value = adapter(value)
        adapted[str(key)] = value
    return adapted

def get_user(user_dict, session):
    """Return the ``User`` described by ``user_dict``, creating it if needed.

    The user is looked up by ``id`` when that is truthy, otherwise by
    ``screen_name`` (falling back to ``name``). If no row is found and
    ``user_dict`` has no ``created_at``, the profile is requested from the
    Twitter API (``users.show``); an API error is logged and the original
    ``user_dict`` is used instead. The dict is then run through
    ``adapt_fields`` with the stream user adapters and, when it contains an
    ``id``, a new ``User`` is added to ``session`` and flushed.

    :returns: the existing or newly created ``User``, or None when the dict
        names no user or no ``id`` is available to create one.
    """
    logging.debug("Get user : " + repr(user_dict))

    uid = user_dict.get("id", None)
    uname = user_dict.get("screen_name", user_dict.get("name", None))

    if uid is None and uname is None:
        return None

    query = session.query(User)
    if uid:
        existing = query.filter(User.id == uid).first()
    else:
        existing = query.filter(User.screen_name == uname).first()
    if existing is not None:
        return existing

    if user_dict.get("created_at", None) is None:
        # OAuth credentials are module-level names not defined in this file's
        # visible code (presumably from `models` -- confirm).
        api = twitter.Twitter(auth=twitter.OAuth(ACCESS_TOKEN_KEY, ACCESS_TOKEN_SECRET, CONSUMER_KEY, CONSUMER_SECRET))
        lookup = {"user_id": uid} if uid else {"screen_name": uname}
        try:
            user_dict = api.users.show(**lookup)
        except Exception as e:
            # Best effort: keep going with whatever fields we already have.
            logging.info("get_user : TWITTER ERROR : " + repr(e))
            logging.info("get_user : TWITTER ERROR : " + str(e))

    # NOTE(review): after a failed API call the partial dict may still carry
    # entity keys such as "indices"; confirm User(**...) tolerates them.
    user_dict = adapt_fields(user_dict, fields_adapter["stream"]["user"])
    if "id" not in user_dict:
        return None

    new_user = User(**user_dict)
    session.add(new_user)
    session.flush()
    return new_user

def process_entity(ind, ind_type, tweet, session):
    """Record one hashtag / user mention / URL entity of ``tweet``.

    :param ind: entity dict with an ``indices`` pair ``[start, end]`` plus
        type-specific keys: ``text`` or ``hashtag`` for hashtags, the user
        keys understood by ``get_user`` for mentions, ``url`` for URLs.
        It is not modified.
    :param ind_type: ``"hashtags"``, ``"user_mentions"`` or ``"urls"``. Any
        other type is logged and skipped rather than raising ``KeyError``,
        because stream payloads can carry entity kinds this code does not
        model.
    :param tweet: owning ``Tweet``; its ``id`` is read.
    :param session: session used to look up and create related objects.

    An existing Hashtag/Url with the same text/url is reused. Otherwise a new
    one is added and flushed before its ``id`` is read. The entity object is
    added to the session but not flushed.
    """
    logging.debug("Process_entity : " + repr(ind) + " : " + repr(ind_type))

    entity_dict = {
       "indice_start": ind["indices"][0],
       "indice_end"  : ind["indices"][1],
       "tweet_id"    : tweet.id,
       "tweet"       : tweet
    }

    def process_hashtags():
        text = ind.get("text", ind.get("hashtag", None))
        if text is None:
            return None
        hashtag = session.query(Hashtag).filter(Hashtag.text == text).first()
        if not hashtag:
            # copy so the caller's entity dict is left untouched
            hashtag_fields = dict(ind)
            hashtag_fields["text"] = text
            hashtag = Hashtag(**hashtag_fields)
            session.add(hashtag)
            session.flush()
        entity_dict['hashtag'] = hashtag
        entity_dict['hashtag_id'] = hashtag.id
        return EntityHashtag(**entity_dict)

    def process_user_mentions():
        user_mention = get_user(ind, session)
        if user_mention is None:
            entity_dict['user'] = None
            entity_dict['user_id'] = None
        else:
            entity_dict['user'] = user_mention
            entity_dict['user_id'] = user_mention.id
        return EntityUser(**entity_dict)

    def process_urls():
        url = session.query(Url).filter(Url.url == ind["url"]).first()
        if url is None:
            url = Url(**ind)
            session.add(url)
            session.flush()
        entity_dict['url'] = url
        entity_dict['url_id'] = url.id
        return EntityUrl(**entity_dict)

    handler = {
        'hashtags': process_hashtags,
        'user_mentions': process_user_mentions,
        'urls': process_urls,
    }.get(ind_type)
    if handler is None:
        logging.warning("Process_entity : unknown entity type " + repr(ind_type) + ", skipped")
        return

    entity = handler()

    logging.debug("Process_entity entity_dict: " + repr(entity_dict))
    if entity:
        session.add(entity)



def from_twitter_rest(ts, jsontxt, session):
    """Store one tweet from a REST result dict as a ``Tweet``.

    The dict uses the REST field names read below (``from_user``,
    ``from_user_id``, ``to_user_id``, ``iso_language_code``, ...). Nothing
    happens if a tweet with ``ts["id"]`` already exists. Otherwise:

    - the author is fetched or created with ``get_user``;
    - the tweet is added and flushed;
    - hashtags, mentions and URLs extracted from the text by
      ``twitter_text`` are recorded through ``process_entity``.

    :param ts: decoded REST result dict.
    :param jsontxt: raw JSON text, stored as ``original_json``.
    :param session: session the new objects are added to.
    """
    tweet_nb = session.query(Tweet).filter(Tweet.id == ts["id"]).count()
    if tweet_nb > 0:
        return

    # in_reply_to_screen_name (ts["to_user"]) and place are not mapped.
    tweet_fields = {
        'created_at': ts["created_at"],
        'favorited': False,
        'id': ts["id"],
        'id_str': ts["id_str"],
        'in_reply_to_user_id': ts["to_user_id"],
        'in_reply_to_user_id_str': ts["to_user_id_str"],
        'source': ts["source"],
        'text': ts["text"],
        'truncated': False,
        'original_json' : jsontxt,
    }

    user_fields = {
        'id' : ts['from_user_id'],
        'id_str' : ts['from_user_id_str'],
        'lang' : ts['iso_language_code'],
        'profile_image_url' : ts["profile_image_url"],
        'screen_name' : ts["from_user"],
    }

    user = get_user(user_fields, session)
    if user is None:
        # was `log.warning`: `log` is not defined in this module (NameError)
        logging.warning("USER not found " + repr(user_fields))
        tweet_fields["user"] = None
        tweet_fields["user_id"] = None
    else:
        tweet_fields["user"] = user
        tweet_fields["user_id"] = user.id

    tweet_fields = adapt_fields(tweet_fields, fields_adapter["rest"]["tweet"])
    tweet = Tweet(**tweet_fields)
    session.add(tweet)
    # flush before creating entities, as from_twitter_stream does
    session.flush()

    extractor = twitter_text.Extractor(tweet.text)

    for ind in extractor.extract_hashtags_with_indices():
        process_entity(ind, "hashtags", tweet, session)

    for ind in extractor.extract_mentioned_screen_names_with_indices():
        process_entity(ind, "user_mentions", tweet, session)

    for ind in extractor.extract_urls_with_indices():
        process_entity(ind, "urls", tweet, session)
    
    
    

def from_twitter_stream(ts, jsontxt, session):
    """Store one status received from the Twitter streaming API.

    Nothing happens if a tweet with ``ts["id"]`` already exists. Otherwise:

    - the payload is copied through the stream tweet adapters;
    - the copy is linked to its author (fetched or created with ``get_user``);
    - the tweet is added and flushed;
    - every entry of ``ts["entities"]`` is recorded via ``process_entity``.

    A missing or null ``entities`` is treated as empty.

    :param ts: decoded status dict; ``ts["user"]`` is the author dict.
    :param jsontxt: raw JSON text, stored as ``original_json``.
    :param session: session the new objects are added to.
    """
    tweet_nb = session.query(Tweet).filter(Tweet.id == ts["id"]).count()
    if tweet_nb > 0:
        return

    ts_copy = adapt_fields(ts, fields_adapter["stream"]["tweet"])

    # get or create user
    user = get_user(ts["user"], session)
    if user is None:
        # was `log.warning`: `log` is not defined in this module (NameError)
        logging.warning("USER not found " + repr(ts["user"]))
        ts_copy["user"] = None
        ts_copy["user_id"] = None
    else:
        ts_copy["user"] = user
        ts_copy["user_id"] = user.id
    ts_copy["original_json"] = jsontxt

    tweet = Tweet(**ts_copy)
    session.add(tweet)
    session.flush()

    # get entities
    for ind_type, entity_list in (ts.get("entities") or {}).items():
        for ind in entity_list:
            process_entity(ind, ind_type, tweet, session)