script/stream/recorder_tweetstream.py
changeset 893 10a19dd4e1c9
parent 877 41ce1c341abe
parent 891 8628c590f608
child 894 cba4554e9c03
--- a/script/stream/recorder_tweetstream.py	Tue May 07 18:28:26 2013 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,528 +0,0 @@
-from getpass import getpass
-from iri_tweet import models, utils
-from iri_tweet.models import TweetSource, TweetLog, ProcessEvent
-from multiprocessing import (Queue as mQueue, JoinableQueue, Process, Event, 
-    get_logger)
-from optparse import OptionParser
-from sqlalchemy.exc import OperationalError
-from sqlalchemy.orm import scoped_session
-import Queue
-import StringIO
-import anyjson
-import datetime
-import inspect
-import logging
-import os
-import re
-import shutil
-import signal
-import socket
-import sqlalchemy.schema
-import sys
-import threading
-import time
-import traceback
-import tweepy.auth
-import tweetstream
-import urllib2
-socket._fileobject.default_bufsize = 0
-
-
-
-#columns_tweet = [u'favorited', u'truncated', u'text', u'created_at', u'source', u'in_reply_to_status_id', u'in_reply_to_screen_name', u'in_reply_to_user_id', u'geo', u'id', u'user']
-columns_tweet = [u'user', u'favorited', u'contributors', u'truncated', u'text', u'created_at', u'retweeted', u'in_reply_to_status_id_str', u'coordinates', u'in_reply_to_user_id_str', u'entities', u'in_reply_to_status_id', u'place', u'in_reply_to_user_id', u'id', u'in_reply_to_screen_name', u'retweet_count', u'geo', u'id_str', u'source']
-#columns_user = [u'id', u'verified', u'profile_sidebar_fill_color', u'profile_text_color', u'followers_count', u'protected', u'location', u'profile_background_color', u'utc_offset', u'statuses_count', u'description', u'friends_count', u'profile_link_color', u'profile_image_url', u'notifications', u'geo_enabled', u'profile_background_image_url', u'screen_name', u'profile_background_tile', u'favourites_count', u'name', u'url', u'created_at', u'time_zone', u'profile_sidebar_border_color', u'following']
-columns_user = [u'follow_request_sent', u'profile_use_background_image', u'id', u'verified', u'profile_sidebar_fill_color', u'profile_text_color', u'followers_count', u'protected', u'location', u'profile_background_color', u'id_str', u'utc_offset', u'statuses_count', u'description', u'friends_count', u'profile_link_color', u'profile_image_url', u'notifications', u'show_all_inline_media', u'geo_enabled', u'profile_background_image_url', u'name', u'lang', u'following', u'profile_background_tile', u'favourites_count', u'screen_name', u'url', u'created_at', u'contributors_enabled', u'time_zone', u'profile_sidebar_border_color', u'is_translator', u'listed_count']
-# just put it in a sqlite3 table
-
-
-def set_logging(options):
-    # Configure the main-process loggers and return them so that the
-    # daemon context (see the __main__ block) can preserve their stream
-    # handlers across the fork. SQLAlchemy engine logging only at -dd.
-    loggers = []
-    
-    loggers.append(utils.set_logging(options, logging.getLogger('iri.tweet')))
-    loggers.append(utils.set_logging(options, logging.getLogger('multiprocessing')))
-    if options.debug >= 2:
-        loggers.append(utils.set_logging(options, logging.getLogger('sqlalchemy.engine')))
-    #utils.set_logging(options, logging.getLogger('sqlalchemy.dialects'))
-    #utils.set_logging(options, logging.getLogger('sqlalchemy.pool'))
-    #utils.set_logging(options, logging.getLogger('sqlalchemy.orm'))
-    return loggers
-
-def set_logging_process(options, queue):
-    # Configure a queue-backed logger for a child process; the records are
-    # drained by process_log() running in the main process. propagate=0
-    # keeps records from also reaching ancestor handlers.
-    qlogger = utils.set_logging(options, logging.getLogger('iri.tweet.p'), queue)
-    qlogger.propagate = 0
-    return qlogger
-
-def get_auth(options, access_token):
-    # Prefer basic auth when both username and password were given on the
-    # command line; otherwise fall back to OAuth using the consumer keys
-    # from models and the stored (key, secret) access token pair.
-    if options.username and options.password:
-        auth = tweepy.auth.BasicAuthHandler(options.username, options.password)        
-    else:
-        consumer_key = models.CONSUMER_KEY
-        consumer_secret = models.CONSUMER_SECRET
-        auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=False)
-        auth.set_access_token(*access_token)
-    return auth
-
-
-def add_process_event(type, args, session_maker):
-    # Persist one ProcessEvent row (args serialized to JSON, or NULL) in
-    # its own short-lived session.
-    # NOTE(review): the parameter 'type' shadows the builtin of the same
-    # name (left unchanged here).
-    session = session_maker()
-    try:
-        evt = ProcessEvent(args=None if args is None else anyjson.serialize(args), type=type)
-        session.add(evt)
-        session.commit()
-    finally:
-        session.close()
-
-
-class BaseProcess(Process):
-    """Common base for the worker processes.
-
-    Holds the shared resources (session factory, tweet queue, stop event,
-    logger queue, OAuth access token, parent pid) and wraps the subclass's
-    do_run() with "start_worker"/"stop_worker" ProcessEvent records.
-    """
-
-    def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
-        self.parent_pid = parent_pid
-        self.session_maker = session_maker
-        self.queue = queue
-        self.options = options
-        self.logger_queue = logger_queue
-        self.stop_event = stop_event
-        self.access_token = access_token
-
-        super(BaseProcess, self).__init__()
-
-    #
-    # from http://stackoverflow.com/questions/2542610/python-daemon-doesnt-kill-its-kids
-    #
-    def parent_is_alive(self):
-        # Signal 0 does error checking only: os.kill raises OSError when
-        # the parent pid no longer exists.
-        try:
-            # try to call Parent
-            os.kill(self.parent_pid, 0)
-        except OSError:
-            # *beeep* oh no! The phone's disconnected!
-            return False
-        else:
-            # *ring* Hi mom!
-            return True
-    
-
-    def __get_process_event_args(self):
-        # Snapshot of this process's identity and configuration, recorded
-        # with each start/stop event.
-        return {'name':self.name, 'pid':self.pid, 'parent_pid':self.parent_pid, 'options':self.options.__dict__, 'access_token':self.access_token}
-
-    def run(self):
-        # Record worker start/stop events around the actual work loop.
-        try:
-            add_process_event("start_worker", self.__get_process_event_args(), self.session_maker)
-            self.do_run()
-        finally:
-            add_process_event("stop_worker", self.__get_process_event_args(), self.session_maker)
-        
-    def do_run(self):
-        # Subclasses implement the actual work here.
-        raise NotImplementedError()
-
-
-
-class SourceProcess(BaseProcess):
-    """Producer: read the Twitter filter stream and store each raw tweet.
-
-    Each raw JSON payload is saved as a TweetSource row; the pair
-    (source_id, raw_json) is then pushed on the queue for the
-    TweetProcess consumers.
-    """
-    
-    def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
-        self.track = options.track
-        self.token_filename = options.token_filename
-        self.catchup = options.catchup
-        self.timeout = options.timeout
-        super(SourceProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid)
-
-    def do_run(self):
-        
-        #import pydevd
-        #pydevd.settrace(suspend=True)
-
-        self.logger = set_logging_process(self.options, self.logger_queue)
-        self.auth = get_auth(self.options, self.access_token) 
-        
-        self.logger.debug("SourceProcess : run ")
-        track_list = self.track # or raw_input('Keywords to track (comma separated): ').strip()
-        self.logger.debug("SourceProcess : track list " + track_list)
-        
-        # --track is a comma-separated list of keywords.
-        track_list = [k.strip() for k in track_list.split(',')]
-
-        self.logger.debug("SourceProcess : before connecting to stream " + repr(track_list))                        
-        stream = tweetstream.FilterStream(self.auth, track=track_list, raw=True, url=self.options.url, catchup=self.catchup, timeout=self.timeout)
-        self.logger.debug("SourceProcess : after connecting to stream")
-        # Let the stream abort its iteration when shutdown is requested.
-        stream.muststop = lambda: self.stop_event.is_set()
-        
-        session = self.session_maker()
-        
-        try:
-            for tweet in stream:
-                if not self.parent_is_alive():
-                    sys.exit()
-                self.logger.debug("SourceProcess : tweet " + repr(tweet))
-                source = TweetSource(original_json=tweet)
-                self.logger.debug("SourceProcess : source created")
-                # Retry the insert up to 10 times: OperationalError can be
-                # transient (presumably a locked sqlite database under
-                # concurrent writers) -- re-raise on the last attempt.
-                add_retries = 0
-                while add_retries < 10:
-                    try:
-                        add_retries += 1
-                        session.add(source)
-                        session.flush()
-                        break
-                    except OperationalError as e:
-                        session.rollback()
-                        self.logger.debug("SourceProcess : Operational Error %s nb %d" % (repr(e), add_retries))
-                        if add_retries == 10:
-                            raise e
-                     
-                source_id = source.id
-                self.logger.debug("SourceProcess : before queue + source id " + repr(source_id))
-                self.logger.info("SourceProcess : Tweet count: %d - current rate : %.2f - running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime)))
-                session.commit()
-                self.queue.put((source_id, tweet), False)
-
-        except Exception as e:
-            self.logger.error("SourceProcess : Error when processing tweet " + repr(e))
-        finally:
-            # Whatever the outcome, release resources and make the
-            # consumers stop too by setting the shared event.
-            session.rollback()
-            stream.close()
-            session.close()
-            self.queue.close()
-            self.stop_event.set()
-
-
-def process_tweet(tweet, source_id, session, access_token, twitter_query_user, logger):
-    """Deserialize one raw tweet payload and store it via TwitterProcessor.
-
-    Payloads without a 'text' field are only recorded as a NOT_TWEET
-    TweetLog (presumably stream notices rather than statuses). Failures
-    are recorded as an ERROR TweetLog carrying the message and traceback
-    and committed here; the success path leaves the commit to the caller.
-    """
-    try:
-        tweet_obj = anyjson.deserialize(tweet)
-        if 'text' not in tweet_obj:
-            tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET'])
-            session.add(tweet_log)
-            return
-        screen_name = ""
-        if 'user' in tweet_obj and 'screen_name' in tweet_obj['user']:
-            screen_name = tweet_obj['user']['screen_name']
-        logger.info(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text']))
-        logger.debug(u"Process_tweet :" + repr(tweet))
-        processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, access_token, None, twitter_query_user)
-        processor.process()
-    except Exception as e:
-        message = u"Error %s processing tweet %s" % (repr(e), tweet)
-        logger.exception(message)
-        # Capture the traceback text to persist it with the error log row.
-        output = StringIO.StringIO()
-        try:
-            traceback.print_exc(file=output)
-            error_stack = output.getvalue()
-        finally:
-            output.close()
-        session.rollback()
-        tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['ERROR'], error=message, error_stack=error_stack)
-        session.add(tweet_log)
-        session.commit()
-
-    
-        
-class TweetProcess(BaseProcess):
-    """Consumer: pop (source_id, raw_json) pairs and process each tweet."""
-    
-    def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
-        super(TweetProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid)
-        self.twitter_query_user = options.twitter_query_user
-
-
-    def do_run(self):
-        
-        self.logger = set_logging_process(self.options, self.logger_queue)
-        session = self.session_maker()
-        try:
-            while not self.stop_event.is_set() and self.parent_is_alive():
-                try:
-                    # Blocking get with a 3 s timeout so the stop_event and
-                    # parent-alive checks still run when the queue is idle.
-                    source_id, tweet_txt = self.queue.get(True, 3)
-                    self.logger.debug("Processing source id " + repr(source_id))
-                except Exception as e:
-                    # Queue.Empty (and transient queue errors): just retry.
-                    self.logger.debug('Process tweet exception in loop : ' + repr(e))
-                    continue
-                process_tweet(tweet_txt, source_id, session, self.access_token, self.twitter_query_user, self.logger)
-                session.commit()
-        finally:
-            # Propagate shutdown to the other workers.
-            session.rollback()
-            self.stop_event.set()
-            session.close()
-
-
-def get_sessionmaker(conn_str):
-    # Build a scoped (thread-local) session factory plus engine/metadata
-    # for the given connection string; tables are not created here
-    # (create_all=False) -- main() calls metadata.create_all itself.
-    engine, metadata, Session = models.setup_database(conn_str, echo=False, create_all=False, autocommit=False)
-    Session = scoped_session(Session)
-    return Session, engine, metadata
-
-            
-def process_leftovers(session, access_token, twitter_query_user, logger):
-    """Reprocess stored tweet sources that have no TweetLog entry yet.
-
-    The outer join filtered on TweetLog.id == None selects TweetSource
-    rows that were saved but never processed (e.g. after a crash or an
-    interrupted run), committing after each one.
-    """
-    
-    sources = session.query(TweetSource).outerjoin(TweetLog).filter(TweetLog.id == None)
-    
-    for src in sources:
-        tweet_txt = src.original_json
-        process_tweet(tweet_txt, src.id, session, access_token, twitter_query_user, logger)
-        session.commit()
-
-        
-    
-    # get the tweet sources that do not match any tweet log entry, e.g.:
-    #select * from tweet_tweet_source ts left join tweet_tweet_log tl on ts.id = tl.tweet_source_id where tl.id isnull;
-def process_log(logger_queues, stop_event):
-    # Runs in a daemon thread of the main process: drain the per-worker
-    # logging queues and dispatch each record to the logger it names.
-    # Polls every 0.1 s; empty queues and broken pipes are skipped.
-    while not stop_event.is_set():
-        for lqueue in logger_queues:
-            try:
-                record = lqueue.get_nowait()
-                logging.getLogger(record.name).handle(record)
-            except Queue.Empty:
-                continue
-            except IOError:
-                continue
-        time.sleep(0.1)
-
-        
-def get_options():
-    # Declare and parse the command-line options (plus the shared logging
-    # options contributed by iri_tweet.utils); returns (options, args).
-
-    usage = "usage: %prog [options]"
-
-    parser = OptionParser(usage=usage)
-
-    parser.add_option("-f", "--file", dest="conn_str",
-                      help="write tweet to DATABASE. This is a connection string", metavar="CONNECTION_STR", default="enmi2010_twitter.db")
-    parser.add_option("-u", "--user", dest="username",
-                      help="Twitter user", metavar="USER", default=None)
-    parser.add_option("-w", "--password", dest="password",
-                      help="Twitter password", metavar="PASSWORD", default=None)
-    parser.add_option("-T", "--track", dest="track",
-                      help="Twitter track", metavar="TRACK")
-    parser.add_option("-n", "--new", dest="new", action="store_true",
-                      help="new database", default=False)
-    parser.add_option("-D", "--daemon", dest="daemon", action="store_true",
-                      help="launch daemon", default=False)
-    parser.add_option("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token",
-                      help="Token file name")
-    parser.add_option("-d", "--duration", dest="duration",
-                      help="Duration of recording in seconds", metavar="DURATION", default= -1, type='int')
-    # NOTE(review): typo "lefovers" in the help text below (runtime string,
-    # left unchanged here).
-    parser.add_option("-N", "--nb-process", dest="process_nb",
-                      help="number of process.\nIf 0, only the lefovers of the database are processed.\nIf 1, no postprocessing is done on the tweets.", metavar="PROCESS_NB", default=2, type='int')
-    parser.add_option("--url", dest="url",
-                      help="The twitter url to connect to.", metavar="URL", default=tweetstream.FilterStream.url)
-    parser.add_option("--query-user", dest="twitter_query_user", action="store_true",
-                      help="Query twitter for users", default=False, metavar="QUERY_USER")
-    parser.add_option("--catchup", dest="catchup",
-                      help="catchup count for tweets", default=None, metavar="CATCHUP", type='int')
-    parser.add_option("--timeout", dest="timeout",
-                      help="timeout for connecting in seconds", default=60, metavar="TIMEOUT", type='int')
-    
-
-
-
-    utils.set_logging_options(parser)
-
-    return parser.parse_args()
-
-
-def do_run(options, session_maker):
-    """Run one recording session: process leftovers, then spawn workers.
-
-    Returns a dict describing why the run stopped, or None when only the
-    leftovers were processed (process_nb <= 0). Relies on the module
-    global conn_str set by main().
-    """
-
-    stop_args = {}
-
-    access_token = None
-    if not options.username or not options.password:
-        access_token = utils.get_oauth_token(options.token_filename)
-    
-    # First, reprocess any tweet source left unprocessed by a prior run.
-    session = session_maker()
-    try:
-        process_leftovers(session, access_token, options.twitter_query_user, utils.get_logger())
-        session.commit()
-    finally:
-        session.rollback()
-        session.close()
-    
-    if options.process_nb <= 0:
-        utils.get_logger().debug("Leftovers processed. Exiting.")
-        return None
-
-    queue = mQueue()
-    stop_event = Event()
-    
-    #workaround for bug on using urllib2 and multiprocessing
-    req = urllib2.Request('http://localhost')
-    conn = None
-    try:
-        conn = urllib2.urlopen(req)
-    except:
-        utils.get_logger().debug("could not open localhost")
-        #donothing
-    finally:
-        if conn is not None:
-            conn.close()
-    
-    process_engines = []
-    logger_queues = []
-    
-    # One producer (SourceProcess), with its own engine and log queue...
-    SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str)
-    process_engines.append(engine_process)
-    lqueue = mQueue(1)
-    logger_queues.append(lqueue)
-    pid = os.getpid()
-    sprocess = SourceProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)    
-    
-    tweet_processes = []
-    
-    # ...and process_nb - 1 consumers, each likewise isolated.
-    for i in range(options.process_nb - 1):
-        SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str)
-        process_engines.append(engine_process)
-        lqueue = mQueue(1)
-        logger_queues.append(lqueue)
-        cprocess = TweetProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)
-        tweet_processes.append(cprocess)
-
-    def interupt_handler(signum, frame):
-        # Record why we are stopping, then ask every worker to stop.
-        utils.get_logger().debug("shutdown asked " + repr(signum) + "  " + repr(inspect.getframeinfo(frame, 9)))
-        stop_args.update({'message': 'interupt', 'signum':signum, 'frameinfo':inspect.getframeinfo(frame, 9)})
-        stop_event.set()
-        
-    signal.signal(signal.SIGINT , interupt_handler)
-    signal.signal(signal.SIGHUP , interupt_handler)
-    signal.signal(signal.SIGALRM, interupt_handler)
-    signal.signal(signal.SIGTERM, interupt_handler)
-
-    log_thread = threading.Thread(target=process_log, name="loggingThread", args=(logger_queues, stop_event,))
-    log_thread.daemon = True
-
-    log_thread.start()
-
-    sprocess.start()
-    for cprocess in tweet_processes:
-        cprocess.start()
-
-    add_process_event("pid", {'main':os.getpid(), 'source':(sprocess.name, sprocess.pid), 'consumers':dict([(p.name, p.pid) for p in tweet_processes])}, session_maker)
-
-    if options.duration >= 0:
-        end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration)    
-    
-
-    # Supervision loop: stop when the duration expires or the producer dies.
-    # (end_ts is only read when duration >= 0, so it is always bound there.)
-    while not stop_event.is_set():
-        if options.duration >= 0 and  datetime.datetime.utcnow() >= end_ts:
-            stop_args.update({'message': 'duration', 'duration' : options.duration, 'end_ts' : end_ts})
-            stop_event.set()
-            break
-        if sprocess.is_alive():            
-            time.sleep(1)
-        else:
-            stop_args.update({'message': 'Source process killed'})
-            stop_event.set()
-            break
-    utils.get_logger().debug("Joining Source Process")
-    try:
-        sprocess.join(10)
-    except:
-        utils.get_logger().debug("Pb joining Source Process - terminating")
-        sprocess.terminate()
-        
-    for i, cprocess in enumerate(tweet_processes):
-        utils.get_logger().debug("Joining consumer process Nb %d" % (i + 1))
-        try:
-            cprocess.join(3)
-        except:
-            utils.get_logger().debug("Pb joining consumer process Nb %d - terminating" % (i + 1))
-            cprocess.terminate()
-
-    
-    utils.get_logger().debug("Close queues")
-    try:
-        queue.close()
-        for lqueue in logger_queues:
-            lqueue.close()
-    # NOTE(review): BUG - lowercase 'exception' is undefined; this handler
-    # would itself raise NameError instead of catching. Should be
-    # 'except Exception as e:'.
-    except exception as e:
-        utils.get_logger().error("error when closing queues %s", repr(e))
-        #do nothing
-        
-    
-    # With at least one consumer, sweep anything still queued but
-    # unprocessed at shutdown.
-    if options.process_nb > 1:
-        utils.get_logger().debug("Processing leftovers")
-        session = session_maker()
-        try:
-            process_leftovers(session, access_token, options.twitter_query_user, utils.get_logger())
-            session.commit()
-        finally:
-            session.rollback()
-            session.close()
-
-    for pengine in process_engines:
-        pengine.dispose()
-
-    return stop_args
-
-
-def main(options, args):
-    """Entry point: normalize/prepare the database, then run the session."""
-    
-    global conn_str
-    
-    # A bare path (no scheme://) is treated as an sqlite database file.
-    conn_str = options.conn_str.strip()
-    if not re.match("^\w+://.+", conn_str):
-        conn_str = 'sqlite:///' + options.conn_str
-        
-    # With --new on sqlite, rotate an existing db file to <name>.<i><ext>
-    # (first free suffix, capped at one million attempts).
-    if conn_str.startswith("sqlite") and options.new:
-        filepath = conn_str[conn_str.find(":///") + 4:]
-        if os.path.exists(filepath):
-            i = 1
-            basename, extension = os.path.splitext(filepath)
-            new_path = '%s.%d%s' % (basename, i, extension)
-            while i < 1000000 and os.path.exists(new_path):
-                i += 1
-                new_path = '%s.%d%s' % (basename, i, extension)
-            if i >= 1000000:
-                raise Exception("Unable to find new filename for " + filepath)
-            else:
-                shutil.move(filepath, new_path)
-
-    Session, engine, metadata = get_sessionmaker(conn_str)
-    
-    # Safety check: refuse to reuse a non-empty database with --new.
-    if options.new:
-        check_metadata = sqlalchemy.schema.MetaData(bind=engine, reflect=True)
-        if len(check_metadata.sorted_tables) > 0:
-            message = "Database %s not empty exiting" % conn_str
-            utils.get_logger().error(message)
-            sys.exit(message)
-    
-    metadata.create_all(engine)
-    session = Session()
-    try:
-        models.add_model_version(session)
-    finally:
-        session.close()
-    
-    # Bracket the run with start/shutdown ProcessEvents; on failure the
-    # shutdown event carries the error and traceback before re-raising.
-    stop_args = {}
-    try:
-        add_process_event(type="start", args={'options':options.__dict__, 'args': args, 'command_line': sys.argv}, session_maker=Session)
-        stop_args = do_run(options, Session)
-    except Exception as e:
-        utils.get_logger().exception("Error in main thread")        
-        outfile = StringIO.StringIO()
-        try:
-            traceback.print_exc(file=outfile)
-            stop_args = {'error': repr(e), 'message': getattr(e, 'message', ''), 'stacktrace':outfile.getvalue()}
-        finally:
-            outfile.close()
-        raise
-    finally:    
-        add_process_event(type="shutdown", args=stop_args, session_maker=Session)
-
-    utils.get_logger().debug("Done. Exiting. " + repr(stop_args))
-
-
-
-if __name__ == '__main__':
-
-    (options, args) = get_options()
-    
-    loggers = set_logging(options)
-    
-    utils.get_logger().debug("OPTIONS : " + repr(options))
-    
-    if options.daemon:
-        # Daemonize. The logging handlers' streams must be listed in
-        # files_preserve, otherwise DaemonContext closes them on fork.
-        import daemon
-        import lockfile
-        
-        hdlr_preserve = []
-        for logger in loggers:
-            hdlr_preserve.extend([h.stream for h in logger.handlers])
-            
-        context = daemon.DaemonContext(working_directory=os.getcwd(), files_preserve=hdlr_preserve) 
-        with context:
-            main(options, args)
-    else:
-        main(options, args)
-