script/stream/recorder_tweetstream.py
author Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
Mon, 20 Feb 2012 18:52:19 +0100
changeset 528 7fb5a7b0d35c
parent 464 b9243ade95e2
child 693 2ef837069108
permissions -rw-r--r--
remove reconnecting stream and propagate options

from getpass import getpass
from iri_tweet import models, utils
from iri_tweet.models import TweetSource, TweetLog, ProcessEvent
from multiprocessing import (Queue as mQueue, JoinableQueue, Process, Event, 
    get_logger)
from optparse import OptionParser
from sqlalchemy.exc import OperationalError
from sqlalchemy.orm import scoped_session
import Queue
import StringIO
import anyjson
import datetime
import inspect
import logging
import os
import re
import shutil
import signal
import socket
import sqlalchemy.schema
import sys
import threading
import time
import traceback
import tweepy.auth
import tweetstream
import urllib2
# Disable internal read buffering on socket file objects so streamed data is
# handed to the reader as soon as it arrives instead of waiting for a full
# buffer. NOTE(review): relies on a private CPython 2 attribute
# (socket._fileobject); confirm it is still honoured by the tweetstream lib.
socket._fileobject.default_bufsize = 0



#columns_tweet = [u'favorited', u'truncated', u'text', u'created_at', u'source', u'in_reply_to_status_id', u'in_reply_to_screen_name', u'in_reply_to_user_id', u'geo', u'id', u'user']
# Key names expected in a raw streaming-API tweet payload (the commented-out
# line above is the older, smaller schema kept for reference).
columns_tweet = [u'user', u'favorited', u'contributors', u'truncated', u'text', u'created_at', u'retweeted', u'in_reply_to_status_id_str', u'coordinates', u'in_reply_to_user_id_str', u'entities', u'in_reply_to_status_id', u'place', u'in_reply_to_user_id', u'id', u'in_reply_to_screen_name', u'retweet_count', u'geo', u'id_str', u'source']
#columns_user = [u'id', u'verified', u'profile_sidebar_fill_color', u'profile_text_color', u'followers_count', u'protected', u'location', u'profile_background_color', u'utc_offset', u'statuses_count', u'description', u'friends_count', u'profile_link_color', u'profile_image_url', u'notifications', u'geo_enabled', u'profile_background_image_url', u'screen_name', u'profile_background_tile', u'favourites_count', u'name', u'url', u'created_at', u'time_zone', u'profile_sidebar_border_color', u'following']
# Key names expected in the embedded 'user' object of a tweet payload.
columns_user = [u'follow_request_sent', u'profile_use_background_image', u'id', u'verified', u'profile_sidebar_fill_color', u'profile_text_color', u'followers_count', u'protected', u'location', u'profile_background_color', u'id_str', u'utc_offset', u'statuses_count', u'description', u'friends_count', u'profile_link_color', u'profile_image_url', u'notifications', u'show_all_inline_media', u'geo_enabled', u'profile_background_image_url', u'name', u'lang', u'following', u'profile_background_tile', u'favourites_count', u'screen_name', u'url', u'created_at', u'contributors_enabled', u'time_zone', u'profile_sidebar_border_color', u'is_translator', u'listed_count']
#just put it in a sqlite3 table


def set_logging(options):
    """Configure the loggers used by the main process and return them.

    The sqlalchemy engine logger is only enabled at debug level >= 2
    because it is extremely verbose.
    """
    logger_names = ['iri.tweet', 'multiprocessing']
    if options.debug >= 2:
        logger_names.append('sqlalchemy.engine')
    #utils.set_logging(options, logging.getLogger('sqlalchemy.dialects'))
    #utils.set_logging(options, logging.getLogger('sqlalchemy.pool'))
    #utils.set_logging(options, logging.getLogger('sqlalchemy.orm'))
    return [utils.set_logging(options, logging.getLogger(name))
            for name in logger_names]

def set_logging_process(options, queue):
    """Configure and return a queue-backed logger for a child process.

    Propagation is disabled so records only travel through the queue to
    the parent's logging thread, never to local handlers.
    """
    process_logger = utils.set_logging(options, logging.getLogger('iri.tweet.p'), queue)
    process_logger.propagate = 0
    return process_logger

def get_auth(options, access_token):
    """Build a tweepy auth handler.

    Uses basic auth when a username and password are both supplied,
    otherwise falls back to OAuth with the application's consumer keys
    and the given (key, secret) access token pair.
    """
    if options.username and options.password:
        return tweepy.auth.BasicAuthHandler(options.username, options.password)
    handler = tweepy.auth.OAuthHandler(models.CONSUMER_KEY,
                                       models.CONSUMER_SECRET,
                                       secure=False)
    handler.set_access_token(*access_token)
    return handler


def add_process_event(type, args, session_maker):
    """Persist a ProcessEvent row recording a lifecycle event.

    `args` is serialized to JSON (or stored as NULL when None); the
    session is always closed, and the event is committed immediately.
    """
    session = session_maker()
    try:
        payload = anyjson.serialize(args) if args is not None else None
        session.add(ProcessEvent(args=payload, type=type))
        session.commit()
    finally:
        session.close()


class BaseProcess(Process):
    """Common scaffolding for the recorder's worker processes.

    Holds the shared pipeline objects (work queue, stop event, logging
    queue, session factory) and records start/stop ProcessEvent rows
    around the subclass's do_run().
    """

    def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
        self.session_maker = session_maker
        self.queue = queue
        self.options = options
        self.access_token = access_token
        self.stop_event = stop_event
        self.logger_queue = logger_queue
        self.parent_pid = parent_pid
        super(BaseProcess, self).__init__()

    def parent_is_alive(self):
        """Return True while the parent process still exists.

        Probes the parent pid with signal 0 (a no-op liveness check, per
        http://stackoverflow.com/questions/2542610/python-daemon-doesnt-kill-its-kids).
        """
        try:
            os.kill(self.parent_pid, 0)
        except OSError:
            # Parent pid no longer exists (or is not ours to signal).
            return False
        return True

    def __get_process_event_args(self):
        # Snapshot of this worker's identity and configuration for ProcessEvent rows.
        return {'name': self.name,
                'pid': self.pid,
                'parent_pid': self.parent_pid,
                'options': self.options.__dict__,
                'access_token': self.access_token}

    def run(self):
        """Process entry point: bracket do_run() with start/stop events."""
        try:
            add_process_event("start_worker", self.__get_process_event_args(), self.session_maker)
            self.do_run()
        finally:
            # Emitted even when do_run (or the start event) raised.
            add_process_event("stop_worker", self.__get_process_event_args(), self.session_maker)

    def do_run(self):
        """Subclasses implement the actual worker loop here."""
        raise NotImplementedError()


class SourceProcess(BaseProcess):
    """Producer worker: connects to the Twitter streaming API and feeds
    raw tweet payloads into the shared queue.

    Each payload is first persisted as a TweetSource row (so nothing is
    lost if consumers die), then (source_id, raw_json) is queued.
    """

    def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
        # Copy stream-related options to instance attributes before the
        # base class finishes initialisation.
        self.track = options.track              # comma-separated keyword list
        self.token_filename = options.token_filename
        self.catchup = options.catchup          # backlog count requested from the stream
        self.timeout = options.timeout          # stream connection timeout (seconds)
        super(SourceProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid)

    def do_run(self):
        """Stream tweets until the stop event is set or the parent dies."""
        #import pydevd
        #pydevd.settrace(suspend=True)

        # Logger and auth handler must be (re)built inside the child process.
        self.logger = set_logging_process(self.options, self.logger_queue)
        self.auth = get_auth(self.options, self.access_token) 
        
        self.logger.debug("SourceProcess : run ")
        track_list = self.track # or raw_input('Keywords to track (comma seperated): ').strip()
        self.logger.debug("SourceProcess : track list " + track_list)
        
        track_list = [k.strip() for k in track_list.split(',')]

        self.logger.debug("SourceProcess : before connecting to stream " + repr(track_list))                        
        stream = tweetstream.FilterStream(self.auth, track=track_list, raw=True, url=self.options.url, catchup=self.catchup, timeout=self.timeout)
        self.logger.debug("SourceProcess : after connecting to stream")
        # Let the stream library poll our shared stop flag so it can abort.
        stream.muststop = lambda: self.stop_event.is_set()
        
        session = self.session_maker()
        
        try:
            for tweet in stream:
                if not self.parent_is_alive():
                    sys.exit()
                self.logger.debug("SourceProcess : tweet " + repr(tweet))
                source = TweetSource(original_json=tweet)
                self.logger.debug("SourceProcess : source created")
                # Retry the insert a few times: sqlite raises OperationalError
                # when the database file is locked by a concurrent writer.
                add_retries = 0
                while add_retries < 10:
                    try:
                        add_retries += 1
                        session.add(source)
                        session.flush()
                        break
                    except OperationalError as e:
                        session.rollback()
                        self.logger.debug("SourceProcess : Operational Error %s nb %d" % (repr(e), add_retries))
                        if add_retries == 10:
                            # Give up after 10 attempts and let the outer
                            # handler shut the pipeline down.
                            raise e
                     
                source_id = source.id
                self.logger.debug("SourceProcess : before queue + source id " + repr(source_id))
                self.logger.info("SourceProcess : Tweet count: %d - current rate : %.2f - running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime)))
                # Commit before queuing so consumers can see the row.
                session.commit()
                self.queue.put((source_id, tweet), False)

        except Exception as e:
            self.logger.error("SourceProcess : Error when processing tweet " + repr(e))
        finally:
            # Whatever ended the stream, shut the whole pipeline down.
            session.rollback()
            stream.close()
            session.close()
            self.queue.close()
            self.stop_event.set()


def process_tweet(tweet, source_id, session, access_token, twitter_query_user, logger):
    """Deserialize one raw payload and persist it through TwitterProcessor.

    Non-tweet payloads (no 'text' key, e.g. delete/limit notices) are
    logged as NOT_TWEET. Any processing failure is recorded as an ERROR
    TweetLog row (with traceback) and committed; nothing is re-raised.
    """
    try:
        parsed = anyjson.deserialize(tweet)
        if 'text' not in parsed:
            # Not an actual status update; just mark the source as seen.
            session.add(TweetLog(tweet_source_id=source_id,
                                 status=TweetLog.TWEET_STATUS['NOT_TWEET']))
            return
        screen_name = ""
        if 'user' in parsed and 'screen_name' in parsed['user']:
            screen_name = parsed['user']['screen_name']
        logger.info(u"Process_tweet from %s : %s" % (screen_name, parsed['text']))
        logger.debug(u"Process_tweet :" + repr(tweet))
        utils.TwitterProcessor(parsed, tweet, source_id, session,
                               access_token, None, twitter_query_user).process()
    except Exception as e:
        message = u"Error %s processing tweet %s" % (repr(e), tweet)
        logger.exception(message)
        # Capture the traceback text so it can be stored with the log row.
        buf = StringIO.StringIO()
        try:
            traceback.print_exc(file=buf)
            error_stack = buf.getvalue()
        finally:
            buf.close()
        session.rollback()
        session.add(TweetLog(tweet_source_id=source_id,
                             status=TweetLog.TWEET_STATUS['ERROR'],
                             error=message,
                             error_stack=error_stack))
        session.commit()

    
        
class TweetProcess(BaseProcess):
    """Consumer worker: pulls (source_id, raw tweet) pairs off the shared
    queue and processes them into the database."""

    def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
        super(TweetProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid)
        self.twitter_query_user = options.twitter_query_user

    def do_run(self):
        """Consume the queue until shutdown is requested or the parent dies.

        Uses a 3-second blocking get so the stop/parent checks run
        regularly even when the stream is quiet.
        """
        self.logger = set_logging_process(self.options, self.logger_queue)
        session = self.session_maker()
        try:
            while not self.stop_event.is_set() and self.parent_is_alive():
                try:
                    source_id, raw_tweet = self.queue.get(True, 3)
                    self.logger.debug("Processing source id " + repr(source_id))
                except Exception as err:
                    # Usually Queue.Empty on timeout; loop back to re-check flags.
                    self.logger.debug('Process tweet exception in loop : ' + repr(err))
                    continue
                process_tweet(raw_tweet, source_id, session, self.access_token, self.twitter_query_user, self.logger)
                session.commit()
        finally:
            # Propagate shutdown to the rest of the pipeline.
            session.rollback()
            self.stop_event.set()
            session.close()


def get_sessionmaker(conn_str):
    """Set up the database for `conn_str` and return a thread-local
    scoped session factory together with its engine and metadata."""
    engine, metadata, session_factory = models.setup_database(
        conn_str, echo=False, create_all=False, autocommit=False)
    return scoped_session(session_factory), engine, metadata

            
def process_leftovers(session, access_token, twitter_query_user, logger):
    """Process TweetSource rows that have no TweetLog yet.

    These are sources a previous run persisted but never finished
    processing (e.g. after a crash); each one is committed as it goes.

    Query: tweet sources that do not match any log entry, i.e.
    select * from tweet_tweet_source ts left join tweet_tweet_log tl
      on ts.id = tl.tweet_source_id where tl.id isnull;
    """
    pending = session.query(TweetSource).outerjoin(TweetLog).filter(TweetLog.id == None)
    for source in pending:
        process_tweet(source.original_json, source.id, session,
                      access_token, twitter_query_user, logger)
        session.commit()

        
    
    #get tweet source that do not match any message
    #select * from tweet_tweet_source ts left join tweet_tweet_log tl on ts.id = tl.tweet_source_id where tl.id isnull;
def process_log(logger_queues, stop_event):
    """Drain log records from the workers' queues until stop_event is set.

    Runs in a thread of the parent process: each record received from a
    child is re-dispatched to the locally configured logger of the same
    name. Empty queues and broken pipes are simply skipped.
    """
    while not stop_event.is_set():
        for log_queue in logger_queues:
            try:
                record = log_queue.get_nowait()
                logging.getLogger(record.name).handle(record)
            except (Queue.Empty, IOError):
                continue
        time.sleep(0.1)

        
def get_options():
    """Build the command-line parser and return (options, args).

    Also registers the shared logging options from iri_tweet.utils.
    """
    opt_parser = OptionParser(usage="usage: %prog [options]")

    opt_parser.add_option("-f", "--file", dest="conn_str",
                          help="write tweet to DATABASE. This is a connection string", metavar="CONNECTION_STR", default="enmi2010_twitter.db")
    opt_parser.add_option("-u", "--user", dest="username",
                          help="Twitter user", metavar="USER", default=None)
    opt_parser.add_option("-w", "--password", dest="password",
                          help="Twitter password", metavar="PASSWORD", default=None)
    opt_parser.add_option("-T", "--track", dest="track",
                          help="Twitter track", metavar="TRACK")
    opt_parser.add_option("-n", "--new", dest="new", action="store_true",
                          help="new database", default=False)
    opt_parser.add_option("-D", "--daemon", dest="daemon", action="store_true",
                          help="launch daemon", default=False)
    opt_parser.add_option("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token",
                          help="Token file name")
    opt_parser.add_option("-d", "--duration", dest="duration",
                          help="Duration of recording in seconds", metavar="DURATION", default= -1, type='int')
    opt_parser.add_option("-N", "--nb-process", dest="process_nb",
                          help="number of process.\nIf 0, only the lefovers of the database are processed.\nIf 1, no postprocessing is done on the tweets.", metavar="PROCESS_NB", default=2, type='int')
    opt_parser.add_option("--url", dest="url",
                          help="The twitter url to connect to.", metavar="URL", default=tweetstream.FilterStream.url)
    opt_parser.add_option("--query-user", dest="twitter_query_user", action="store_true",
                          help="Query twitter for users", default=False, metavar="QUERY_USER")
    opt_parser.add_option("--catchup", dest="catchup",
                          help="catchup count for tweets", default=None, metavar="CATCHUP", type='int')
    opt_parser.add_option("--timeout", dest="timeout",
                          help="timeout for connecting in seconds", default=60, metavar="TIMEOUT", type='int')

    utils.set_logging_options(opt_parser)

    return opt_parser.parse_args()


def do_run(options, session_maker):
    """Run the recording pipeline.

    Steps: process leftover sources from a previous run, then spawn one
    SourceProcess (stream reader) and options.process_nb - 1
    TweetProcess consumers, supervise them until a stop condition
    (signal, duration elapsed, or source death), and finally clean up.

    Returns a dict describing why the run stopped, or None when only
    leftovers were processed (process_nb <= 0).

    NOTE(review): reads the module-level global `conn_str` set by main().
    """
    stop_args = {}

    # The OAuth token is only needed when basic-auth credentials are absent.
    access_token = None
    if not options.username or not options.password:
        access_token = utils.get_oauth_token(options.token_filename)

    # First pass: process tweet sources left unprocessed by a previous run.
    session = session_maker()
    try:
        process_leftovers(session, access_token, options.twitter_query_user, utils.get_logger())
        session.commit()
    finally:
        session.rollback()
        session.close()

    if options.process_nb <= 0:
        utils.get_logger().debug("Leftovers processed. Exiting.")
        return None

    queue = mQueue()
    stop_event = Event()

    # Workaround for a bug when combining urllib2 and multiprocessing:
    # prime urllib2's global opener in the parent before forking.
    # Best-effort only; failure here is harmless.
    req = urllib2.Request('http://localhost')
    conn = None
    try:
        conn = urllib2.urlopen(req)
    except:
        utils.get_logger().debug("could not open localhost")
        #donothing
    finally:
        if conn is not None:
            conn.close()

    process_engines = []
    logger_queues = []

    # Each child gets its own engine (engines must not cross fork
    # boundaries) and its own single-slot logging queue.
    SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str)
    process_engines.append(engine_process)
    lqueue = mQueue(1)
    logger_queues.append(lqueue)
    pid = os.getpid()
    sprocess = SourceProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)

    tweet_processes = []

    for i in range(options.process_nb - 1):
        SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str)
        process_engines.append(engine_process)
        lqueue = mQueue(1)
        logger_queues.append(lqueue)
        cprocess = TweetProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)
        tweet_processes.append(cprocess)

    def interupt_handler(signum, frame):
        # Record why we are stopping, then ask every worker to wind down.
        utils.get_logger().debug("shutdown asked " + repr(signum) + "  " + repr(inspect.getframeinfo(frame, 9)))
        stop_args.update({'message': 'interupt', 'signum':signum, 'frameinfo':inspect.getframeinfo(frame, 9)})
        stop_event.set()

    signal.signal(signal.SIGINT , interupt_handler)
    signal.signal(signal.SIGHUP , interupt_handler)
    signal.signal(signal.SIGALRM, interupt_handler)
    signal.signal(signal.SIGTERM, interupt_handler)

    # Logging thread: relays children's log records to local handlers.
    log_thread = threading.Thread(target=process_log, name="loggingThread", args=(logger_queues, stop_event,))
    log_thread.daemon = True
    log_thread.start()

    sprocess.start()
    for cprocess in tweet_processes:
        cprocess.start()

    add_process_event("pid", {'main':os.getpid(), 'source':(sprocess.name, sprocess.pid), 'consumers':dict([(p.name, p.pid) for p in tweet_processes])}, session_maker)

    if options.duration >= 0:
        end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration)

    # Supervision loop: stop when asked, when the duration elapses, or
    # when the source process dies.
    while not stop_event.is_set():
        if options.duration >= 0 and datetime.datetime.utcnow() >= end_ts:
            stop_args.update({'message': 'duration', 'duration' : options.duration, 'end_ts' : end_ts})
            stop_event.set()
            break
        if sprocess.is_alive():
            time.sleep(1)
        else:
            stop_args.update({'message': 'Source process killed'})
            stop_event.set()
            break

    utils.get_logger().debug("Joining Source Process")
    try:
        sprocess.join(10)
    except:
        utils.get_logger().debug("Pb joining Source Process - terminating")
        sprocess.terminate()

    for i, cprocess in enumerate(tweet_processes):
        utils.get_logger().debug("Joining consumer process Nb %d" % (i + 1))
        try:
            cprocess.join(3)
        except:
            utils.get_logger().debug("Pb joining consumer process Nb %d - terminating" % (i + 1))
            cprocess.terminate()

    utils.get_logger().debug("Close queues")
    try:
        queue.close()
        for lqueue in logger_queues:
            lqueue.close()
    # BUG FIX: was `except exception`, a NameError — any failure while
    # closing the queues would have crashed instead of being logged.
    except Exception as e:
        utils.get_logger().error("error when closing queues %s", repr(e))
        #do nothing

    # Second leftover pass: consumers may have died with sources unprocessed.
    if options.process_nb > 1:
        utils.get_logger().debug("Processing leftovers")
        session = session_maker()
        try:
            process_leftovers(session, access_token, options.twitter_query_user, utils.get_logger())
            session.commit()
        finally:
            session.rollback()
            session.close()

    for pengine in process_engines:
        pengine.dispose()

    return stop_args


def main(options, args):
    """Prepare the database and run the recording pipeline.

    Normalizes the connection string (bare paths become sqlite URLs),
    optionally rotates an existing sqlite file aside when --new is set,
    creates the schema, and brackets do_run() with start/shutdown
    ProcessEvent rows. Re-raises any pipeline failure after recording it.
    """
    global conn_str

    conn_str = options.conn_str.strip()
    # FIX: raw string for the regex — "\w" in a plain string is an
    # invalid escape sequence on modern Python.
    if not re.match(r"^\w+://.+", conn_str):
        conn_str = 'sqlite:///' + options.conn_str

    if conn_str.startswith("sqlite") and options.new:
        # Rotate an existing sqlite file to <name>.<i><ext> instead of
        # overwriting it.
        filepath = conn_str[conn_str.find(":///") + 4:]
        if os.path.exists(filepath):
            i = 1
            basename, extension = os.path.splitext(filepath)
            new_path = '%s.%d%s' % (basename, i, extension)
            while i < 1000000 and os.path.exists(new_path):
                i += 1
                new_path = '%s.%d%s' % (basename, i, extension)
            if i >= 1000000:
                raise Exception("Unable to find new filename for " + filepath)
            else:
                shutil.move(filepath, new_path)

    Session, engine, metadata = get_sessionmaker(conn_str)

    if options.new:
        # Refuse to initialise a "new" database over existing tables.
        check_metadata = sqlalchemy.schema.MetaData(bind=engine, reflect=True)
        if len(check_metadata.sorted_tables) > 0:
            message = "Database %s not empty exiting" % conn_str
            utils.get_logger().error(message)
            sys.exit(message)

    metadata.create_all(engine)
    session = Session()
    try:
        models.add_model_version(session)
    finally:
        session.close()

    stop_args = {}
    try:
        add_process_event(type="start", args={'options':options.__dict__, 'args': args, 'command_line': sys.argv}, session_maker=Session)
        stop_args = do_run(options, Session)
    except Exception as e:
        utils.get_logger().exception("Error in main thread")
        # Capture the traceback so the shutdown event records the failure.
        outfile = StringIO.StringIO()
        try:
            traceback.print_exc(file=outfile)
            stop_args = {'error': repr(e), 'message': getattr(e, 'message', ''), 'stacktrace':outfile.getvalue()}
        finally:
            outfile.close()
        raise
    finally:
        add_process_event(type="shutdown", args=stop_args, session_maker=Session)

    utils.get_logger().debug("Done. Exiting. " + repr(stop_args))



if __name__ == '__main__':

    (options, args) = get_options()
    
    loggers = set_logging(options)
    
    utils.get_logger().debug("OPTIONS : " + repr(options))
    
    if options.daemon:
        # Daemon mode: detach from the terminal, keeping the already-open
        # log streams alive across the daemonisation double-fork.
        import daemon
        import lockfile
        
        hdlr_preserve = []
        for logger in loggers:
            hdlr_preserve.extend([h.stream for h in logger.handlers])
            
        context = daemon.DaemonContext(working_directory=os.getcwd(), files_preserve=hdlr_preserve) 
        with context:
            main(options, args)
    else:
        main(options, args)