# script/stream/recorder_tweetstream.py
# author Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
# Tue, 11 Dec 2012 10:46:35 +0100
# changeset 739 350ffcb7ae4d
# parent 738 2497c7f38e0a
# permissions -rw-r--r--
# correct listener. add log

from getpass import getpass
from iri_tweet import models, utils
from iri_tweet.models import TweetSource, TweetLog, ProcessEvent
from multiprocessing import (Queue as mQueue, JoinableQueue, Process, Event, 
    get_logger)
from optparse import OptionParser
from sqlalchemy.exc import OperationalError
from sqlalchemy.orm import scoped_session
import Queue
import StringIO
import anyjson
import datetime
import inspect
import logging
import os
import re
import shutil
import signal
import socket
import sqlalchemy.schema
import sys
import threading
import time
import traceback
import tweepy.auth
import iri_tweet.stream as tweetstream
import urllib2
# Disable read buffering on socket file objects (CPython 2 internal) --
# presumably so the long-lived streaming HTTP connection delivers each
# chunk immediately instead of sitting in a buffer. TODO confirm intent.
socket._fileobject.default_bufsize = 0



# Known fields of a tweet object from the streaming API; the commented list
# above each constant is an older API version kept for reference.
#columns_tweet = [u'favorited', u'truncated', u'text', u'created_at', u'source', u'in_reply_to_status_id', u'in_reply_to_screen_name', u'in_reply_to_user_id', u'geo', u'id', u'user']
columns_tweet = [u'user', u'favorited', u'contributors', u'truncated', u'text', u'created_at', u'retweeted', u'in_reply_to_status_id_str', u'coordinates', u'in_reply_to_user_id_str', u'entities', u'in_reply_to_status_id', u'place', u'in_reply_to_user_id', u'id', u'in_reply_to_screen_name', u'retweet_count', u'geo', u'id_str', u'source']
# Known fields of the embedded user object.
#columns_user = [u'id', u'verified', u'profile_sidebar_fill_color', u'profile_text_color', u'followers_count', u'protected', u'location', u'profile_background_color', u'utc_offset', u'statuses_count', u'description', u'friends_count', u'profile_link_color', u'profile_image_url', u'notifications', u'geo_enabled', u'profile_background_image_url', u'screen_name', u'profile_background_tile', u'favourites_count', u'name', u'url', u'created_at', u'time_zone', u'profile_sidebar_border_color', u'following']
columns_user = [u'follow_request_sent', u'profile_use_background_image', u'id', u'verified', u'profile_sidebar_fill_color', u'profile_text_color', u'followers_count', u'protected', u'location', u'profile_background_color', u'id_str', u'utc_offset', u'statuses_count', u'description', u'friends_count', u'profile_link_color', u'profile_image_url', u'notifications', u'show_all_inline_media', u'geo_enabled', u'profile_background_image_url', u'name', u'lang', u'following', u'profile_background_tile', u'favourites_count', u'screen_name', u'url', u'created_at', u'contributors_enabled', u'time_zone', u'profile_sidebar_border_color', u'is_translator', u'listed_count']
# just put it in a sqlite3 table

# Seconds the source process waits between liveness checks of the stream thread.
DEFAULT_TIMEOUT = 5

def set_logging(options):
    """Configure the main-process loggers from the command-line options.

    Always configures 'iri.tweet' and 'multiprocessing'; adds the
    'sqlalchemy.engine' logger when debug level is 2 or more.
    Returns the list of configured loggers.
    """
    logger_names = ['iri.tweet', 'multiprocessing']
    if options.debug >= 2:
        logger_names.append('sqlalchemy.engine')
    #utils.set_logging(options, logging.getLogger('sqlalchemy.dialects'))
    #utils.set_logging(options, logging.getLogger('sqlalchemy.pool'))
    #utils.set_logging(options, logging.getLogger('sqlalchemy.orm'))
    return [utils.set_logging(options, logging.getLogger(name))
            for name in logger_names]

def set_logging_process(options, queue):
    """Configure the per-worker logger that ships records over *queue*.

    Returns the 'iri.tweet.p' logger with propagation disabled so records
    only travel through the queue, never up to the root logger.
    """
    process_logger = utils.set_logging(options, logging.getLogger('iri.tweet.p'), queue)
    process_logger.propagate = 0
    return process_logger

def get_auth(options, access_token):
    """Build a tweepy auth handler.

    Uses basic auth when both a username and a password were supplied on the
    command line; falls back to OAuth with the application's consumer keys
    and the given (key, secret) access token otherwise.
    """
    if not (options.username and options.password):
        auth = tweepy.auth.OAuthHandler(models.CONSUMER_KEY, models.CONSUMER_SECRET, secure=True)
        auth.set_access_token(*access_token)
        return auth
    return tweepy.auth.BasicAuthHandler(options.username, options.password)


def add_process_event(type, args, session_maker):
    """Persist one ProcessEvent row recording a lifecycle event.

    *args* is JSON-serialized (a None stays None); the session is always
    closed, even when the commit fails.  The parameter is named ``type`` to
    match the keyword used by existing callers.
    """
    session = session_maker()
    try:
        payload = None if args is None else anyjson.serialize(args)
        session.add(ProcessEvent(args=payload, type=type))
        session.commit()
    finally:
        session.close()


class BaseProcess(Process):
    """Common base for the recorder's worker processes.

    Stores the shared plumbing (session factory, work queue, logging queue,
    stop event, parent pid) and wraps the subclass's do_run() between
    "start_worker"/"stop_worker" process events.
    """

    def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
        self.session_maker = session_maker
        self.queue = queue
        self.logger_queue = logger_queue
        self.stop_event = stop_event
        self.options = options
        self.access_token = access_token
        self.parent_pid = parent_pid
        super(BaseProcess, self).__init__()

    def parent_is_alive(self):
        """Return True when the parent process still exists.

        Probes it with signal 0 (no-op delivery), a trick from
        http://stackoverflow.com/questions/2542610/python-daemon-doesnt-kill-its-kids
        """
        try:
            os.kill(self.parent_pid, 0)
        except OSError:
            # Parent pid no longer addressable: it is gone.
            return False
        return True

    def __get_process_event_args(self):
        """Snapshot of this worker's identity/options for process events."""
        return {
            'name': self.name,
            'pid': self.pid,
            'parent_pid': self.parent_pid,
            'options': self.options.__dict__,
            'access_token': self.access_token,
        }

    def run(self):
        """Record start/stop events around the subclass's main loop."""
        try:
            add_process_event("start_worker", self.__get_process_event_args(), self.session_maker)
            self.do_run()
        finally:
            add_process_event("stop_worker", self.__get_process_event_args(), self.session_maker)

    def do_run(self):
        """Subclasses implement the actual work loop here."""
        raise NotImplementedError()



class SourceProcess(BaseProcess):
    """Worker that reads the twitter filter stream and stores each raw tweet.

    Every received payload becomes a TweetSource row; its id and raw text are
    pushed on the shared queue for the TweetProcess consumers.
    """

    def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
        self.track = options.track
        self.token_filename = options.token_filename
        self.catchup = options.catchup
        self.timeout = options.timeout
        self.stream = None
        super(SourceProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid)

    def __source_stream_iter(self):
        """Consume the stream in a worker thread, persisting each raw tweet.

        Sets the stop event on exit (normal or error) so do_run() unwinds.
        """
        self.logger.debug("SourceProcess : run ")

        self.auth = get_auth(self.options, self.access_token)
        self.logger.debug("SourceProcess : auth set ")

        track_list = self.track # or raw_input('Keywords to track (comma seperated): ').strip()
        self.logger.debug("SourceProcess : track list " + track_list)

        track_list = [k.strip() for k in track_list.split(',')]

        self.logger.debug("SourceProcess : before connecting to stream " + repr(track_list))
        self.stream = tweetstream.FilterStream(self.auth, track=track_list, raw=True, url=self.options.url, catchup=self.catchup, timeout=self.timeout, chunk_size=1, logger=self.logger)
        self.logger.debug("SourceProcess : after connecting to stream")
        self.stream.muststop = lambda: self.stop_event.is_set()

        stream_wrapper = tweetstream.SafeStreamWrapper(self.stream, logger=self.logger)

        session = self.session_maker()

        try:
            for tweet in stream_wrapper:
                if not self.parent_is_alive():
                    # Parent died: ask everyone to stop and leave this thread.
                    # BUGFIX: previously joined an undefined 'stop_thread'
                    # here, which raised NameError instead of exiting cleanly.
                    self.stop_event.set()
                    sys.exit()
                self.logger.debug("SourceProcess : tweet " + repr(tweet))
                source = TweetSource(original_json=tweet)
                self.logger.debug("SourceProcess : source created")
                # Retry transient database errors (e.g. sqlite lock) up to 10
                # times before giving up.
                add_retries = 0
                while add_retries < 10:
                    try:
                        add_retries += 1
                        session.add(source)
                        session.flush()
                        break
                    except OperationalError as e:
                        session.rollback()
                        self.logger.debug("SourceProcess : Operational Error %s nb %d" % (repr(e), add_retries))
                        if add_retries == 10:
                            raise

                source_id = source.id
                self.logger.debug("SourceProcess : before queue + source id " + repr(source_id))
                self.logger.info("SourceProcess : Tweet count: %d - current rate : %.2f - running : %s" % (self.stream.count, self.stream.rate, int(time.time() - self.stream.starttime)))
                session.commit()
                self.queue.put((source_id, tweet), False)

        except Exception as e:
            self.logger.error("SourceProcess : Error when processing tweet " + repr(e))
            raise
        finally:
            session.rollback()
            session.close()
            self.logger_queue.close()
            self.queue.close()
            # Guard: the stream may never have been opened if an earlier
            # iteration error closed it already.
            if self.stream is not None:
                self.stream.close()
            self.stream = None
            if not self.stop_event.is_set():
                self.stop_event.set()

    def do_run(self):
        """Start the stream thread and watch it until asked to stop."""
        #import pydevd
        #pydevd.settrace(suspend=False)

        # BUGFIX: configure the logger *before* starting the thread. It used
        # to be set inside __source_stream_iter, so the monitoring loop below
        # could hit an unset self.logger (AttributeError race).
        self.logger = set_logging_process(self.options, self.logger_queue)

        source_stream_iter_thread = threading.Thread(target=self.__source_stream_iter, name="SourceStreamIterThread")
        source_stream_iter_thread.start()

        while not self.stop_event.is_set():
            self.logger.debug("SourceProcess : In while after start")
            self.stop_event.wait(DEFAULT_TIMEOUT)
            if self.stop_event.is_set() and self.stream:
                self.stream.close()
            elif not self.stop_event.is_set() and not source_stream_iter_thread.is_alive():
                # BUGFIX: is_alive was referenced without calling it; a bound
                # method is always truthy, so a dead stream thread was never
                # detected and this process span forever.
                self.stop_event.set()

        self.logger.info("SourceProcess : join")
        source_stream_iter_thread.join(30)


def _capture_error_stack():
    """Return the currently-handled exception's traceback as a string."""
    output = StringIO.StringIO()
    try:
        traceback.print_exc(file=output)
        return output.getvalue()
    finally:
        output.close()


def process_tweet(tweet, source_id, session, access_token, twitter_query_user, logger):
    """Deserialize one raw streaming payload and store it in the database.

    Blank payloads (keep-alives) are ignored.  Payloads without a 'text' key
    (delete/limit notices, etc.) get a NOT_TWEET TweetLog row.  Real tweets
    are handed to utils.TwitterProcessor.  On failure a TweetLog row with the
    error and stack trace is committed; the success path is committed by the
    caller.
    """
    try:
        if not tweet.strip():
            # Keep-alive blank line from the streaming API: nothing to do.
            return
        tweet_obj = anyjson.deserialize(tweet)
        if 'text' not in tweet_obj:
            tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET'])
            session.add(tweet_log)
            return
        screen_name = ""
        if 'user' in tweet_obj and 'screen_name' in tweet_obj['user']:
            screen_name = tweet_obj['user']['screen_name']
        logger.info(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text']))
        logger.debug(u"Process_tweet :" + repr(tweet))
        processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, access_token, None, twitter_query_user)
        processor.process()
    except ValueError as e:
        message = u"Value Error %s processing tweet %s" % (repr(e), tweet)
        error_stack = _capture_error_stack()
        # BUGFIX: roll back before logging -- a ValueError raised from
        # processor.process() can leave the session in a failed state, which
        # would make the commit below fail (the generic handler already did).
        session.rollback()
        tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET'], error=message, error_stack=error_stack)
        session.add(tweet_log)
        session.commit()
    except Exception as e:
        message = u"Error %s processing tweet %s" % (repr(e), tweet)
        logger.exception(message)
        error_stack = _capture_error_stack()
        session.rollback()
        tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['ERROR'], error=message, error_stack=error_stack)
        session.add(tweet_log)
        session.commit()

    
        
class TweetProcess(BaseProcess):
    """Consumer worker: turns queued (source_id, raw_tweet) pairs into rows.

    Pops entries off the shared queue and runs process_tweet() on each until
    the stop event fires or the parent process disappears.
    """

    def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
        super(TweetProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid)
        self.twitter_query_user = options.twitter_query_user

    def do_run(self):
        """Poll the queue with a 3s timeout, committing after each tweet."""
        self.logger = set_logging_process(self.options, self.logger_queue)
        session = self.session_maker()
        try:
            while True:
                if self.stop_event.is_set() or not self.parent_is_alive():
                    break
                try:
                    source_id, tweet_txt = self.queue.get(True, 3)
                    self.logger.debug("Processing source id " + repr(source_id))
                except Exception as e:
                    # Queue empty (timeout) or transient pipe trouble: just
                    # loop and re-check the stop conditions.
                    self.logger.debug('Process tweet exception in loop : ' + repr(e))
                    continue
                process_tweet(tweet_txt, source_id, session, self.access_token, self.twitter_query_user, self.logger)
                session.commit()
        finally:
            session.rollback()
            session.close()


def get_sessionmaker(conn_str):
    """Set up the database for *conn_str* without creating tables.

    Returns (scoped session factory, engine, metadata); the scoped_session
    wrapper makes the factory safe to share within a process.
    """
    engine, metadata, session_factory = models.setup_database(conn_str, echo=False, create_all=False, autocommit=False)
    return scoped_session(session_factory), engine, metadata

            
def process_leftovers(session, access_token, twitter_query_user, logger):
    """Process every TweetSource row that has no TweetLog entry yet.

    These are tweets stored by the source process but never post-processed
    (e.g. after a crash).  Equivalent SQL:
    select * from tweet_tweet_source ts left join tweet_tweet_log tl
        on ts.id = tl.tweet_source_id where tl.id isnull;
    Commits after each tweet so progress survives an interruption.
    """
    unprocessed = session.query(TweetSource).outerjoin(TweetLog).filter(TweetLog.id == None)
    for source in unprocessed:
        process_tweet(source.original_json, source.id, session, access_token, twitter_query_user, logger)
        session.commit()
def process_log(logger_queues, stop_event):
    """Pump log records from the worker queues into the local loggers.

    Runs in a thread of the main process: polls each queue without blocking,
    dispatches any record to the logger named in it, and sleeps 0.1s between
    rounds until *stop_event* is set.
    """
    while not stop_event.is_set():
        for log_queue in logger_queues:
            try:
                record = log_queue.get_nowait()
                logging.getLogger(record.name).handle(record)
            except (Queue.Empty, IOError):
                # Nothing queued (or transient pipe error): try the next one.
                continue
        time.sleep(0.1)

        
def get_options():
    """Build the command-line parser for the recorder and parse sys.argv.

    Returns the (options, args) pair from OptionParser.parse_args().
    Notable options: -f is a database connection string (bare paths become
    sqlite files in main()), -N controls how many processes run (0 = only
    process leftovers, 1 = record without post-processing).
    """

    usage = "usage: %prog [options]"

    parser = OptionParser(usage=usage)

    parser.add_option("-f", "--file", dest="conn_str",
                      help="write tweet to DATABASE. This is a connection string", metavar="CONNECTION_STR", default="enmi2010_twitter.db")
    parser.add_option("-u", "--user", dest="username",
                      help="Twitter user", metavar="USER", default=None)
    parser.add_option("-w", "--password", dest="password",
                      help="Twitter password", metavar="PASSWORD", default=None)
    parser.add_option("-T", "--track", dest="track",
                      help="Twitter track", metavar="TRACK")
    parser.add_option("-n", "--new", dest="new", action="store_true",
                      help="new database", default=False)
    parser.add_option("-D", "--daemon", dest="daemon", action="store_true",
                      help="launch daemon", default=False)
    parser.add_option("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token",
                      help="Token file name")
    parser.add_option("-d", "--duration", dest="duration",
                      help="Duration of recording in seconds", metavar="DURATION", default= -1, type='int')
    parser.add_option("-N", "--nb-process", dest="process_nb",
                      help="number of process.\nIf 0, only the lefovers of the database are processed.\nIf 1, no postprocessing is done on the tweets.", metavar="PROCESS_NB", default=2, type='int')
    parser.add_option("--url", dest="url",
                      help="The twitter url to connect to.", metavar="URL", default=tweetstream.FilterStream.url)
    parser.add_option("--query-user", dest="twitter_query_user", action="store_true",
                      help="Query twitter for users", default=False, metavar="QUERY_USER")
    parser.add_option("--catchup", dest="catchup",
                      help="catchup count for tweets", default=None, metavar="CATCHUP", type='int')
    parser.add_option("--timeout", dest="timeout",
                      help="timeout for connecting in seconds", default=60, metavar="TIMEOUT", type='int')
    



    # Add the shared logging/verbosity options defined in iri_tweet.utils.
    utils.set_logging_options(parser)

    return parser.parse_args()


def do_run(options, session_maker):
    """Run one full recording session.

    Processes leftovers first, then (unless options.process_nb <= 0) spawns
    one SourceProcess plus options.process_nb - 1 TweetProcess consumers,
    watches them until the duration elapses, a signal arrives or the source
    process dies, joins/terminates everything, reprocesses leftovers and
    disposes the engines.

    Returns a dict describing why the run stopped, or None when only
    leftovers were processed.
    """

    stop_args = {}

    # The OAuth token is only needed when no basic-auth credentials are given.
    access_token = None
    if not options.username or not options.password:
        access_token = utils.get_oauth_token(options.token_filename)

    # First pass on leftovers: sources recorded by a previous run but never
    # post-processed.
    session = session_maker()
    try:
        process_leftovers(session, access_token, options.twitter_query_user, utils.get_logger())
        session.commit()
    finally:
        session.rollback()
        session.close()

    if options.process_nb <= 0:
        utils.get_logger().debug("Leftovers processed. Exiting.")
        return None

    queue = mQueue()
    stop_event = Event()

    #workaround for bug on using urllib2 and multiprocessing
    req = urllib2.Request('http://localhost')
    conn = None
    try:
        conn = urllib2.urlopen(req)
    except Exception:
        # Best effort only: reaching localhost is not required, we just need
        # urllib2 to initialize its machinery before forking.
        utils.get_logger().debug("could not open localhost")
    finally:
        if conn is not None:
            conn.close()

    process_engines = []
    logger_queues = []

    # NOTE: conn_str is a module-level global set by main().
    SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str)
    process_engines.append(engine_process)
    lqueue = mQueue(50)
    logger_queues.append(lqueue)
    pid = os.getpid()
    sprocess = SourceProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)

    tweet_processes = []

    # Each consumer gets its own engine/session factory and logging queue.
    for i in range(options.process_nb - 1):
        SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str)
        process_engines.append(engine_process)
        lqueue = mQueue(50)
        logger_queues.append(lqueue)
        cprocess = TweetProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)
        tweet_processes.append(cprocess)

    # Daemon thread that funnels the workers' log records to local loggers.
    log_thread = threading.Thread(target=process_log, name="loggingThread", args=(logger_queues, stop_event,))
    log_thread.daemon = True

    log_thread.start()

    sprocess.start()
    for cprocess in tweet_processes:
        cprocess.start()

    add_process_event("pid", {'main':os.getpid(), 'source':(sprocess.name, sprocess.pid), 'consumers':dict([(p.name, p.pid) for p in tweet_processes])}, session_maker)

    if options.duration >= 0:
        end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration)

    def interupt_handler(signum, frame):
        # Record why we stopped so it ends up in the shutdown process event.
        utils.get_logger().debug("shutdown asked " + repr(signum) + "  " + repr(inspect.getframeinfo(frame, 9)))
        stop_args.update({'message': 'interupt', 'signum':signum, 'frameinfo':inspect.getframeinfo(frame, 9)})
        stop_event.set()

    signal.signal(signal.SIGINT , interupt_handler)
    signal.signal(signal.SIGHUP , interupt_handler)
    signal.signal(signal.SIGALRM, interupt_handler)
    signal.signal(signal.SIGTERM, interupt_handler)

    # Watch loop: stop on duration expiry, signal, or source-process death.
    while not stop_event.is_set():
        if options.duration >= 0 and datetime.datetime.utcnow() >= end_ts:
            stop_args.update({'message': 'duration', 'duration' : options.duration, 'end_ts' : end_ts})
            stop_event.set()
            break
        if sprocess.is_alive():
            utils.get_logger().debug("Source process alive")
            time.sleep(1)
        else:
            stop_args.update({'message': 'Source process killed'})
            stop_event.set()
            break

    utils.get_logger().debug("Joining Source Process")
    try:
        sprocess.join(10)
    except:
        # Deliberately broad: whatever went wrong, make sure the child dies.
        utils.get_logger().debug("Pb joining Source Process - terminating")
        sprocess.terminate()

    for i, cprocess in enumerate(tweet_processes):
        utils.get_logger().debug("Joining consumer process Nb %d" % (i + 1))
        try:
            cprocess.join(3)
        except:
            # Same as above: broad on purpose.
            utils.get_logger().debug("Pb joining consumer process Nb %d - terminating" % (i + 1))
            cprocess.terminate()

    utils.get_logger().debug("Close queues")
    try:
        queue.close()
        for lqueue in logger_queues:
            lqueue.close()
    except Exception as e:
        # BUGFIX: was 'except exception' (lowercase) -- a NameError that
        # would have masked the original error whenever closing a queue
        # failed.
        utils.get_logger().error("error when closing queues %s", repr(e))

    # Consumers may have been killed mid-queue: reprocess anything stored by
    # the source that still has no TweetLog.
    if options.process_nb > 1:
        utils.get_logger().debug("Processing leftovers")
        session = session_maker()
        try:
            process_leftovers(session, access_token, options.twitter_query_user, utils.get_logger())
            session.commit()
        finally:
            session.rollback()
            session.close()

    for pengine in process_engines:
        pengine.dispose()

    return stop_args


def main(options, args):
    """Entry point: prepare the database and run the recording session.

    Normalizes the connection string (bare paths become sqlite URLs),
    rotates an existing sqlite file out of the way when --new is given,
    creates the schema, records start/shutdown process events and delegates
    the actual work to do_run().
    """

    global conn_str

    conn_str = options.conn_str.strip()
    # FIX: raw string for the regex -- '\w' in a plain literal relies on
    # Python leaving unknown escapes alone (a DeprecationWarning in py3).
    if not re.match(r"^\w+://.+", conn_str):
        conn_str = 'sqlite:///' + options.conn_str

    # --new on sqlite: move any existing file to <name>.<i><ext> instead of
    # overwriting it.
    if conn_str.startswith("sqlite") and options.new:
        filepath = conn_str[conn_str.find(":///") + 4:]
        if os.path.exists(filepath):
            i = 1
            basename, extension = os.path.splitext(filepath)
            new_path = '%s.%d%s' % (basename, i, extension)
            while i < 1000000 and os.path.exists(new_path):
                i += 1
                new_path = '%s.%d%s' % (basename, i, extension)
            if i >= 1000000:
                raise Exception("Unable to find new filename for " + filepath)
            else:
                shutil.move(filepath, new_path)

    Session, engine, metadata = get_sessionmaker(conn_str)

    if options.new:
        # A fresh database was requested: refuse to run on a non-empty one.
        check_metadata = sqlalchemy.schema.MetaData(bind=engine, reflect=True)
        if len(check_metadata.sorted_tables) > 0:
            message = "Database %s not empty exiting" % conn_str
            utils.get_logger().error(message)
            sys.exit(message)

    metadata.create_all(engine)
    session = Session()
    try:
        models.add_model_version(session)
    finally:
        session.close()

    stop_args = {}
    try:
        add_process_event(type="start", args={'options':options.__dict__, 'args': args, 'command_line': sys.argv}, session_maker=Session)
        stop_args = do_run(options, Session)
    except Exception as e:
        utils.get_logger().exception("Error in main thread")
        # Keep the traceback in the shutdown event for post-mortem analysis.
        outfile = StringIO.StringIO()
        try:
            traceback.print_exc(file=outfile)
            stop_args = {'error': repr(e), 'message': getattr(e, 'message', ''), 'stacktrace':outfile.getvalue()}
        finally:
            outfile.close()
        raise
    finally:
        add_process_event(type="shutdown", args=stop_args, session_maker=Session)

    utils.get_logger().debug("Done. Exiting. " + repr(stop_args))



if __name__ == '__main__':

    (options, args) = get_options()

    # Configure logging first so everything below is captured.
    loggers = set_logging(options)

    utils.get_logger().debug("OPTIONS : " + repr(options))

    if options.daemon:
        # Third-party daemonization modules are imported lazily so that
        # non-daemon runs do not require them ('lockfile' is presumably a
        # dependency of 'daemon' -- it is not used directly here).
        import daemon
        import lockfile

        # Collect the handlers' underlying streams so DaemonContext keeps
        # them open across the fork (files_preserve).
        hdlr_preserve = []
        for logger in loggers:
            hdlr_preserve.extend([h.stream for h in logger.handlers])
            
        context = daemon.DaemonContext(working_directory=os.getcwd(), files_preserve=hdlr_preserve) 
        with context:
            main(options, args)
    else:
        main(options, args)