script/stream/recorder_stream.py
changeset 893 10a19dd4e1c9
parent 528 7fb5a7b0d35c
parent 890 9c57883dbb9d
child 919 e126d3e1e186
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/script/stream/recorder_stream.py	Fri May 10 13:34:40 2013 +0200
@@ -0,0 +1,603 @@
+from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
+from iri_tweet import models, utils
+from iri_tweet.models import TweetSource, TweetLog, ProcessEvent
+from iri_tweet.processor import get_processor
+from multiprocessing import Queue as mQueue, Process, Event
+from sqlalchemy.exc import OperationalError
+from sqlalchemy.orm import scoped_session
+import Queue
+import StringIO
+import anyjson
+import argparse
+import datetime
+import inspect
+import iri_tweet.stream
+import logging
+import os
+import re
+import requests_oauthlib
+import shutil
+import signal
+import socket
+import sqlalchemy.schema
+import sys
+import thread
+import threading
+import time
+import traceback
+import urllib2
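+# Unbuffered reads on socket file objects (a private-API tweak, presumably so
+# the long-lived streaming connection yields data as soon as it arrives).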
+socket._fileobject.default_bufsize = 0
+
+
+
+# columns_tweet = [u'favorited', u'truncated', u'text', u'created_at', u'source', u'in_reply_to_status_id', u'in_reply_to_screen_name', u'in_reply_to_user_id', u'geo', u'id', u'user']
+columns_tweet = [u'user', u'favorited', u'contributors', u'truncated', u'text', u'created_at', u'retweeted', u'in_reply_to_status_id_str', u'coordinates', u'in_reply_to_user_id_str', u'entities', u'in_reply_to_status_id', u'place', u'in_reply_to_user_id', u'id', u'in_reply_to_screen_name', u'retweet_count', u'geo', u'id_str', u'source']
+# columns_user = [u'id', u'verified', u'profile_sidebar_fill_color', u'profile_text_color', u'followers_count', u'protected', u'location', u'profile_background_color', u'utc_offset', u'statuses_count', u'description', u'friends_count', u'profile_link_color', u'profile_image_url', u'notifications', u'geo_enabled', u'profile_background_image_url', u'screen_name', u'profile_background_tile', u'favourites_count', u'name', u'url', u'created_at', u'time_zone', u'profile_sidebar_border_color', u'following']
+columns_user = [u'follow_request_sent', u'profile_use_background_image', u'id', u'verified', u'profile_sidebar_fill_color', u'profile_text_color', u'followers_count', u'protected', u'location', u'profile_background_color', u'id_str', u'utc_offset', u'statuses_count', u'description', u'friends_count', u'profile_link_color', u'profile_image_url', u'notifications', u'show_all_inline_media', u'geo_enabled', u'profile_background_image_url', u'name', u'lang', u'following', u'profile_background_tile', u'favourites_count', u'screen_name', u'url', u'created_at', u'contributors_enabled', u'time_zone', u'profile_sidebar_border_color', u'is_translator', u'listed_count']
+# just put it in a sqlite3 table
+
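+# Seconds to wait between checks of the stop event in the worker loops.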
+DEFAULT_TIMEOUT = 3
+
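+# Minimal no-op HTTP handler backing the urllib2/multiprocessing workaround in
+# do_run(): it answers a single local GET with a 200 and suppresses logging.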
+class Requesthandler(BaseHTTPRequestHandler):
+
+    def __init__(self, request, client_address, server):
+        BaseHTTPRequestHandler.__init__(self, request, client_address, server)
+        
+    def do_GET(self):
+        self.send_response(200)
+        self.end_headers()
+    
+    def log_message(self, format, *args):        # @ReservedAssignment
+        pass
+
+
+def set_logging(options):
+    loggers = []
+    
+    loggers.append(utils.set_logging(options, logging.getLogger('iri.tweet')))
+    loggers.append(utils.set_logging(options, logging.getLogger('multiprocessing')))
+    if options.debug >= 2:
+        loggers.append(utils.set_logging(options, logging.getLogger('sqlalchemy.engine')))
+    # utils.set_logging(options, logging.getLogger('sqlalchemy.dialects'))
+    # utils.set_logging(options, logging.getLogger('sqlalchemy.pool'))
+    # utils.set_logging(options, logging.getLogger('sqlalchemy.orm'))
+    return loggers
+
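+# Worker processes cannot share the main process's log handlers, so they log
+# through a multiprocessing queue that process_log() drains in the main process.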
+def set_logging_process(options, queue):
+    qlogger = utils.set_logging(options, logging.getLogger('iri.tweet.p'), queue)
+    qlogger.propagate = False
+    return qlogger
+
+def get_auth(options, access_token):
+    consumer_key = options.consumer_key
+    consumer_secret = options.consumer_secret
+    auth = requests_oauthlib.OAuth1(client_key=consumer_key, client_secret=consumer_secret, resource_owner_key=access_token[0], resource_owner_secret=access_token[1], signature_type='auth_header')
+    return auth
+
+
+def add_process_event(event_type, args, session_maker):
+    session = session_maker()
+    try:
+        evt = ProcessEvent(args=None if args is None else anyjson.serialize(args), type=event_type)
+        session.add(evt)
+        session.commit()
+    finally:
+        session.close()
+
+
+class BaseProcess(Process):
+
+    def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
+        self.parent_pid = parent_pid
+        self.session_maker = session_maker
+        self.queue = queue
+        self.options = options
+        self.logger_queue = logger_queue
+        self.stop_event = stop_event
+        self.consumer_token = (options.consumer_key, options.consumer_secret)
+        self.access_token = access_token
+
+        super(BaseProcess, self).__init__()
+
+    #
+    # from http://stackoverflow.com/questions/2542610/python-daemon-doesnt-kill-its-kids
+    #
+    def parent_is_alive(self):
+        try:
+            # try to call Parent
+            os.kill(self.parent_pid, 0)
+        except OSError:
+            # *beeep* oh no! The phone's disconnected!
+            return False
+        else:
+            # *ring* Hi mom!
+            return True
+    
+
+    def __get_process_event_args(self):
+        return {'name':self.name, 'pid':self.pid, 'parent_pid':self.parent_pid, 'options':self.options.__dict__, 'access_token':self.access_token}
+
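+    # Bracket the actual work in do_run() with start/stop ProcessEvent records.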
+    def run(self):
+        try:
+            add_process_event("start_worker", self.__get_process_event_args(), self.session_maker)
+            self.do_run()
+        finally:
+            add_process_event("stop_worker", self.__get_process_event_args(), self.session_maker)
+        
+    def do_run(self):
+        raise NotImplementedError()
+
+
+
+class SourceProcess(BaseProcess):
+    
+    def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
+        self.track = options.track
+        self.token_filename = options.token_filename
+        self.timeout = options.timeout
+        self.stream = None
+        super(SourceProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid)
+                    
+    def __source_stream_iter(self):
+                
+        self.logger.debug("SourceProcess : run ")
+        
+        self.logger.debug("SourceProcess : get_auth with options %s and token %s" % (self.options, self.access_token))
+        self.auth = get_auth(self.options, self.access_token) 
+        self.logger.debug("SourceProcess : auth set ")
+        
+        track_list = self.track  # or raw_input('Keywords to track (comma separated): ').strip()
+        self.logger.debug("SourceProcess : track list " + track_list)
+        
+        track_list = [k.strip() for k in track_list.split(',')]
+
+        self.logger.debug("SourceProcess : before connecting to stream %s, url : %s, auth : %s" % (repr(track_list), self.options.url, repr(self.auth)))                        
+        self.stream = iri_tweet.stream.FilterStream(self.auth, track=track_list, raw=True, url=self.options.url, timeout=self.timeout, chunk_size=512, logger=self.logger)
+        self.logger.debug("SourceProcess : after connecting to stream")
+        self.stream.muststop = lambda: self.stop_event.is_set()        
+        
+        stream_wrapper = iri_tweet.stream.SafeStreamWrapper(self.stream, logger=self.logger)
+        
+        session = self.session_maker()
+        
+        #import pydevd
+        #pydevd.settrace(suspend=False)
+
+        
+        try:
+            for tweet in stream_wrapper:
+                if not self.parent_is_alive():
+                    self.stop_event.set()
+                    sys.exit()
+                self.logger.debug("SourceProcess : tweet " + repr(tweet))
+                source = TweetSource(original_json=tweet)
+                self.logger.debug("SourceProcess : source created")
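+                # sqlite raises OperationalError when the database is locked by
+                # another process; retry the insert a few times before giving up.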
+                add_retries = 0
+                while add_retries < 10:
+                    try:
+                        add_retries += 1
+                        session.add(source)
+                        session.flush()
+                        break
+                    except OperationalError as e:
+                        session.rollback()
+                        self.logger.debug("SourceProcess : Operational Error %s nb %d" % (repr(e), add_retries))
+                        if add_retries == 10:
+                            raise
+                     
+                source_id = source.id
+                self.logger.debug("SourceProcess : before queue + source id " + repr(source_id))
+                self.logger.info("SourceProcess : Tweet count: %d - current rate : %.2f - running : %ds" % (self.stream.count, self.stream.rate, int(time.time() - self.stream.starttime)))
+                session.commit()
+                self.queue.put((source_id, tweet), False)
+
+        except Exception as e:
+            self.logger.error("SourceProcess : Error when processing tweet " + repr(e))
+            raise
+        finally:
+            session.rollback()
+            session.close()
+            self.stream.close()
+            self.stream = None
+            if not self.stop_event.is_set():
+                self.stop_event.set()
+
+
+    def do_run(self):
+        
+        self.logger = set_logging_process(self.options, self.logger_queue)                
+        
+        source_stream_iter_thread = threading.Thread(target=self.__source_stream_iter , name="SourceStreamIterThread")
+        
+        source_stream_iter_thread.start()
+        
+        try:
+            while not self.stop_event.is_set():
+                self.logger.debug("SourceProcess : In while after start")
+                self.stop_event.wait(DEFAULT_TIMEOUT)
+        except KeyboardInterrupt:
+            self.stop_event.set()
+
+        if self.stop_event.is_set() and self.stream:
+            self.stream.close()
+        elif not self.stop_event.is_set() and not source_stream_iter_thread.is_alive():
+            self.stop_event.set()
+
+        self.queue.cancel_join_thread()
+        self.logger_queue.cancel_join_thread()
+        self.logger.info("SourceProcess : join")
+        source_stream_iter_thread.join(30)
+
+
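+# Deserialize one raw tweet, pick a processor for its payload type and run it;
+# failures are recorded as TweetLog rows so the source is not reprocessed.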
+def process_tweet(tweet, source_id, session, consumer_token, access_token, twitter_query_user, token_filename, logger):
+    try:
+        if not tweet.strip():
+            return
+        tweet_obj = anyjson.deserialize(tweet)
+        processor_klass = get_processor(tweet_obj)
+        if not processor_klass:
+            tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET'])
+            session.add(tweet_log)
+            return
+        processor = processor_klass(json_dict=tweet_obj,
+                                    json_txt=tweet,
+                                    source_id=source_id,
+                                    session=session,
+                                    consumer_token=consumer_token,
+                                    access_token=access_token,
+                                    token_filename=token_filename,
+                                    user_query_twitter=twitter_query_user,
+                                    logger=logger)
+        logger.info(processor.log_info())                        
+        logger.debug(u"Process_tweet :" + repr(tweet))                
+        processor.process()
+        
+    except ValueError as e:
+        message = u"Value Error %s processing tweet %s" % (repr(e), tweet)
+        output = StringIO.StringIO()
+        try:
+            traceback.print_exc(file=output)
+            error_stack = output.getvalue()
+        finally:
+            output.close()
+        tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET'], error=message, error_stack=error_stack)
+        session.add(tweet_log)
+        session.commit()        
+    except Exception as e:
+        message = u"Error %s processing tweet %s" % (repr(e), tweet)
+        logger.exception(message)
+        output = StringIO.StringIO()
+        try:
+            traceback.print_exc(file=output)
+            error_stack = output.getvalue()
+        finally:
+            output.close()
+        session.rollback()
+        tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['ERROR'], error=message, error_stack=error_stack)
+        session.add(tweet_log)
+        session.commit()
+
+    
+        
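+# Consumer process: pops (source_id, tweet) pairs off the shared queue and
+# runs process_tweet() on each until the stop event is set.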
+class TweetProcess(BaseProcess):
+    
+    def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
+        super(TweetProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid)
+        self.twitter_query_user = options.twitter_query_user
+
+
+    def do_run(self):
+        
+        self.logger = set_logging_process(self.options, self.logger_queue)
+        session = self.session_maker()
+        try:
+            while not self.stop_event.is_set() and self.parent_is_alive():
+                try:
+                    source_id, tweet_txt = self.queue.get(True, 3)
+                    self.logger.debug("Processing source id " + repr(source_id))
+                except Exception as e:
+                    self.logger.debug('Process tweet exception in loop : ' + repr(e))
+                    continue
+                process_tweet(tweet_txt, source_id, session, self.consumer_token, self.access_token, self.twitter_query_user, self.options.token_filename, self.logger)
+                session.commit()
+        except KeyboardInterrupt:
+            self.stop_event.set()
+        finally:
+            session.rollback()
+            session.close()
+
+
+def get_sessionmaker(conn_str):
+    engine, metadata, Session = models.setup_database(conn_str, echo=False, create_all=False, autocommit=False)
+    Session = scoped_session(Session)
+    return Session, engine, metadata
+
+            
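+# Leftovers are TweetSource rows without any TweetLog, i.e. raw tweets saved
+# by the source process but never handled by a consumer.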
+def process_leftovers(session, consumer_token, access_token, twitter_query_user, token_filename, ask_process_leftovers, logger):
+    
+    sources = session.query(TweetSource).outerjoin(TweetLog).filter(TweetLog.id == None)
+    sources_count = sources.count()
+    
+    if sources_count > 10 and ask_process_leftovers:
+        resp = raw_input("Do you want to process leftovers (Y/n) ? (%d tweets to process)" % sources_count)
+        if resp and resp.strip().lower() == "n":
+            return
+    logger.info("Process leftovers, %d tweets to process" % (sources_count))
+    for src in sources:
+        tweet_txt = src.original_json
+        process_tweet(tweet_txt, src.id, session, consumer_token, access_token, twitter_query_user, token_filename, logger)
+        session.commit()
+        
+    
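+# Drain the per-worker logging queues and dispatch each record to the
+# matching logger of the main process's logging configuration.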
+def process_log(logger_queues, stop_event):
+    while not stop_event.is_set():
+        for lqueue in logger_queues:
+            try:
+                record = lqueue.get_nowait()
+                logging.getLogger(record.name).handle(record)
+            except Queue.Empty:
+                continue
+            except IOError:
+                continue
+        time.sleep(0.1)
+
+        
+def get_options():
+
+    usage = "usage: %(prog)s [options]"
+
+    parser = argparse.ArgumentParser(usage=usage)
+
+    parser.add_argument("-f", "--file", dest="conn_str",
+                        help="write tweet to DATABASE. This is a connection string", metavar="CONNECTION_STR", default="enmi2010_twitter.db")
+    parser.add_argument("-T", "--track", dest="track",
+                        help="Twitter track", metavar="TRACK")
+    parser.add_argument("-k", "--key", dest="consumer_key",
+                        help="Twitter consumer key", metavar="CONSUMER_KEY", required=True)
+    parser.add_argument("-s", "--secret", dest="consumer_secret",
+                        help="Twitter consumer secret", metavar="CONSUMER_SECRET", required=True)
+    parser.add_argument("-n", "--new", dest="new", action="store_true",
+                        help="new database", default=False)
+    parser.add_argument("-D", "--daemon", dest="daemon", action="store_true",
+                        help="launch daemon", default=False)
+    parser.add_argument("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token",
+                        help="Token file name")
+    parser.add_argument("-d", "--duration", dest="duration",
+                        help="Duration of recording in seconds", metavar="DURATION", default= -1, type=int)
+    parser.add_argument("-N", "--nb-process", dest="process_nb",
+                        help="number of processes.\nIf 0, only the leftovers of the database are processed.\nIf 1, no postprocessing is done on the tweets.", metavar="PROCESS_NB", default=2, type=int)
+    parser.add_argument("--url", dest="url",
+                        help="The twitter url to connect to.", metavar="URL", default=iri_tweet.stream.FilterStream.url)
+    parser.add_argument("--query-user", dest="twitter_query_user", action="store_true",
+                        help="Query twitter for users", default=False)
+    parser.add_argument("--timeout", dest="timeout",
+                        help="timeout for connecting in seconds", default=60, metavar="TIMEOUT", type=int)
+    parser.add_argument("--ask-process-leftovers", dest="ask_process_leftovers", action="store_false",
+                        help="ask process leftover", default=True)
+
+
+    utils.set_logging_options(parser)
+
+    return parser.parse_args()
+
+
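+# Core run loop: process leftovers from a previous run, then start one
+# SourceProcess feeding raw tweets through a queue to (process_nb - 1)
+# TweetProcess consumers, and watch them until a stop condition is met.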
+def do_run(options, session_maker):
+
+    stop_args = {}
+    
+    consumer_token = (options.consumer_key, options.consumer_secret)
+    access_token = utils.get_oauth_token(consumer_key=consumer_token[0], consumer_secret=consumer_token[1], token_file_path=options.token_filename)
+    
+    
+    session = session_maker()
+    try:
+        process_leftovers(session, consumer_token, access_token, options.twitter_query_user, options.token_filename, options.ask_process_leftovers, utils.get_logger())
+        session.commit()
+    finally:
+        session.rollback()
+        session.close()
+    
+    if options.process_nb <= 0:
+        utils.get_logger().debug("Leftovers processed. Exiting.")
+        return None
+
+    queue = mQueue()
+    stop_event = Event()
+    
+    # workaround for a bug when using urllib2 together with multiprocessing:
+    # issue one request against a throwaway local HTTP server (presumably to
+    # initialize urllib2's global state in the parent before the workers fork)
+    httpd = HTTPServer(('127.0.0.1',0), Requesthandler)
+    thread.start_new_thread(httpd.handle_request, ())
+    
+    req = urllib2.Request('http://localhost:%d' % httpd.server_port)
+    conn = None
+    try:
+        conn = urllib2.urlopen(req)
+    except Exception:
+        utils.get_logger().debug("could not open localhost")
+        # do nothing
+    finally:
+        if conn is not None:
+            conn.close()
+    
+    process_engines = []
+    logger_queues = []
+    
+    SessionProcess, engine_process, _ = get_sessionmaker(conn_str)
+    process_engines.append(engine_process)
+    lqueue = mQueue(50)
+    logger_queues.append(lqueue)
+    pid = os.getpid()
+    sprocess = SourceProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)    
+    
+    tweet_processes = []
+    
+    for i in range(options.process_nb - 1):
+        SessionProcess, engine_process, _ = get_sessionmaker(conn_str)
+        process_engines.append(engine_process)
+        lqueue = mQueue(50)
+        logger_queues.append(lqueue)
+        cprocess = TweetProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)
+        tweet_processes.append(cprocess)
+
+    log_thread = threading.Thread(target=process_log, name="loggingThread", args=(logger_queues, stop_event,))
+    log_thread.daemon = True
+
+    log_thread.start()
+
+    sprocess.start()
+    for cprocess in tweet_processes:
+        cprocess.start()
+
+    add_process_event("pid", {'main':os.getpid(), 'source':(sprocess.name, sprocess.pid), 'consumers':dict([(p.name, p.pid) for p in tweet_processes])}, session_maker)
+
+    if options.duration >= 0:
+        end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration)    
+
+    def interrupt_handler(signum, frame):
+        utils.get_logger().debug("shutdown asked " + repr(signum) + "  " + repr(inspect.getframeinfo(frame, 9)))
+        stop_args.update({'message': 'interrupt', 'signum': signum, 'frameinfo': inspect.getframeinfo(frame, 9)})
+        stop_event.set()
+
+    signal.signal(signal.SIGINT, interrupt_handler)
+    signal.signal(signal.SIGHUP, interrupt_handler)
+    signal.signal(signal.SIGALRM, interrupt_handler)
+    signal.signal(signal.SIGTERM, interrupt_handler)
+    
+
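+    # Watchdog loop: stop when the requested duration has elapsed or when the
+    # source process dies.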
+    while not stop_event.is_set():
+        if options.duration >= 0 and datetime.datetime.utcnow() >= end_ts:
+            stop_args.update({'message': 'duration', 'duration' : options.duration, 'end_ts' : end_ts})
+            stop_event.set()
+            break
+        if sprocess.is_alive():
+            utils.get_logger().debug("Source process alive")
+            time.sleep(1)
+        else:
+            stop_args.update({'message': 'Source process killed'})
+            stop_event.set()
+            break
+    utils.get_logger().debug("Joining Source Process")
+    try:
+        sprocess.join(10)
+    except Exception:
+        utils.get_logger().debug("Problem joining Source Process - terminating")
+    finally:
+        sprocess.terminate()
+        
+    for i, cprocess in enumerate(tweet_processes):
+        utils.get_logger().debug("Joining consumer process #%d" % (i + 1))
+        try:
+            cprocess.join(3)
+        except Exception:
+            utils.get_logger().debug("Problem joining consumer process #%d - terminating" % (i + 1))
+            cprocess.terminate()
+
+    
+    utils.get_logger().debug("Close queues")
+    try:
+        queue.close()
+        for lqueue in logger_queues:
+            lqueue.close()
+    except Exception as e:
+        utils.get_logger().error("error when closing queues %s", repr(e))
+        # do nothing
+        
+    
+    if options.process_nb > 1:
+        utils.get_logger().debug("Processing leftovers")
+        session = session_maker()
+        try:
+            process_leftovers(session, consumer_token, access_token, options.twitter_query_user, options.token_filename, options.ask_process_leftovers, utils.get_logger())
+            session.commit()
+        finally:
+            session.rollback()
+            session.close()
+
+    for pengine in process_engines:
+        pengine.dispose()
+    
+    return stop_args
+
+
+def main(options):
+    
+    global conn_str
+    
+    conn_str = options.conn_str.strip()
+    if not re.match(r"^\w+://.+", conn_str):
+        conn_str = 'sqlite:///' + options.conn_str
+        
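+    # With --new on a sqlite database, move any existing file aside to
+    # "name.N.ext" instead of overwriting it.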
+    if conn_str.startswith("sqlite") and options.new:
+        filepath = conn_str[conn_str.find(":///") + 4:]
+        if os.path.exists(filepath):
+            i = 1
+            basename, extension = os.path.splitext(filepath)
+            new_path = '%s.%d%s' % (basename, i, extension)
+            while i < 1000000 and os.path.exists(new_path):
+                i += 1
+                new_path = '%s.%d%s' % (basename, i, extension)
+            if i >= 1000000:
+                raise Exception("Unable to find new filename for " + filepath)
+            else:
+                shutil.move(filepath, new_path)
+
+    Session, engine, metadata = get_sessionmaker(conn_str)
+    
+    if options.new:
+        check_metadata = sqlalchemy.schema.MetaData(bind=engine)
+        check_metadata.reflect()
+        if len(check_metadata.sorted_tables) > 0:
+            message = "Database %s not empty, exiting" % conn_str
+            utils.get_logger().error(message)
+            sys.exit(message)
+    
+    metadata.create_all(engine)
+    session = Session()
+    try:
+        models.add_model_version(session)
+    finally:
+        session.close()
+    
+    stop_args = {}
+    try:
+        add_process_event(event_type="start", args={'options':options.__dict__, 'args': [], 'command_line': sys.argv}, session_maker=Session)
+        stop_args = do_run(options, Session)
+    except Exception as e:
+        utils.get_logger().exception("Error in main thread")        
+        outfile = StringIO.StringIO()
+        try:
+            traceback.print_exc(file=outfile)
+            stop_args = {'error': repr(e), 'message': getattr(e, 'message', ''), 'stacktrace':outfile.getvalue()}
+        finally:
+            outfile.close()
+        raise
+    finally:    
+        add_process_event(event_type="shutdown", args=stop_args, session_maker=Session)
+
+    utils.get_logger().debug("Done. Exiting. " + repr(stop_args))
+
+
+
+if __name__ == '__main__':
+
+    options = get_options()
+    
+    loggers = set_logging(options)
+    
+    utils.get_logger().debug("OPTIONS : " + repr(options))
+    
+    if options.daemon:
+        options.ask_process_leftovers = False
+        import daemon
+        
+        hdlr_preserve = []
+        for logger in loggers:
+            hdlr_preserve.extend([h.stream for h in logger.handlers])
+            
+        context = daemon.DaemonContext(working_directory=os.getcwd(), files_preserve=hdlr_preserve) 
+        with context:
+            main(options)
+    else:
+        main(options)
+