script/lib/iri_tweet/utils.py
author Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
Wed, 27 Jul 2011 00:04:55 +0200
changeset 242 cdd7d3c0549c
parent 203 8124cde38141
child 243 9213a63fa34a
permissions -rw-r--r--
Starting 'parallel_twitter' branch

from models import (Tweet, User, Hashtag, EntityHashtag, EntityUser, Url, 
    EntityUrl, CONSUMER_KEY, CONSUMER_SECRET, APPLICATION_NAME, ACCESS_TOKEN_KEY, 
    ACCESS_TOKEN_SECRET, adapt_date, adapt_json, TweetSource, TweetLog)
from sqlalchemy.sql import select, or_ #@UnresolvedImport
import anyjson #@UnresolvedImport
import datetime
import email.utils
import logging #@UnresolvedImport
import os.path
import sys
import twitter.oauth #@UnresolvedImport
import twitter.oauth_dance #@UnresolvedImport
import twitter_text #@UnresolvedImport



# Process-wide cache of OAuth access tokens, keyed by application name.
CACHE_ACCESS_TOKEN = {}

def get_oauth_token(token_file_path=None, application_name=APPLICATION_NAME, consumer_key=CONSUMER_KEY, consumer_secret=CONSUMER_SECRET):
    """Return the OAuth access token for ``application_name``.

    The token is looked up, in order:

    1. in the in-memory ``CACHE_ACCESS_TOKEN`` cache;
    2. in ``token_file_path`` when that file exists (the result is cached);
    3. in the module-level ``ACCESS_TOKEN_KEY`` / ``ACCESS_TOKEN_SECRET``
       settings when both are defined and non-empty (not cached);
    4. through ``twitter.oauth_dance.oauth_dance`` (the result is cached).

    :param token_file_path: optional path of the token file to read; it is
        also handed to ``oauth_dance`` when the dance is needed.
    :param application_name: cache key, also passed to ``oauth_dance``.
    :param consumer_key: consumer key passed to ``oauth_dance``.
    :param consumer_secret: consumer secret passed to ``oauth_dance``.
    :return: the access token; callers in this module unpack it as a
        ``(key, secret)`` pair.
    """
    if CACHE_ACCESS_TOKEN is not None and application_name in CACHE_ACCESS_TOKEN:
        return CACHE_ACCESS_TOKEN[application_name]

    if token_file_path and os.path.exists(token_file_path):
        # read access token info from path
        logging.debug("reading token from file %s" % token_file_path) #@UndefinedVariable
        CACHE_ACCESS_TOKEN[application_name] = twitter.oauth.read_token_file(token_file_path)
        return CACHE_ACCESS_TOKEN[application_name]

    # Statically configured tokens: look the names up in the module globals
    # (an empty ``dict()`` can never contain them, which silently disabled
    # this branch).
    module_settings = globals()
    static_key = module_settings.get('ACCESS_TOKEN_KEY')
    static_secret = module_settings.get('ACCESS_TOKEN_SECRET')
    if static_key and static_secret:
        return static_key, static_secret

    CACHE_ACCESS_TOKEN[application_name] = twitter.oauth_dance.oauth_dance(application_name, consumer_key, consumer_secret, token_file_path)
    return CACHE_ACCESS_TOKEN[application_name]

def parse_date(date_str):
    """Parse an RFC 2822 style date string into a naive ``datetime``.

    The timezone offset found in ``date_str`` is ignored: the returned value
    carries the wall-clock fields exactly as written in the string.

    :param date_str: date string understood by ``email.utils.parsedate_tz``.
    :return: ``datetime.datetime`` built from year, month, day, hour, minute
        and second.
    :raises TypeError: if ``date_str`` cannot be parsed (``parsedate_tz``
        returns ``None``).
    """
    ts = email.utils.parsedate_tz(date_str) #@UndefinedVariable
    # Only the first six fields are meaningful; slots 6-8 of the tuple are
    # documented as unusable and must not be fed to datetime as microseconds.
    return datetime.datetime(*ts[:6])

def clean_keys(dict_val):
    """Return a new dict with the same values and every key passed through ``str``."""
    cleaned = {}
    for raw_key, item in dict_val.items():
        cleaned[str(raw_key)] = item
    return cleaned

# Field adapters, keyed first by API flavour ('stream' or 'rest') and then by
# object kind ("tweet" or "user"). Each innermost dict maps a field name to the
# callable that adapt_fields() applies to that field's value.
# Adapting "original_json" is currently disabled for both tweet mappings.
fields_adapter = {
    'stream': {
        "tweet": {
            "created_at": adapt_date,
            "coordinates": adapt_json,
            "place": adapt_json,
            "geo": adapt_json,
        },
        "user": {
            "created_at": adapt_date,
        },
    },
    'rest': {
        "tweet": {
            "place": adapt_json,
            "geo": adapt_json,
            "created_at": adapt_date,
        },
    },
}

#
# adapt fields, return a copy of the field_dict with adapted fields
#
def adapt_fields(fields_dict, adapter_mapping):
    """Build an adapted copy of ``fields_dict``.

    Every key is converted with ``str``. A value is passed through the
    callable registered for its (original) key in ``adapter_mapping``; values
    whose key is absent from the mapping, or mapped to ``None``, are copied
    unchanged. ``fields_dict`` itself is not modified.
    """
    adapted = {}
    for name, raw_value in fields_dict.items():
        adapter = adapter_mapping[name] if name in adapter_mapping else None
        if adapter is None:
            adapted[str(name)] = raw_value
        else:
            adapted[str(name)] = adapter(raw_value)
    return adapted



class TwitterProcessorException(Exception):
    """Error raised by TwitterProcessor when its JSON input is missing or has no id."""

class TwitterProcessor(object):
    """Persist one tweet, its author and its entities through ``session``.

    The tweet is supplied as a decoded JSON dict and/or as raw JSON text.
    ``process`` dispatches on the payload: a dict containing a ``"metadata"``
    key goes through the "rest" code path, anything else through the "stream"
    code path.
    """

    def __init__(self, json_dict, json_txt, source_id, session, token_filename=None):
        """Store the tweet payload and the persistence context.

        :param json_dict: decoded tweet, or ``None`` to decode ``json_txt``.
        :param json_txt: raw JSON text; when false, it is produced by
            serializing the decoded tweet.
        :param source_id: id stored as ``tweet_source_id`` on created rows, or
            ``None`` to let ``process`` create a ``TweetSource`` first.
        :param session: session object used for queries, ``add`` and ``flush``.
        :param token_filename: token file path handed to ``get_oauth_token``
            when a user profile has to be fetched from Twitter.
        :raises TwitterProcessorException: if both ``json_dict`` and
            ``json_txt`` are ``None``, or if the tweet has no ``"id"`` key.
        """
        if json_dict is None and json_txt is None:
            raise TwitterProcessorException("No json")

        if json_dict is None:
            self.json_dict = anyjson.deserialize(json_txt)
        else:
            self.json_dict = json_dict

        if not json_txt:
            self.json_txt = anyjson.serialize(self.json_dict)
        else:
            self.json_txt = json_txt

        if "id" not in self.json_dict:
            raise TwitterProcessorException("No id in json")

        self.source_id = source_id
        self.session = session
        self.token_filename = token_filename

    def __tweet_exists(self):
        """Return True when a Tweet with this payload's id is already stored."""
        tweet_nb = self.session.query(Tweet).filter(Tweet.id == self.json_dict["id"]).count()
        return tweet_nb > 0

    def __get_user(self, user_dict):
        """Return the ``User`` described by ``user_dict``, creating it if needed.

        The user is looked up by ``"id"`` when present (and truthy), otherwise
        by ``"screen_name"`` (falling back to ``"name"``). When no stored user
        matches and ``user_dict`` has no ``"created_at"``, the profile is
        requested from the Twitter API; an API error is logged and the
        original ``user_dict`` is used as is.

        :return: the existing or newly added ``User``, or ``None`` when
            ``user_dict`` carries neither id nor name, or when no ``"id"`` is
            available to create the user.
        """
        logging.debug("Get user : " + repr(user_dict)) #@UndefinedVariable

        user_id = user_dict.get("id", None)
        user_name = user_dict.get("screen_name", user_dict.get("name", None))

        if user_id is None and user_name is None:
            return None

        if user_id:
            user = self.session.query(User).filter(User.id == user_id).first()
        else:
            user = self.session.query(User).filter(User.screen_name == user_name).first()

        if user is not None:
            return user

        if user_dict.get("created_at", None) is None:
            # Incomplete description: ask Twitter for the user's profile.
            access_token_key, access_token_secret = get_oauth_token(self.token_filename)
            t = twitter.Twitter(auth=twitter.OAuth(access_token_key, access_token_secret, CONSUMER_KEY, CONSUMER_SECRET))
            try:
                if user_id:
                    user_dict = t.users.show(user_id=user_id)
                else:
                    user_dict = t.users.show(screen_name=user_name)
            except Exception as e:
                # Best effort: keep going with whatever user_dict we already have.
                logging.info("get_user : TWITTER ERROR : " + repr(e)) #@UndefinedVariable
                logging.info("get_user : TWITTER ERROR : " + str(e)) #@UndefinedVariable

        user_dict = adapt_fields(user_dict, fields_adapter["stream"]["user"])
        if "id" not in user_dict:
            return None

        user = User(**user_dict)

        self.session.add(user)
        self.session.flush()

        return user

    def __process_entity(self, ind, ind_type):
        """Create and store the entity row for one entity of ``self.tweet``.

        :param ind: dict describing the entity; it must have an ``"indices"``
            pair (start, end).
        :param ind_type: one of ``'hashtags'``, ``'user_mentions'`` or
            ``'urls'``; any other value raises ``KeyError``.
        """
        logging.debug("Process_entity : " + repr(ind) + " : " + repr(ind_type)) #@UndefinedVariable

        ind = clean_keys(ind)

        # Fields shared by every entity type; the handlers below add their own.
        entity_dict = {
           "indice_start": ind["indices"][0],
           "indice_end"  : ind["indices"][1],
           "tweet_id"    : self.tweet.id,
           "tweet"       : self.tweet
        }

        def process_hashtags():
            # The tag text is found under "text", or under "hashtag" otherwise.
            text = ind.get("text", ind.get("hashtag", None))
            if text is None:
                return None
            hashtag = self.session.query(Hashtag).filter(Hashtag.text == text).first()
            if not hashtag:
                ind["text"] = text
                hashtag = Hashtag(**ind)
                self.session.add(hashtag)
                self.session.flush()
            entity_dict['hashtag'] = hashtag
            entity_dict['hashtag_id'] = hashtag.id
            return EntityHashtag(**entity_dict)

        def process_user_mentions():
            user_mention = self.__get_user(ind)
            if user_mention is None:
                entity_dict['user'] = None
                entity_dict['user_id'] = None
            else:
                entity_dict['user'] = user_mention
                entity_dict['user_id'] = user_mention.id
            return EntityUser(**entity_dict)

        def process_urls():
            url = self.session.query(Url).filter(Url.url == ind["url"]).first()
            if url is None:
                url = Url(**ind)
                self.session.add(url)
                self.session.flush()
            entity_dict['url'] = url
            entity_dict['url_id'] = url.id
            return EntityUrl(**entity_dict)

        handlers = {
            'hashtags': process_hashtags,
            'user_mentions' : process_user_mentions,
            'urls' : process_urls
        }
        entity = handlers[ind_type]()

        logging.debug("Process_entity entity_dict: " + repr(entity_dict)) #@UndefinedVariable
        if entity:
            self.session.add(entity)
            self.session.flush()

    def __process_twitter_stream(self):
        """Store a "stream" payload: the tweet, its user and its listed entities.

        Does nothing when a tweet with the same id is already stored.
        """
        if self.__tweet_exists():
            return

        ts_copy = adapt_fields(self.json_dict, fields_adapter["stream"]["tweet"])

        # get or create user
        user = self.__get_user(self.json_dict["user"])
        if user is None:
            logging.warning("USER not found " + repr(self.json_dict["user"])) #@UndefinedVariable
            ts_copy["user"] = None
            ts_copy["user_id"] = None
        else:
            ts_copy["user"] = user
            ts_copy["user_id"] = ts_copy["user"].id

        ts_copy["tweet_source_id"] = self.source_id

        self.tweet = Tweet(**ts_copy)
        self.session.add(self.tweet)
        self.session.flush()

        # get entities
        for ind_type, entity_list in self.json_dict["entities"].items():
            for ind in entity_list:
                self.__process_entity(ind, ind_type)

    def __process_twitter_rest(self):
        """Store a "rest" payload: the tweet, its user and extracted entities.

        The tweet fields are rebuilt from the flat payload, and the entities
        are extracted from the tweet text with ``twitter_text.Extractor``.
        Does nothing when a tweet with the same id is already stored.
        """
        if self.__tweet_exists():
            return

        tweet_fields = {
            'created_at': self.json_dict["created_at"],
            'favorited': False,
            'id': self.json_dict["id"],
            'id_str': self.json_dict["id_str"],
            'in_reply_to_user_id': self.json_dict["to_user_id"],
            'in_reply_to_user_id_str': self.json_dict["to_user_id_str"],
            'source': self.json_dict["source"],
            'text': self.json_dict["text"],
            'truncated': False,
            'tweet_source_id' : self.source_id,
        }

        # user
        user_fields = {
            'lang' : self.json_dict.get('iso_language_code',None),
            'profile_image_url' : self.json_dict["profile_image_url"],
            'screen_name' : self.json_dict["from_user"],
        }

        user = self.__get_user(user_fields)
        if user is None:
            logging.warning("USER not found " + repr(user_fields)) #@UndefinedVariable
            tweet_fields["user"] = None
            tweet_fields["user_id"] = None
        else:
            tweet_fields["user"] = user
            tweet_fields["user_id"] = user.id

        tweet_fields = adapt_fields(tweet_fields, fields_adapter["rest"]["tweet"])
        self.tweet = Tweet(**tweet_fields)
        self.session.add(self.tweet)

        extractor = twitter_text.Extractor(self.tweet.text)

        for ind in extractor.extract_hashtags_with_indices():
            self.__process_entity(ind, "hashtags")

        for ind in extractor.extract_mentioned_screen_names_with_indices():
            self.__process_entity(ind, "user_mentions")

        for ind in extractor.extract_urls_with_indices():
            self.__process_entity(ind, "urls")

        self.session.flush()

    def process(self):
        """Store the tweet and record an OK ``TweetLog`` for its source.

        When no ``source_id`` was given, a ``TweetSource`` holding the raw JSON
        text is created first and its id is used. Any exception raised while
        storing the tweet propagates to the caller, in which case no log entry
        is added.
        """
        if self.source_id is None:
            tweet_source = TweetSource(original_json=self.json_txt)
            self.session.add(tweet_source)
            self.session.flush()
            self.source_id = tweet_source.id

        if "metadata" in self.json_dict:
            self.__process_twitter_rest()
        else:
            self.__process_twitter_stream()

        # Register the log entry with the session; a TweetLog that is only
        # instantiated is never written to the database.
        tweet_log = TweetLog(tweet_source_id = self.source_id, status = TweetLog.TWEET_STATUS['OK'])
        self.session.add(tweet_log)

def set_logging(options):
    """Configure logging from parsed command-line ``options``.

    ``options.logfile`` selects the destination: ``"stdout"`` or ``"stderr"``
    log to the matching stream, any other value is used as a file name.
    The level starts at WARNING, goes down 10 per ``options.verbose`` and up
    10 per ``options.quiet``, and is clamped to [NOTSET, CRITICAL].
    ``options.debug`` is set to True when verbose outweighs quiet.
    """
    destination = options.logfile
    known_streams = {"stdout": sys.stdout, "stderr": sys.stderr}

    if destination in known_streams:
        logging_config = {"stream": known_streams[destination]}
    else:
        logging_config = {"filename": destination}

    requested_level = logging.WARNING - 10 * options.verbose + 10 * options.quiet #@UndefinedVariable
    capped_level = min(logging.CRITICAL, requested_level) #@UndefinedVariable
    logging_config["level"] = max(logging.NOTSET, capped_level) #@UndefinedVariable

    logging.basicConfig(**logging_config) #@UndefinedVariable

    options.debug = (options.verbose - options.quiet) > 0

def set_logging_options(parser):
    """Register the logging options read by ``set_logging`` on ``parser``.

    Adds ``-l/--log`` (destination, default ``"stderr"``) and the two counting
    flags ``-v`` (verbose) and ``-q`` (quiet), both defaulting to 0.
    """
    parser.add_option("-l", "--log", default="stderr", metavar="LOG",
                      dest="logfile", help="log to file")

    # -v and -q are declared the same way: repeatable counters starting at 0.
    counters = (
        ("-v", "verbose", "VERBOSE"),
        ("-q", "quiet", "QUIET"),
    )
    for flag, name, meta in counters:
        parser.add_option(flag, dest=name, action="count",
                          help=name, metavar=meta, default=0)

    
def get_filter_query(session, start_date, end_date, hashtags, tweet_exclude_table, user_whitelist):
    """Build the query selecting tweets for a period, hashtags and users.

    :param session: session used to create the query.
    :param start_date: lower bound (inclusive) on ``Tweet.created_at``.
    :param end_date: upper bound (inclusive) on ``Tweet.created_at``.
    :param hashtags: iterable of strings, each holding one tag or several
        comma-separated tags; a tweet is kept when the text of one of its
        hashtags contains any of them. A false value disables this filter.
    :param tweet_exclude_table: table whose ``id`` column lists tweet ids to
        exclude, or ``None``.
    :param user_whitelist: screen names to restrict the tweets to; a false
        value disables this filter.
    :return: the query over ``Tweet`` joined with its hashtags (not executed).
    """
    query = session.query(Tweet).join(EntityHashtag).join(Hashtag)
    if tweet_exclude_table is not None:
        query = query.filter(~Tweet.id.in_(select([tweet_exclude_table.c.id]))) #@UndefinedVariable

    query = query.filter(Tweet.created_at >= start_date).filter(Tweet.created_at <= end_date)

    if user_whitelist:
        query = query.join(User).filter(User.screen_name.in_(user_whitelist))

    if hashtags:
        # Flatten "a,b" style entries into individual tags (no reliance on the
        # Python 2-only builtin ``reduce``).
        htags = [tag for entry in hashtags for tag in entry.split(",")]
        query = query.filter(or_(*[Hashtag.text.contains(tag) for tag in htags])) #@UndefinedVariable

    return query
    

def get_user_query(session, start_date, end_date, hashtags, tweet_exclude_table):
    """Build the query selecting the distinct users who tweeted in a period.

    :param session: session used to create the query.
    :param start_date: lower bound (inclusive) on ``Tweet.created_at``.
    :param end_date: upper bound (inclusive) on ``Tweet.created_at``.
    :param hashtags: iterable of strings, each holding one tag or several
        comma-separated tags; a tweet counts when the text of one of its
        hashtags contains any of them. A false value disables this filter.
    :param tweet_exclude_table: table whose ``id`` column lists tweet ids to
        exclude, or ``None``.
    :return: the ``distinct()`` query over ``User`` (not executed).
    """
    query = session.query(User).join(Tweet).join(EntityHashtag).join(Hashtag)
    if tweet_exclude_table is not None:
        query = query.filter(~Tweet.id.in_(select([tweet_exclude_table.c.id]))) #@UndefinedVariable

    query = query.filter(Tweet.created_at >= start_date).filter(Tweet.created_at <= end_date)

    if hashtags:
        # Flatten "a,b" style entries into individual tags (no reliance on the
        # Python 2-only builtin ``reduce``).
        htags = [tag for entry in hashtags for tag in entry.split(",")]
        query = query.filter(or_(*[Hashtag.text.contains(tag) for tag in htags])) #@UndefinedVariable

    return query.distinct()