script/stream/recorder_tweetstream.py
author Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
Wed, 24 Aug 2011 18:04:26 +0200
changeset 260 b97a72ab59a2
parent 256 2f335337ff64
child 261 d84c4aa2a9eb
permissions -rw-r--r--
add more signals to shutdown

from getpass import getpass
from iri_tweet import models, utils
from iri_tweet.models import TweetSource, TweetLog
from multiprocessing import (Queue as mQueue, JoinableQueue, Process, Event, 
    get_logger)
from optparse import OptionParser
from sqlalchemy.exc import OperationalError
from sqlalchemy.orm import scoped_session, sessionmaker
import Queue
import StringIO
import anyjson
import datetime
import logging
import os
import re
import shutil
import signal
import socket
import sqlalchemy.schema
import sys
import threading
import time
import traceback
import tweepy.auth
import tweetstream
import urllib2
#from iri_tweet.utils import get_logger
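# disable socket read buffering so streamed tweets are delivered as soon as
# they arrive instead of waiting for a full buffer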
socket._fileobject.default_bufsize = 0



#columns_tweet = [u'favorited', u'truncated', u'text', u'created_at', u'source', u'in_reply_to_status_id', u'in_reply_to_screen_name', u'in_reply_to_user_id', u'geo', u'id', u'user']
columns_tweet = [u'user', u'favorited', u'contributors', u'truncated', u'text', u'created_at', u'retweeted', u'in_reply_to_status_id_str', u'coordinates', u'in_reply_to_user_id_str', u'entities', u'in_reply_to_status_id', u'place', u'in_reply_to_user_id', u'id', u'in_reply_to_screen_name', u'retweet_count', u'geo', u'id_str', u'source']
#columns_user = [u'id', u'verified', u'profile_sidebar_fill_color', u'profile_text_color', u'followers_count', u'protected', u'location', u'profile_background_color', u'utc_offset', u'statuses_count', u'description', u'friends_count', u'profile_link_color', u'profile_image_url', u'notifications', u'geo_enabled', u'profile_background_image_url', u'screen_name', u'profile_background_tile', u'favourites_count', u'name', u'url', u'created_at', u'time_zone', u'profile_sidebar_border_color', u'following']
columns_user = [u'follow_request_sent', u'profile_use_background_image', u'id', u'verified', u'profile_sidebar_fill_color', u'profile_text_color', u'followers_count', u'protected', u'location', u'profile_background_color', u'id_str', u'utc_offset', u'statuses_count', u'description', u'friends_count', u'profile_link_color', u'profile_image_url', u'notifications', u'show_all_inline_media', u'geo_enabled', u'profile_background_image_url', u'name', u'lang', u'following', u'profile_background_tile', u'favourites_count', u'screen_name', u'url', u'created_at', u'contributors_enabled', u'time_zone', u'profile_sidebar_border_color', u'is_translator', u'listed_count']
#just put it in a sqlite3 table


def set_logging(options):
    utils.set_logging(options, logging.getLogger('iri.tweet'))
    utils.set_logging(options, logging.getLogger('multiprocessing'))
    if options.debug >= 2:
        utils.set_logging(options, logging.getLogger('sqlalchemy.engine'))
    #utils.set_logging(options, logging.getLogger('sqlalchemy.dialects'))
    #utils.set_logging(options, logging.getLogger('sqlalchemy.pool'))
    #utils.set_logging(options, logging.getLogger('sqlalchemy.orm'))

def set_logging_process(options, queue):
    qlogger = utils.set_logging(options, logging.getLogger('iri.tweet.p'), queue)
    qlogger.propagate = 0
    return qlogger

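# access_token is expected to be the (key, secret) pair returned by
# utils.get_oauth_token below; it is only used for the OAuth path.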
def get_auth(options, access_token):
    if options.username and options.password:
        auth = tweepy.auth.BasicAuthHandler(options.username, options.password)        
    else:
        consumer_key = models.CONSUMER_KEY
        consumer_secret = models.CONSUMER_SECRET
        auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=False)
        auth.set_access_token(*access_token)
    return auth


class ReconnectingTweetStream(tweetstream.FilterStream):
    """TweetStream class that automatically tries to reconnect if the
    connecting goes down. Reconnecting, and waiting for reconnecting, is
    blocking.

    :param username: See :TweetStream:

    :param password: See :TweetStream:

    :keyword url: See :TweetStream:

    :keyword reconnects: Number of reconnects before a ConnectionError is
        raised. Default is 3

    :error_cb: Optional callable that will be called just before trying to
        reconnect. The callback will be called with a single argument, the
        exception that caused the reconnect attempt. Default is None

    :retry_wait: Time to wait before reconnecting in seconds. Default is 5

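    Example (a sketch; assumes a configured tweepy auth handler ``auth`` and a
    hypothetical ``handle`` callback)::

        stream = ReconnectingTweetStream(auth, ['keyword1', 'keyword2'],
                                         reconnects=5, retry_wait=10, as_text=True)
        for tweet in stream:
            handle(tweet)
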
    """

    def __init__(self, auth, keywords, reconnects=3, error_cb=None, retry_wait=5, as_text=False, **kwargs):
        self.max_reconnects = reconnects
        self.retry_wait = retry_wait
        self._reconnects = 0
        self._error_cb = error_cb
        super(ReconnectingTweetStream, self).__init__(auth=auth, track=keywords, as_text=as_text, **kwargs)

    def next(self):
        while True:
            try:
                utils.get_logger().debug("return super.next")
                return super(ReconnectingTweetStream, self).next()
            except tweetstream.ConnectionError, e:
                utils.get_logger().debug("connection error :" + str(e))
                self._reconnects += 1
                if self._reconnects > self.max_reconnects:
                    raise tweetstream.ConnectionError("Too many retries")

                # Note: error_cb is not called on the last error since we
                # raise a ConnectionError instead
                if callable(self._error_cb):
                    self._error_cb(e)

                time.sleep(self.retry_wait)
        # Note: auth errors are deliberately not caught here, since we cannot
        # reasonably reconnect after getting one.




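# Producer side of the pipeline: reads the filtered Twitter stream, stores each
# raw tweet as a TweetSource row, and pushes (source_id, raw_json) onto the
# shared queue for the consumer processes.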
class SourceProcess(Process):
    
    def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue):
        self.session_maker = session_maker
        self.queue = queue
        self.track = options.track
        self.reconnects = options.reconnects
        self.token_filename = options.token_filename
        self.stop_event = stop_event
        self.options = options
        self.access_token = access_token
        self.logger_queue = logger_queue
        super(SourceProcess, self).__init__()
    
    def run(self):
        
        #import pydevd
        #pydevd.settrace(suspend=False)

        self.logger = set_logging_process(self.options, self.logger_queue)
        self.auth = get_auth(self.options, self.access_token) 
        
        self.logger.debug("SourceProcess : run")
        track_list = self.track # or raw_input('Keywords to track (comma separated): ').strip()
        track_list = track_list.split(',')

        self.logger.debug("SourceProcess : before connecting to stream " + repr(track_list))                        
        stream = ReconnectingTweetStream(self.auth, track_list, reconnects=self.reconnects, as_text=True)
        self.logger.debug("SourceProcess : after connecting to stream")
        stream.muststop = lambda: self.stop_event.is_set()
        
        session = self.session_maker()
        
        try:
            for tweet in stream:
                self.logger.debug("tweet " + repr(tweet))
                source = TweetSource(original_json=tweet)
                self.logger.debug("source created")
                add_retries = 0
                while add_retries < 10:
                    try:
                        add_retries += 1
                        session.add(source)
                        session.flush()
                        break
                    except OperationalError as e:
                        session.rollback()
                        self.logger.debug("Operational Error %s nb %d" % (repr(e), add_retries))
                        if add_retries == 10:
                            raise e
                     
                source_id = source.id
                self.logger.debug("before queue + source id " + repr(source_id))
                self.logger.info("Tweet count: %d - current rate : %.2f - running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime)))
                session.commit()
                self.queue.put((source_id, tweet), False)

        except Exception as e:
            self.logger.error("Error when processing tweet " + repr(e))
        finally:
            session.rollback()
            stream.close()
            session.close()
            self.queue.close()
            self.stop_event.set()


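# Deserialize one raw tweet and persist it through utils.TwitterProcessor;
# failures are recorded as TweetLog rows carrying the error message and stack.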
def process_tweet(tweet, source_id, session, access_token, logger):
    try:
        tweet_obj = anyjson.deserialize(tweet)
        if 'text' not in tweet_obj:
            tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET'])
            session.add(tweet_log)
            return
        screen_name = ""
        if 'user' in tweet_obj and 'screen_name' in tweet_obj['user']:
            screen_name = tweet_obj['user']['screen_name']
        logger.info(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text']))
        logger.debug(u"Process_tweet :" + repr(tweet))
        processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, access_token, None)
        processor.process()
    except Exception as e:
        message = u"Error %s processing tweet %s" % (repr(e), tweet)
        logger.error(message)
        output = StringIO.StringIO()
        traceback.print_exc(file=output)  # capture the full traceback, not just the exception type
        error_stack = output.getvalue()
        output.close()
        session.rollback()
        tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['ERROR'], error=message, error_stack=error_stack)
        session.add(tweet_log)
        session.commit()

    
        
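# Consumer side of the pipeline: pops (source_id, raw_json) pairs off the
# shared queue and runs process_tweet on each until the stop event is set.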
class TweetProcess(Process):
    
    def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue):
        self.session_maker = session_maker
        self.queue = queue
        self.stop_event = stop_event
        self.options = options
        self.access_token = access_token
        self.logger_queue = logger_queue
        super(TweetProcess, self).__init__()


    def run(self):
        
        self.logger = set_logging_process(self.options, self.logger_queue)
        session = self.session_maker()
        try:
            while not self.stop_event.is_set():
                try:
                    source_id, tweet_txt = self.queue.get(True, 3)
                    self.logger.debug("Processing source id " + repr(source_id))
                except Exception as e:
                    self.logger.debug('Process tweet exception in loop : ' + repr(e))
                    continue
                process_tweet(tweet_txt, source_id, session, self.access_token, self.logger)
                session.commit()
        finally:
            session.rollback()
            self.stop_event.set()
            session.close()


def get_sessionmaker(conn_str):
    engine, metadata = models.setup_database(conn_str, echo=False, create_all=False)
    Session = scoped_session(sessionmaker(bind=engine, autocommit=False))
    return Session, engine, metadata

            
def process_leftovers(session, access_token, logger):
    # Get the tweet sources that do not match any log entry, i.e. tweets that
    # were stored but never processed:
    # select * from tweet_tweet_source ts left join tweet_tweet_log tl
    #     on ts.id = tl.tweet_source_id where tl.id is null;
    sources = session.query(TweetSource).outerjoin(TweetLog).filter(TweetLog.id == None)

    for src in sources:
        tweet_txt = src.original_json
        process_tweet(tweet_txt, src.id, session, access_token, logger)
        session.commit()


# Drain the per-process logging queues and dispatch each record to the
# matching logger in the main process.
def process_log(logger_queues, stop_event):
    while not stop_event.is_set():
        for lqueue in logger_queues:
            try:
                record = lqueue.get_nowait()
                logging.getLogger(record.name).handle(record)
            except Queue.Empty:
                continue
            except IOError:
                continue
        time.sleep(0.1)

        
def get_options():
    parser = OptionParser()
    parser.add_option("-f", "--file", dest="conn_str",
                      help="write tweet to DATABASE. This is a connection string", metavar="CONNECTION_STR", default="enmi2010_twitter.db")
    parser.add_option("-u", "--user", dest="username",
                      help="Twitter user", metavar="USER", default=None)
    parser.add_option("-w", "--password", dest="password",
                      help="Twitter password", metavar="PASSWORD", default=None)
    parser.add_option("-T", "--track", dest="track",
                      help="Twitter track", metavar="TRACK")
    parser.add_option("-n", "--new", dest="new", action="store_true",
                      help="new database", default=False)
    parser.add_option("-r", "--reconnects", dest="reconnects",
                      help="Reconnects", metavar="RECONNECTS", default=10, type='int')
    parser.add_option("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token",
                      help="Token file name")
    parser.add_option("-d", "--duration", dest="duration",
                      help="Duration of recording in seconds", metavar="DURATION", default= -1, type='int')
    parser.add_option("-N", "--nb-process", dest="process_nb",
                      help="number of process.\nIf 0, only the lefovers of the database are processed.\nIf 1, no postprocessing is done on the tweets.", metavar="PROCESS_NB", default=2, type='int')



    utils.set_logging_options(parser)

    return parser.parse_args()
    
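# Example invocation (a sketch; flags as defined in get_options above):
#   python recorder_tweetstream.py -f sqlite:///tweets.db -T "keyword1,keyword2" -d 3600 -N 3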

if __name__ == '__main__':

    (options, args) = get_options()
    
    set_logging(options)
        
    if options.debug:
        print "OPTIONS : "
        print repr(options)
    
    
    conn_str = options.conn_str.strip()
    if not re.match("^\w+://.+", conn_str):
        conn_str = 'sqlite:///' + options.conn_str
        
    if conn_str.startswith("sqlite") and options.new:
        filepath = conn_str[conn_str.find(":///")+4:]
        if os.path.exists(filepath):
            i = 1
            basename, extension = os.path.splitext(filepath)
            new_path = '%s.%d%s' % (basename, i, extension)
            while i < 1000000 and os.path.exists(new_path):
                i += 1
                new_path = '%s.%d%s' % (basename, i, extension)
            if i >= 1000000:
                raise Exception("Unable to find new filename for " + filepath)
            else:
                shutil.move(filepath, new_path)

    Session, engine, metadata = get_sessionmaker(conn_str)
    
    if options.new:
        check_metadata = sqlalchemy.schema.MetaData(bind=engine, reflect=True)
        if len(check_metadata.sorted_tables) > 0:
            message = "Database %s not empty exiting" % conn_str
            utils.get_logger().error(message)
            sys.exit(message)
    
    metadata.create_all(engine)
    
    access_token = None
    if not options.username or not options.password:
        access_token = utils.get_oauth_token(options.token_filename)
    
    session = Session()
    try:
        process_leftovers(session, access_token, utils.get_logger())
        session.commit()
    finally:
        session.rollback()
        session.close()
    
    if options.process_nb <= 0:
        utils.get_logger().debug("Leftovers processed. Exiting.")
        sys.exit()

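    # shared tweet queue between the source process and the consumers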
    queue = mQueue()
    stop_event = Event()
    
    #workaround for a bug when using urllib2 together with multiprocessing
    req = urllib2.Request('http://localhost')
    conn = None
    try:
        conn = urllib2.urlopen(req)
    except Exception:
        # the request is expected to fail; it only needs to initialize urllib2
        # before the worker processes fork
        pass
    finally:
        if conn is not None:
            conn.close()
    
    process_engines = []
    logger_queues = []
    
    SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str)
    process_engines.append(engine_process)
    lqueue = mQueue(1)
    logger_queues.append(lqueue)
    sprocess = SourceProcess(SessionProcess, queue, options, access_token, stop_event, lqueue)    
    
    tweet_processes = []
    
    for i in range(options.process_nb - 1):
        SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str)
        process_engines.append(engine_process)
        lqueue = mQueue(1)
        logger_queues.append(lqueue)
        cprocess = TweetProcess(SessionProcess, queue, options, access_token, stop_event, lqueue)
        tweet_processes.append(cprocess)

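    # any of these signals triggers a graceful shutdown through the shared stop event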
    def interrupt_handler(signum, frame):
        utils.get_logger().debug("shutdown requested " + repr(signum) + " " + repr(frame))
        stop_event.set()

    signal.signal(signal.SIGINT, interrupt_handler)
    signal.signal(signal.SIGHUP, interrupt_handler)
    signal.signal(signal.SIGALRM, interrupt_handler)
    signal.signal(signal.SIGTERM, interrupt_handler)

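    # daemon thread draining the per-process logging queues into the main logger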
    log_thread = threading.Thread(target=process_log, name="loggingThread", args=(logger_queues,stop_event,))
    log_thread.daemon = True

    sprocess.start()
    for cprocess in tweet_processes:
        cprocess.start()

    log_thread.start()

    if options.duration >= 0:
        end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration)    
    

    while not stop_event.is_set():
        if options.duration >= 0 and datetime.datetime.utcnow() >= end_ts:
            stop_event.set()
            break
        if sprocess.is_alive():            
            time.sleep(1)
        else:
            stop_event.set()
            break
    utils.get_logger().debug("Joining Source Process")
    try:
        sprocess.join(10)
    except:
        utils.get_logger().debug("Pb joining Source Process - terminating")
        sprocess.terminate()
        
    for i, cprocess in enumerate(tweet_processes):
        utils.get_logger().debug("Joining consumer process %d" % (i + 1))
        try:
            cprocess.join(3)
        except Exception:
            utils.get_logger().debug("Problem joining consumer process %d - terminating" % (i + 1))
            cprocess.terminate()

    
    utils.get_logger().debug("Close queues")
    try:
        queue.close()
        for lqueue in logger_queues:
            lqueue.close()
    except Exception as e:
        utils.get_logger().error("error when closing queues %s", repr(e))
        
    
    if options.process_nb > 1:
        utils.get_logger().debug("Processing leftovers")
        session = Session()
        try:
            process_leftovers(session, access_token, utils.get_logger())
            session.commit()
        finally:
            session.rollback()
            session.close()

    for pengine in process_engines:
        pengine.dispose()
        
    utils.get_logger().debug("Done. Exiting.")