diff -r b46cfa1d188b -r 8ae3d91ea4ae script/stream/recorder_stream.py
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/script/stream/recorder_stream.py	Tue Dec 18 12:26:05 2012 +0100
@@ -0,0 +1,570 @@
+from getpass import getpass
+from iri_tweet import models, utils
+from iri_tweet.models import TweetSource, TweetLog, ProcessEvent
+from multiprocessing import (Queue as mQueue, JoinableQueue, Process, Event,
+    get_logger)
+from optparse import OptionParser
+from sqlalchemy.exc import OperationalError
+from sqlalchemy.orm import scoped_session
+import Queue
+import StringIO
+import anyjson
+import datetime
+import inspect
+import iri_tweet.stream
+import logging
+import os
+import re
+import requests.auth
+import shutil
+import signal
+import socket
+import sqlalchemy.schema
+import sys
+import threading
+import time
+import traceback
+import urllib2
+# leave socket file objects unbuffered so streamed data is delivered as it arrives
+socket._fileobject.default_bufsize = 0
+
+
+# columns_tweet = [u'favorited', u'truncated', u'text', u'created_at', u'source', u'in_reply_to_status_id', u'in_reply_to_screen_name', u'in_reply_to_user_id', u'geo', u'id', u'user']
+columns_tweet = [u'user', u'favorited', u'contributors', u'truncated', u'text', u'created_at', u'retweeted', u'in_reply_to_status_id_str', u'coordinates', u'in_reply_to_user_id_str', u'entities', u'in_reply_to_status_id', u'place', u'in_reply_to_user_id', u'id', u'in_reply_to_screen_name', u'retweet_count', u'geo', u'id_str', u'source']
+# columns_user = [u'id', u'verified', u'profile_sidebar_fill_color', u'profile_text_color', u'followers_count', u'protected', u'location', u'profile_background_color', u'utc_offset', u'statuses_count', u'description', u'friends_count', u'profile_link_color', u'profile_image_url', u'notifications', u'geo_enabled', u'profile_background_image_url', u'screen_name', u'profile_background_tile', u'favourites_count', u'name', u'url', u'created_at', u'time_zone', u'profile_sidebar_border_color', u'following']
+columns_user = [u'follow_request_sent', u'profile_use_background_image', u'id', u'verified', u'profile_sidebar_fill_color', u'profile_text_color', u'followers_count', u'protected', u'location', u'profile_background_color', u'id_str', u'utc_offset', u'statuses_count', u'description', u'friends_count', u'profile_link_color', u'profile_image_url', u'notifications', u'show_all_inline_media', u'geo_enabled', u'profile_background_image_url', u'name', u'lang', u'following', u'profile_background_tile', u'favourites_count', u'screen_name', u'url', u'created_at', u'contributors_enabled', u'time_zone', u'profile_sidebar_border_color', u'is_translator', u'listed_count']
+# just put it in a sqlite3 table
+
+DEFAULT_TIMEOUT = 5
+
+
+def set_logging(options):
+    loggers = []
+
+    loggers.append(utils.set_logging(options, logging.getLogger('iri.tweet')))
+    loggers.append(utils.set_logging(options, logging.getLogger('multiprocessing')))
+    if options.debug >= 2:
+        loggers.append(utils.set_logging(options, logging.getLogger('sqlalchemy.engine')))
+    # utils.set_logging(options, logging.getLogger('sqlalchemy.dialects'))
+    # utils.set_logging(options, logging.getLogger('sqlalchemy.pool'))
+    # utils.set_logging(options, logging.getLogger('sqlalchemy.orm'))
+    return loggers
+
+
+def set_logging_process(options, queue):
+    qlogger = utils.set_logging(options, logging.getLogger('iri.tweet.p'), queue)
+    qlogger.propagate = 0
+    return qlogger
+
+
+def get_auth(options, access_token):
+    if options.username and options.password:
+        # basic auth with the username/password given on the command line
+        auth = requests.auth.HTTPBasicAuth(options.username, options.password)
+    else:
+        consumer_key = models.CONSUMER_KEY
+        consumer_secret = models.CONSUMER_SECRET
+        auth = requests.auth.OAuth1(access_token[0], access_token[1], consumer_key, consumer_secret, signature_type='auth_header')
+    return auth
+
+
+def add_process_event(type, args, session_maker):
+    session = session_maker()
+    try:
+        evt = ProcessEvent(args=None if args is None else anyjson.serialize(args), type=type)
+        session.add(evt)
+        session.commit()
+    finally:
+        session.close()
+
+
+class BaseProcess(Process):
+
+    def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
+        self.parent_pid = parent_pid
+        self.session_maker = session_maker
+        self.queue = queue
+        self.options = options
+        self.logger_queue = logger_queue
+        self.stop_event = stop_event
+        self.access_token = access_token
+
+        super(BaseProcess, self).__init__()
+
+    #
+    # from http://stackoverflow.com/questions/2542610/python-daemon-doesnt-kill-its-kids
+    #
+    def parent_is_alive(self):
+        try:
+            # try to call Parent
+            os.kill(self.parent_pid, 0)
+        except OSError:
+            # *beeep* oh no! The phone's disconnected!
+            return False
+        else:
+            # *ring* Hi mom!
+            return True
+
+    def __get_process_event_args(self):
+        return {'name':self.name, 'pid':self.pid, 'parent_pid':self.parent_pid, 'options':self.options.__dict__, 'access_token':self.access_token}
+
+    def run(self):
+        try:
+            add_process_event("start_worker", self.__get_process_event_args(), self.session_maker)
+            self.do_run()
+        finally:
+            add_process_event("stop_worker", self.__get_process_event_args(), self.session_maker)
+
+    def do_run(self):
+        raise NotImplementedError()
+
+
+class SourceProcess(BaseProcess):
+    # reads the Twitter stream and pushes each raw tweet to the shared queue
+
+    def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
+        self.track = options.track
+        self.token_filename = options.token_filename
+        self.catchup = options.catchup
+        self.timeout = options.timeout
+        self.stream = None
+        super(SourceProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid)
+
+    def __source_stream_iter(self):
+
+        self.logger = set_logging_process(self.options, self.logger_queue)
+        self.logger.debug("SourceProcess : run")
+
+        self.auth = get_auth(self.options, self.access_token)
+        self.logger.debug("SourceProcess : auth set")
+
+        track_list = self.track  # or raw_input('Keywords to track (comma separated): ').strip()
+        self.logger.debug("SourceProcess : track list " + track_list)
+
+        track_list = [k.strip() for k in track_list.split(',')]
+
+        self.logger.debug("SourceProcess : before connecting to stream " + repr(track_list))
+        self.stream = iri_tweet.stream.FilterStream(self.auth, track=track_list, raw=True, url=self.options.url, catchup=self.catchup, timeout=self.timeout, chunk_size=1, logger=self.logger)
+        self.logger.debug("SourceProcess : after connecting to stream")
+        self.stream.muststop = lambda: self.stop_event.is_set()
+
+        stream_wrapper = iri_tweet.stream.SafeStreamWrapper(self.stream, logger=self.logger)
+
+        session = self.session_maker()
+
+        try:
+            for tweet in stream_wrapper:
+                if not self.parent_is_alive():
+                    self.stop_event.set()
+                    sys.exit()
+                self.logger.debug("SourceProcess : tweet " + repr(tweet))
+                source = TweetSource(original_json=tweet)
+                self.logger.debug("SourceProcess : source created")
+                add_retries = 0
+                while add_retries < 10:
+                    try:
+                        add_retries += 1
+                        session.add(source)
+                        session.flush()
+                        break
+                    except OperationalError as e:
+                        session.rollback()
+                        self.logger.debug("SourceProcess : Operational Error %s nb %d" % (repr(e), add_retries))
+                        if add_retries == 10:
+                            raise
+
+                source_id = source.id
+                self.logger.debug("SourceProcess : before queue + source id " + repr(source_id))
+                self.logger.info("SourceProcess : Tweet count: %d - current rate : %.2f - running : %s" % (self.stream.count, self.stream.rate, int(time.time() - self.stream.starttime)))
+                session.commit()
+                self.queue.put((source_id, tweet), False)
+
+        except Exception as e:
+            self.logger.error("SourceProcess : Error when processing tweet " + repr(e))
+            raise
+        finally:
+            session.rollback()
+            session.close()
+            self.logger_queue.close()
+            self.queue.close()
+            self.stream.close()
+            self.stream = None
+            if not self.stop_event.is_set():
+                self.stop_event.set()
+
+    def do_run(self):
+
+        # import pydevd
+        # pydevd.settrace(suspend=False)
+
+        source_stream_iter_thread = threading.Thread(target=self.__source_stream_iter, name="SourceStreamIterThread")
+
+        source_stream_iter_thread.start()
+
+        while not self.stop_event.is_set():
+            self.logger.debug("SourceProcess : In while after start")
+            self.stop_event.wait(DEFAULT_TIMEOUT)
+            if self.stop_event.is_set() and self.stream:
+                self.stream.close()
+            elif not self.stop_event.is_set() and not source_stream_iter_thread.is_alive():
+                self.stop_event.set()
+
+        self.logger.info("SourceProcess : join")
+        source_stream_iter_thread.join(30)
+
+
+def process_tweet(tweet, source_id, session, access_token, twitter_query_user, logger):
+    try:
+        if not tweet.strip():
+            return
+        tweet_obj = anyjson.deserialize(tweet)
+        if 'text' not in tweet_obj:
+            tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET'])
+            session.add(tweet_log)
+            return
+        screen_name = ""
+        if 'user' in tweet_obj and 'screen_name' in tweet_obj['user']:
+            screen_name = tweet_obj['user']['screen_name']
+        logger.info(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text']))
+        logger.debug(u"Process_tweet :" + repr(tweet))
+        processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, access_token, None, twitter_query_user)
+        processor.process()
+    except ValueError as e:
+        message = u"Value Error %s processing tweet %s" % (repr(e), tweet)
+        output = StringIO.StringIO()
+        try:
+            traceback.print_exc(file=output)
+            error_stack = output.getvalue()
+        finally:
+            output.close()
+        tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET'], error=message, error_stack=error_stack)
+        session.add(tweet_log)
+        session.commit()
+    except Exception as e:
+        message = u"Error %s processing tweet %s" % (repr(e), tweet)
+        logger.exception(message)
+        output = StringIO.StringIO()
+        try:
+            traceback.print_exc(file=output)
+            error_stack = output.getvalue()
+        finally:
+            output.close()
+        session.rollback()
+        tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['ERROR'], error=message, error_stack=error_stack)
+        session.add(tweet_log)
+        session.commit()
+
+
+class TweetProcess(BaseProcess):
+    # consumes raw tweets from the queue and stores the parsed result in the database
+
+    def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
+        super(TweetProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid)
+        self.twitter_query_user = options.twitter_query_user
+
+    def do_run(self):
+
+        self.logger = set_logging_process(self.options, self.logger_queue)
+        session = self.session_maker()
+        try:
+            while not self.stop_event.is_set() and self.parent_is_alive():
+                try:
+                    source_id, tweet_txt = self.queue.get(True, 3)
+                    self.logger.debug("Processing source id " + repr(source_id))
+                except Exception as e:
+                    self.logger.debug('Process tweet exception in loop : ' + repr(e))
+                    continue
+                process_tweet(tweet_txt, source_id, session, self.access_token, self.twitter_query_user, self.logger)
+                session.commit()
+        finally:
+            session.rollback()
+            session.close()
+
+
+def get_sessionmaker(conn_str):
+    engine, metadata, Session = models.setup_database(conn_str, echo=False, create_all=False, autocommit=False)
+    Session = scoped_session(Session)
+    return Session, engine, metadata
+
+
+def process_leftovers(session, access_token, twitter_query_user, logger):
+    # process tweet sources that do not match any log entry, i.e.
+    # select * from tweet_tweet_source ts left join tweet_tweet_log tl on ts.id = tl.tweet_source_id where tl.id isnull;
+
+    sources = session.query(TweetSource).outerjoin(TweetLog).filter(TweetLog.id == None)
+
+    for src in sources:
+        tweet_txt = src.original_json
+        process_tweet(tweet_txt, src.id, session, access_token, twitter_query_user, logger)
+        session.commit()
+
+
+def process_log(logger_queues, stop_event):
+    # drain the logging queues of the worker processes and re-emit the records locally
+    while not stop_event.is_set():
+        for lqueue in logger_queues:
+            try:
+                record = lqueue.get_nowait()
+                logging.getLogger(record.name).handle(record)
+            except Queue.Empty:
+                continue
+            except IOError:
+                continue
+        time.sleep(0.1)
+
+
+def get_options():
+
+    usage = "usage: %prog [options]"
+
+    parser = OptionParser(usage=usage)
+
+    parser.add_option("-f", "--file", dest="conn_str",
+                      help="write tweets to DATABASE. This is a connection string", metavar="CONNECTION_STR", default="enmi2010_twitter.db")
+    parser.add_option("-u", "--user", dest="username",
+                      help="Twitter user", metavar="USER", default=None)
+    parser.add_option("-w", "--password", dest="password",
+                      help="Twitter password", metavar="PASSWORD", default=None)
+    parser.add_option("-T", "--track", dest="track",
+                      help="Twitter track", metavar="TRACK")
+    parser.add_option("-n", "--new", dest="new", action="store_true",
+                      help="new database", default=False)
+    parser.add_option("-D", "--daemon", dest="daemon", action="store_true",
+                      help="launch daemon", default=False)
+    parser.add_option("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token",
+                      help="Token file name")
+    parser.add_option("-d", "--duration", dest="duration",
+                      help="Duration of recording in seconds", metavar="DURATION", default=-1, type='int')
+    parser.add_option("-N", "--nb-process", dest="process_nb",
+                      help="number of processes.\nIf 0, only the leftovers of the database are processed.\nIf 1, no postprocessing is done on the tweets.", metavar="PROCESS_NB", default=2, type='int')
+    parser.add_option("--url", dest="url",
+                      help="The twitter url to connect to.", metavar="URL", default=iri_tweet.stream.FilterStream.url)
+    parser.add_option("--query-user", dest="twitter_query_user", action="store_true",
+                      help="Query twitter for users", default=False, metavar="QUERY_USER")
+    parser.add_option("--catchup", dest="catchup",
+                      help="catchup count for tweets", default=None, metavar="CATCHUP", type='int')
+    parser.add_option("--timeout", dest="timeout",
+                      help="timeout for connecting in seconds", default=60, metavar="TIMEOUT", type='int')
+
+    utils.set_logging_options(parser)
+
+    return parser.parse_args()
+
+
+def do_run(options, session_maker):
+
+    stop_args = {}
+
+    access_token = None
+    if not options.username or not options.password:
+        access_token = utils.get_oauth_token(options.token_filename)
+
+    # first process whatever is already in the database but has not been parsed yet
+    session = session_maker()
+    try:
+        process_leftovers(session, access_token, options.twitter_query_user, utils.get_logger())
+        session.commit()
+    finally:
+        session.rollback()
+        session.close()
+
+    if options.process_nb <= 0:
+        utils.get_logger().debug("Leftovers processed. Exiting.")
+        return None
+
+    queue = mQueue()
+    stop_event = Event()
+
+    # workaround for bug on using urllib2 and multiprocessing
+    req = urllib2.Request('http://localhost')
+    conn = None
+    try:
+        conn = urllib2.urlopen(req)
+    except:
+        utils.get_logger().debug("could not open localhost")
+        # do nothing
+    finally:
+        if conn is not None:
+            conn.close()
+
+    process_engines = []
+    logger_queues = []
+
+    # one source process that reads the stream ...
+    SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str)
+    process_engines.append(engine_process)
+    lqueue = mQueue(50)
+    logger_queues.append(lqueue)
+    pid = os.getpid()
+    sprocess = SourceProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)
+
+    # ... and process_nb - 1 consumer processes that parse and store the tweets
+    tweet_processes = []
+
+    for i in range(options.process_nb - 1):
+        SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str)
+        process_engines.append(engine_process)
+        lqueue = mQueue(50)
+        logger_queues.append(lqueue)
+        cprocess = TweetProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)
+        tweet_processes.append(cprocess)
+
+    log_thread = threading.Thread(target=process_log, name="loggingThread", args=(logger_queues, stop_event,))
+    log_thread.daemon = True
+
+    log_thread.start()
+
+    sprocess.start()
+    for cprocess in tweet_processes:
+        cprocess.start()
+
+    add_process_event("pid", {'main':os.getpid(), 'source':(sprocess.name, sprocess.pid), 'consumers':dict([(p.name, p.pid) for p in tweet_processes])}, session_maker)
+
+    if options.duration >= 0:
+        end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration)
+
+    def interupt_handler(signum, frame):
+        utils.get_logger().debug("shutdown asked " + repr(signum) + " " + repr(inspect.getframeinfo(frame, 9)))
+        stop_args.update({'message': 'interupt', 'signum':signum, 'frameinfo':inspect.getframeinfo(frame, 9)})
+        stop_event.set()
+
+    signal.signal(signal.SIGINT, interupt_handler)
+    signal.signal(signal.SIGHUP, interupt_handler)
+    signal.signal(signal.SIGALRM, interupt_handler)
+    signal.signal(signal.SIGTERM, interupt_handler)
+
+    while not stop_event.is_set():
+        if options.duration >= 0 and datetime.datetime.utcnow() >= end_ts:
+            stop_args.update({'message': 'duration', 'duration': options.duration, 'end_ts': end_ts})
+            stop_event.set()
+            break
+        if sprocess.is_alive():
+            utils.get_logger().debug("Source process alive")
+            time.sleep(1)
+        else:
+            stop_args.update({'message': 'Source process killed'})
+            stop_event.set()
+            break
+
+    utils.get_logger().debug("Joining Source Process")
+    try:
+        sprocess.join(10)
+    except:
+        utils.get_logger().debug("Problem joining Source Process - terminating")
+        sprocess.terminate()
+
+    for i, cprocess in enumerate(tweet_processes):
+        utils.get_logger().debug("Joining consumer process Nb %d" % (i + 1))
+        try:
+            cprocess.join(3)
+        except:
+            utils.get_logger().debug("Problem joining consumer process Nb %d - terminating" % (i + 1))
+            cprocess.terminate()
+
+    utils.get_logger().debug("Close queues")
+    try:
+        queue.close()
+        for lqueue in logger_queues:
+            lqueue.close()
+    except Exception as e:
+        utils.get_logger().error("error when closing queues %s", repr(e))
+        # do nothing
+
+    if options.process_nb > 1:
+        utils.get_logger().debug("Processing leftovers")
+        session = session_maker()
+        try:
+            process_leftovers(session, access_token, options.twitter_query_user, utils.get_logger())
+            session.commit()
+        finally:
+            session.rollback()
+            session.close()
+
+    for pengine in process_engines:
+        pengine.dispose()
+
+    return stop_args
+
+
+def main(options, args):
+
+    global conn_str
+
+    conn_str = options.conn_str.strip()
+    if not re.match("^\w+://.+", conn_str):
+        conn_str = 'sqlite:///' + options.conn_str
+
+    if conn_str.startswith("sqlite") and options.new:
+        # move an existing sqlite file out of the way before creating a new database
+        filepath = conn_str[conn_str.find(":///") + 4:]
+        if os.path.exists(filepath):
+            i = 1
+            basename, extension = os.path.splitext(filepath)
+            new_path = '%s.%d%s' % (basename, i, extension)
+            while i < 1000000 and os.path.exists(new_path):
+                i += 1
+                new_path = '%s.%d%s' % (basename, i, extension)
+            if i >= 1000000:
+                raise Exception("Unable to find new filename for " + filepath)
+            else:
+                shutil.move(filepath, new_path)
+
+    Session, engine, metadata = get_sessionmaker(conn_str)
+
+    if options.new:
+        check_metadata = sqlalchemy.schema.MetaData(bind=engine, reflect=True)
+        if len(check_metadata.sorted_tables) > 0:
+            message = "Database %s not empty. Exiting." % conn_str
+            utils.get_logger().error(message)
+            sys.exit(message)
+
+    metadata.create_all(engine)
+    session = Session()
+    try:
+        models.add_model_version(session)
+    finally:
+        session.close()
+
+    stop_args = {}
+    try:
+        add_process_event(type="start", args={'options':options.__dict__, 'args': args, 'command_line': sys.argv}, session_maker=Session)
+        stop_args = do_run(options, Session)
+    except Exception as e:
+        utils.get_logger().exception("Error in main thread")
+        outfile = StringIO.StringIO()
+        try:
+            traceback.print_exc(file=outfile)
+            stop_args = {'error': repr(e), 'message': getattr(e, 'message', ''), 'stacktrace':outfile.getvalue()}
+        finally:
+            outfile.close()
+        raise
+    finally:
+        add_process_event(type="shutdown", args=stop_args, session_maker=Session)
+
+    utils.get_logger().debug("Done. Exiting. " + repr(stop_args))
+
+
+if __name__ == '__main__':
+
+    (options, args) = get_options()
+
+    loggers = set_logging(options)
+
+    utils.get_logger().debug("OPTIONS : " + repr(options))
+
+    if options.daemon:
+        import daemon
+        import lockfile
+
+        hdlr_preserve = []
+        for logger in loggers:
+            hdlr_preserve.extend([h.stream for h in logger.handlers])
+
+        context = daemon.DaemonContext(working_directory=os.getcwd(), files_preserve=hdlr_preserve)
+        with context:
+            main(options, args)
+    else:
+        main(options, args)
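
A minimal example invocation of the recorder, based on the options defined in get_options(); the database name, keywords and duration below are placeholder values, and an OAuth token is assumed to have been written to .oauth_token beforehand:

    python script/stream/recorder_stream.py -f sqlite:///enmi_twitter.db -T "keyword1,keyword2" -N 2 -d 3600 --query-user

With -N 2 this starts one SourceProcess reading the filtered stream and one TweetProcess consuming the queue; -N 0 only reprocesses the leftovers already stored in the database.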