# script/stream/recorder_stream.py

import _thread
import argparse
import datetime
import inspect
import json
import logging
import os
import queue
import re
import shutil
import signal
import socket
import sys
import threading
import time
import traceback
import urllib
from http.server import BaseHTTPRequestHandler, HTTPServer
from io import StringIO
from multiprocessing import Event, Process
from multiprocessing import Queue as mQueue

import requests_oauthlib
import sqlalchemy.schema
import twitter
from sqlalchemy.exc import OperationalError
from sqlalchemy.orm import scoped_session

import iri_tweet.stream
from iri_tweet import models, utils
from iri_tweet.models import ProcessEvent, TweetLog, TweetSource
from iri_tweet.processor import get_processor


# columns_tweet = [u'favorited', u'truncated', u'text', u'created_at', u'source', u'in_reply_to_status_id', u'in_reply_to_screen_name', u'in_reply_to_user_id', u'geo', u'id', u'user']
columns_tweet = [u'user', u'favorited', u'contributors', u'truncated', u'text', u'created_at', u'retweeted', u'in_reply_to_status_id_str', u'coordinates', u'in_reply_to_user_id_str', u'entities', u'in_reply_to_status_id', u'place', u'in_reply_to_user_id', u'id', u'in_reply_to_screen_name', u'retweet_count', u'geo', u'id_str', u'source']
# columns_user = [u'id', u'verified', u'profile_sidebar_fill_color', u'profile_text_color', u'followers_count', u'protected', u'location', u'profile_background_color', u'utc_offset', u'statuses_count', u'description', u'friends_count', u'profile_link_color', u'profile_image_url', u'notifications', u'geo_enabled', u'profile_background_image_url', u'screen_name', u'profile_background_tile', u'favourites_count', u'name', u'url', u'created_at', u'time_zone', u'profile_sidebar_border_color', u'following']
columns_user = [u'follow_request_sent', u'profile_use_background_image', u'id', u'verified', u'profile_sidebar_fill_color', u'profile_text_color', u'followers_count', u'protected', u'location', u'profile_background_color', u'id_str', u'utc_offset', u'statuses_count', u'description', u'friends_count', u'profile_link_color', u'profile_image_url', u'notifications', u'show_all_inline_media', u'geo_enabled', u'profile_background_image_url', u'name', u'lang', u'following', u'profile_background_tile', u'favourites_count', u'screen_name', u'url', u'created_at', u'contributors_enabled', u'time_zone', u'profile_sidebar_border_color', u'is_translator', u'listed_count']
# just put it in a sqlite3 table

DEFAULT_TIMEOUT = 3

class RequestHandler(BaseHTTPRequestHandler):
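    """Minimal handler that answers any GET with an empty 200 response; used by
    the urllib/multiprocessing workaround in do_run()."""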

    def do_GET(self):
        self.send_response(200)
        self.end_headers()

    def log_message(self, format, *args):  # 'format' name imposed by BaseHTTPRequestHandler
        # silence the default per-request logging
        pass


def set_logging(options):
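    """Configure the 'iri.tweet' and 'multiprocessing' loggers (plus
    'sqlalchemy.engine' when debug >= 2) and return them."""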
    loggers = []

    loggers.append(utils.set_logging(options, logging.getLogger('iri.tweet')))
    loggers.append(utils.set_logging(options, logging.getLogger('multiprocessing')))
    if options.debug >= 2:
        loggers.append(utils.set_logging(options, logging.getLogger('sqlalchemy.engine')))
    # utils.set_logging(options, logging.getLogger('sqlalchemy.dialects'))
    # utils.set_logging(options, logging.getLogger('sqlalchemy.pool'))
    # utils.set_logging(options, logging.getLogger('sqlalchemy.orm'))
    return loggers

def set_logging_process(options, queue):
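    """Configure a queue-backed logger for a worker process; its records are
    drained in the main process by process_log()."""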
    qlogger = utils.set_logging(options, logging.getLogger('iri.tweet.p'), queue)
    qlogger.propagate = False
    return qlogger

def get_auth(consumer_key, consumer_secret, token_key, token_secret):
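    """Build the OAuth1 signer used for the streaming HTTP requests."""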
    return requests_oauthlib.OAuth1(client_key=consumer_key,
                                    client_secret=consumer_secret,
                                    resource_owner_key=token_key,
                                    resource_owner_secret=token_secret,
                                    signature_type='auth_header')


def add_process_event(event_type, args, session_maker):
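    """Record a ProcessEvent row (start/stop/pid/shutdown) in its own short-lived session."""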
    session = session_maker()
    try:
        evt = ProcessEvent(args=None if args is None else json.dumps(args), type=event_type)
        session.add(evt)
        session.commit()
    finally:
        session.close()


class BaseProcess(Process):
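    """Base class for the worker processes: wraps do_run() with start_worker /
    stop_worker ProcessEvent bookkeeping and can check that the parent is alive."""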

    def __init__(self, session_maker, queue, options, twitter_auth, stop_event, logger_queue, parent_pid):
        self.parent_pid = parent_pid
        self.session_maker = session_maker
        self.queue = queue
        self.options = options
        self.logger_queue = logger_queue
        self.stop_event = stop_event
        self.twitter_auth = twitter_auth

        super(BaseProcess, self).__init__()

    #
    # from http://stackoverflow.com/questions/2542610/python-daemon-doesnt-kill-its-kids
    #
    def parent_is_alive(self):
        try:
            # try to call Parent
            os.kill(self.parent_pid, 0)
        except OSError:
            # *beeep* oh no! The phone's disconnected!
            return False
        else:
            # *ring* Hi mom!
            return True


    def __get_process_event_args(self):
        return {'name': self.name, 'pid': self.pid, 'parent_pid': self.parent_pid, 'options': self.options.__dict__}

    def run(self):
        try:
            add_process_event("start_worker", self.__get_process_event_args(), self.session_maker)
            self.do_run()
        finally:
            add_process_event("stop_worker", self.__get_process_event_args(), self.session_maker)

    def do_run(self):
        raise NotImplementedError()



class SourceProcess(BaseProcess):
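    """Producer process: reads the Twitter filter stream, stores each raw tweet
    as a TweetSource row and pushes (source_id, tweet) onto the work queue."""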

    def __init__(self, session_maker, queue, options, twitter_auth, stop_event, logger_queue, parent_pid):
        self.track = options.track
        self.timeout = options.timeout
        self.stream = None
        super(SourceProcess, self).__init__(session_maker, queue, options, twitter_auth, stop_event, logger_queue, parent_pid)

    def __source_stream_iter(self):

        self.logger.debug("SourceProcess : run ")

        self.logger.debug("SourceProcess : get_auth auth with option %s and token %s " %(self.options, self.twitter_auth))
        self.auth = get_auth(self.twitter_auth.consumer_key, self.twitter_auth.consumer_secret, self.twitter_auth.token, self.twitter_auth.token_secret)
        self.logger.debug("SourceProcess : auth set ")
        track_list = self.track  # or input('Keywords to track (comma separated): ').strip()
        self.logger.debug("SourceProcess : track list " + track_list)

        track_list = [k.strip() for k in track_list.split(',')]

        self.logger.debug("SourceProcess : before connecting to stream %s, url : %s, auth : %s" % (repr(track_list), self.options.url, repr(self.auth)))
        self.stream = iri_tweet.stream.FilterStream(self.auth, track=track_list, raw=True, url=self.options.url, timeout=self.timeout, logger=self.logger)
        self.logger.debug("SourceProcess : after connecting to stream")
        self.stream.muststop = lambda: self.stop_event.is_set()

        stream_wrapper = iri_tweet.stream.SafeStreamWrapper(self.stream, logger=self.logger)

        session = self.session_maker()

        try:
            for tweet in stream_wrapper:
                if not self.parent_is_alive():
                    self.stop_event.set()
                    sys.exit()
                self.logger.debug("SourceProcess : tweet " + repr(tweet))
                source = TweetSource(original_json=tweet)
                self.logger.debug("SourceProcess : source created")
                add_retries = 0
                while add_retries < 10:
                    try:
                        add_retries += 1
                        session.add(source)
                        session.flush()
                        break
                    except OperationalError as e:
                        session.rollback()
                        self.logger.debug("SourceProcess : Operational Error %s nb %d" % (repr(e), add_retries))
                        if add_retries == 10:
                            raise

                source_id = source.id
                self.logger.debug("SourceProcess : before queue + source id " + repr(source_id))
                self.logger.info("SourceProcess : Tweet count: %d - current rate : %.2f - running : %s" % (self.stream.count, self.stream.rate, int(time.time() - self.stream.starttime)))
                session.commit()
                self.queue.put((source_id, tweet), False)

        except Exception as e:
            self.logger.error("SourceProcess : Error when processing tweet " + repr(e))
            raise
        finally:
            session.rollback()
            session.close()
            self.stream.close()
            self.stream = None
            if not self.stop_event.is_set():
                self.stop_event.set()


    def do_run(self):

        self.logger = set_logging_process(self.options, self.logger_queue)

        source_stream_iter_thread = threading.Thread(target=self.__source_stream_iter, name="SourceStreamIterThread")

        source_stream_iter_thread.start()

        try:
            while not self.stop_event.is_set():
                self.logger.debug("SourceProcess : In while after start")
                self.stop_event.wait(DEFAULT_TIMEOUT)
        except KeyboardInterrupt:
            self.stop_event.set()

        if self.stop_event.is_set() and self.stream:
            self.stream.close()
        elif not self.stop_event.is_set() and not source_stream_iter_thread.is_alive():
            self.stop_event.set()

        self.queue.cancel_join_thread()
        self.logger_queue.cancel_join_thread()
        self.logger.info("SourceProcess : join")
        source_stream_iter_thread.join(30)


def process_tweet(tweet, source_id, session, twitter_auth, twitter_query_user, logger):
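    """Parse one raw tweet, pick a processor via get_processor() and run it;
    failures are recorded as TweetLog rows rather than propagated."""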
    try:
        if not tweet.strip():
            return
        tweet_obj = json.loads(tweet)
        processor_klass = get_processor(tweet_obj)
        if not processor_klass:
            tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET'])
            session.add(tweet_log)
            return
        processor = processor_klass(json_dict=tweet_obj,
                                    json_txt=tweet,
                                    source_id=source_id,
                                    session=session,
                                    twitter_auth=twitter_auth,
                                    user_query_twitter=twitter_query_user,
                                    logger=logger)
        logger.info(processor.log_info())
        logger.debug(u"Process_tweet :" + repr(tweet))
        processor.process()

    except ValueError as e:
        message = u"Value Error %s processing tweet %s" % (repr(e), tweet)
        output = StringIO()
        try:
            traceback.print_exc(file=output)
            error_stack = output.getvalue()
        finally:
            output.close()
        tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET'], error=message, error_stack=error_stack)
        session.add(tweet_log)
        session.commit()
    except Exception as e:
        message = u"Error %s processing tweet %s" % (repr(e), tweet)
        logger.exception(message)
        output = StringIO()
        try:
            traceback.print_exc(file=output)
            error_stack = output.getvalue()
        finally:
            output.close()
        session.rollback()
        tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['ERROR'], error=message, error_stack=error_stack)
        session.add(tweet_log)
        session.commit()



class TweetProcess(BaseProcess):
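    """Consumer process: pulls (source_id, tweet) pairs off the work queue and
    hands them to process_tweet()."""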

    def __init__(self, session_maker, queue, options, twitter_auth, stop_event, logger_queue, parent_pid):
        super(TweetProcess, self).__init__(session_maker, queue, options, twitter_auth, stop_event, logger_queue, parent_pid)
        self.twitter_query_user = options.twitter_query_user


    def do_run(self):

        self.logger = set_logging_process(self.options, self.logger_queue)
        session = self.session_maker()
        try:
            while not self.stop_event.is_set() and self.parent_is_alive():
                try:
                    source_id, tweet_txt = self.queue.get(True, 3)
                    self.logger.debug("Processing source id " + repr(source_id))
                except Exception as e:
                    self.logger.debug('Process tweet exception in loop : ' + repr(e))
                    continue
                process_tweet(tweet_txt, source_id, session, self.twitter_auth, self.twitter_query_user, self.logger)
                session.commit()
        except KeyboardInterrupt:
            self.stop_event.set()
        finally:
            session.rollback()
            session.close()


def get_sessionmaker(conn_str):
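    """Create a scoped session factory, engine and metadata for conn_str."""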
    engine, metadata, Session = models.setup_database(conn_str, echo=False, create_all=False, autocommit=False)
    Session = scoped_session(Session)
    return Session, engine, metadata


def process_leftovers(session, twitter_auth, twitter_query_user, ask_process_leftovers, logger):
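    """Reprocess TweetSource rows that never got a TweetLog entry (e.g. after a
    crash), optionally asking for confirmation when there are more than 10."""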

    sources = session.query(TweetSource).outerjoin(TweetLog).filter(TweetLog.id.is_(None))
    sources_count = sources.count()

    if sources_count > 10 and ask_process_leftovers:
        resp = input("Do you want to process leftovers (Y/n)? (%d tweets to process) " % sources_count)
        if resp and resp.strip().lower() == "n":
            return
    logger.info("Process leftovers, %d tweets to process" % (sources_count))
    for src in sources:
        tweet_txt = src.original_json
        process_tweet(tweet_txt, src.id, session, twitter_auth, twitter_query_user, logger)
        session.commit()


def process_log(logger_queues, stop_event):
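    """Run in the main process: drain log records from the worker queues and
    hand them to the locally configured loggers."""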
    while not stop_event.is_set():
        for lqueue in logger_queues:
            try:
                record = lqueue.get_nowait()
                logging.getLogger(record.name).handle(record)
            except queue.Empty:
                continue
            except IOError:
                continue
        time.sleep(0.1)


def get_options():

    usage = "usage: %(prog)s [options]"

    parser = argparse.ArgumentParser(usage=usage)

    parser.add_argument("-f", "--file", dest="conn_str",
                        help="write tweet to DATABASE. This is a connection string", metavar="CONNECTION_STR", default="enmi2010_twitter.db")
    parser.add_argument("-T", "--track", dest="track",
                        help="Twitter track", metavar="TRACK")
    parser.add_argument("-k", "--key", dest="consumer_key",
                        help="Twitter consumer key", metavar="CONSUMER_KEY", required=True)
    parser.add_argument("-s", "--secret", dest="consumer_secret",
                        help="Twitter consumer secret", metavar="CONSUMER_SECRET", required=True)
    parser.add_argument("-n", "--new", dest="new", action="store_true",
                        help="new database", default=False)
    parser.add_argument("-D", "--daemon", dest="daemon", action="store_true",
                        help="launch daemon", default=False)
    parser.add_argument("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token",
                        help="Token file name")
    parser.add_argument("-d", "--duration", dest="duration",
                        help="Duration of recording in seconds", metavar="DURATION", default= -1, type=int)
    parser.add_argument("-N", "--nb-process", dest="process_nb",
                        help="number of process.\nIf 0, only the lefovers of the database are processed.\nIf 1, no postprocessing is done on the tweets.", metavar="PROCESS_NB", default=2, type=int)
    parser.add_argument("--url", dest="url",
                        help="The twitter url to connect to.", metavar="URL", default=iri_tweet.stream.FilterStream.url)
    parser.add_argument("--query-user", dest="twitter_query_user", action="store_true",
                        help="Query twitter for users", default=False)
    parser.add_argument("--timeout", dest="timeout",
                        help="timeout for connecting in seconds", default=60, metavar="TIMEOUT", type=int)
    parser.add_argument("--ask-process-leftovers", dest="ask_process_leftovers", action="store_false",
                        help="ask process leftover", default=True)


    utils.set_logging_options(parser)

    return parser.parse_args()


def do_run(options, session_maker):
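    """Top-level run loop: process leftovers, spawn the source and consumer
    processes, wait for a stop condition (duration elapsed, signal, or the
    source process dying), then shut everything down and return stop_args."""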

    stop_args = {}

    access_token_key, access_token_secret = utils.get_oauth_token(consumer_key=options.consumer_key, consumer_secret=options.consumer_secret, token_file_path=options.token_filename)
    twitter_auth = twitter.OAuth(access_token_key, access_token_secret, options.consumer_key, options.consumer_secret)

    session = session_maker()
    try:
        process_leftovers(session, twitter_auth, options.twitter_query_user, options.ask_process_leftovers, utils.get_logger())
        session.commit()
    finally:
        session.rollback()
        session.close()

    if options.process_nb <= 0:
        utils.get_logger().debug("Leftovers processed. Exiting.")
        return None

    tweet_queue = mQueue()
    stop_event = Event()

    # workaround for a bug when using urllib together with multiprocessing:
    # issue one request to a throwaway local HTTP server before forking the workers
    httpd = HTTPServer(('127.0.0.1', 0), RequestHandler)
    _thread.start_new_thread(httpd.handle_request, ())

    req = urllib.request.Request('http://localhost:%d' % httpd.server_port)
    conn = None
    try:
        conn = urllib.request.urlopen(req)
    except Exception:
        utils.get_logger().debug("could not open localhost")
        # do nothing
    finally:
        if conn is not None:
            conn.close()

    process_engines = []
    logger_queues = []

    SessionProcess, engine_process, _ = get_sessionmaker(conn_str)
    process_engines.append(engine_process)
    lqueue = mQueue(50)
    logger_queues.append(lqueue)
    pid = os.getpid()
    sprocess = SourceProcess(SessionProcess, tweet_queue, options, twitter_auth, stop_event, lqueue, pid)

    tweet_processes = []

    for i in range(options.process_nb - 1):
        SessionProcess, engine_process, _ = get_sessionmaker(conn_str)
        process_engines.append(engine_process)
        lqueue = mQueue(50)
        logger_queues.append(lqueue)
        cprocess = TweetProcess(SessionProcess, tweet_queue, options, twitter_auth, stop_event, lqueue, pid)
        tweet_processes.append(cprocess)

    log_thread = threading.Thread(target=process_log, name="loggingThread", args=(logger_queues, stop_event,))
    log_thread.daemon = True

    log_thread.start()

    sprocess.start()
    for cprocess in tweet_processes:
        cprocess.start()

    add_process_event("pid", {'main':os.getpid(), 'source':(sprocess.name, sprocess.pid), 'consumers':dict([(p.name, p.pid) for p in tweet_processes])}, session_maker)

    if options.duration >= 0:
        end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration)

    def interrupt_handler(signum, frame):
        utils.get_logger().debug("shutdown asked " + repr(signum) + "  " + repr(inspect.getframeinfo(frame, 9)))
        stop_args.update({'message': 'interrupt', 'signum': signum, 'frameinfo': inspect.getframeinfo(frame, 9)})
        stop_event.set()

    signal.signal(signal.SIGINT, interrupt_handler)
    signal.signal(signal.SIGHUP, interrupt_handler)
    signal.signal(signal.SIGALRM, interrupt_handler)
    signal.signal(signal.SIGTERM, interrupt_handler)


    while not stop_event.is_set():
        if options.duration >= 0 and datetime.datetime.utcnow() >= end_ts:
            stop_args.update({'message': 'duration', 'duration' : options.duration, 'end_ts' : end_ts})
            stop_event.set()
            break
        if sprocess.is_alive():
            utils.get_logger().debug("Source process alive")
            time.sleep(1)
        else:
            stop_args.update({'message': 'Source process killed'})
            stop_event.set()
            break
    utils.get_logger().debug("Joining Source Process")
    try:
        sprocess.join(10)
    except Exception:
        utils.get_logger().debug("Pb joining Source Process - terminating")
    finally:
        sprocess.terminate()

    for i, cprocess in enumerate(tweet_processes):
        utils.get_logger().debug("Joining consumer process Nb %d" % (i + 1))
        try:
            cprocess.join(3)
        except Exception:
            utils.get_logger().debug("Pb joining consumer process Nb %d - terminating" % (i + 1))
            cprocess.terminate()


    utils.get_logger().debug("Close queues")
    try:
        tweet_queue.close()
        for lqueue in logger_queues:
            lqueue.close()
    except Exception as e:
        utils.get_logger().error("error when closing queues %s", repr(e))
        # do nothing


    if options.process_nb > 1:
        utils.get_logger().debug("Processing leftovers")
        session = session_maker()
        try:
            process_leftovers(session, twitter_auth, options.twitter_query_user, options.ask_process_leftovers, utils.get_logger())
            session.commit()
        finally:
            session.rollback()
            session.close()

    for pengine in process_engines:
        pengine.dispose()

    return stop_args


def main(options):
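    """Entry point: normalize the connection string, rotate/initialize the
    database when --new is given, then run do_run() with start/shutdown
    ProcessEvent bookkeeping."""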

    global conn_str

    conn_str = options.conn_str.strip()
    if not re.match(r"^\w+://.+", conn_str):
        conn_str = 'sqlite:///' + options.conn_str

    if conn_str.startswith("sqlite") and options.new:
        filepath = conn_str[conn_str.find(":///") + 4:]
        if os.path.exists(filepath):
            i = 1
            basename, extension = os.path.splitext(filepath)
            new_path = '%s.%d%s' % (basename, i, extension)
            while i < 1000000 and os.path.exists(new_path):
                i += 1
                new_path = '%s.%d%s' % (basename, i, extension)
            if i >= 1000000:
                raise Exception("Unable to find new filename for " + filepath)
            else:
                shutil.move(filepath, new_path)

    Session, engine, metadata = get_sessionmaker(conn_str)

    if options.new:
        check_metadata = sqlalchemy.schema.MetaData(bind=engine)
        check_metadata.reflect()
        if len(check_metadata.sorted_tables) > 0:
            message = "Database %s not empty exiting" % conn_str
            utils.get_logger().error(message)
            sys.exit(message)

    metadata.create_all(engine)
    session = Session()
    try:
        models.add_model_version(session)
    finally:
        session.close()

    stop_args = {}
    try:
        add_process_event(event_type="start", args={'options':options.__dict__, 'args': [], 'command_line': sys.argv}, session_maker=Session)
        stop_args = do_run(options, Session)
    except Exception as e:
        utils.get_logger().exception("Error in main thread")
        outfile = StringIO()
        try:
            traceback.print_exc(file=outfile)
            stop_args = {'error': repr(e), 'message': repr(e), 'stacktrace':outfile.getvalue()}
        finally:
            outfile.close()
        raise
    finally:
        add_process_event(event_type="shutdown", args=stop_args, session_maker=Session)

    utils.get_logger().debug("Done. Exiting. " + repr(stop_args))



if __name__ == '__main__':

    options = get_options()

    loggers = set_logging(options)

    utils.get_logger().debug("OPTIONS : " + repr(options))

    if options.daemon:
        options.ask_process_leftovers = False
        import daemon

        hdlr_preserve = []
        for logger in loggers:
            hdlr_preserve.extend([h.stream for h in logger.handlers])

        context = daemon.DaemonContext(working_directory=os.getcwd(), files_preserve=hdlr_preserve)
        with context:
            main(options)
    else:
        main(options)