diff -r b97a72ab59a2 -r d84c4aa2a9eb script/stream/recorder_tweetstream.py --- a/script/stream/recorder_tweetstream.py Wed Aug 24 18:04:26 2011 +0200 +++ b/script/stream/recorder_tweetstream.py Thu Aug 25 02:20:08 2011 +0200 @@ -1,6 +1,6 @@ from getpass import getpass from iri_tweet import models, utils -from iri_tweet.models import TweetSource, TweetLog +from iri_tweet.models import TweetSource, TweetLog, ProcessEvent from multiprocessing import (Queue as mQueue, JoinableQueue, Process, Event, get_logger) from optparse import OptionParser @@ -10,6 +10,7 @@ import StringIO import anyjson import datetime +import inspect import logging import os import re @@ -111,11 +112,30 @@ # when we get one. +class BaseProcess(Process): + + def __init__(self, parent_pid): + self.parent_pid = parent_pid + super(BaseProcess, self).__init__() + + # + # from http://stackoverflow.com/questions/2542610/python-daemon-doesnt-kill-its-kids + # + def parent_is_alive(self): + try: + # try to call Parent + os.kill(self.parent_pid, 0) + except OSError: + # *beeep* oh no! The phone's disconnected! + return False + else: + # *ring* Hi mom! + return True -class SourceProcess(Process): +class SourceProcess(BaseProcess): - def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue): + def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid): self.session_maker = session_maker self.queue = queue self.track = options.track @@ -125,8 +145,8 @@ self.options = options self.access_token = access_token self.logger_queue = logger_queue - super(SourceProcess, self).__init__() - + super(SourceProcess, self).__init__(parent_pid) + def run(self): #import pydevd @@ -148,9 +168,11 @@ try: for tweet in stream: - self.logger.debug("tweet " + repr(tweet)) + if not self.parent_is_alive(): + sys.exit() + self.logger.debug("SourceProcess : tweet " + repr(tweet)) source = TweetSource(original_json=tweet) - self.logger.debug("source created") + self.logger.debug("SourceProcess : source created") add_retries = 0 while add_retries < 10: try: @@ -160,18 +182,18 @@ break except OperationalError as e: session.rollback() - self.logger.debug("Operational Error %s nb %d" % (repr(e), add_retries)) + self.logger.debug("SourceProcess : Operational Error %s nb %d" % (repr(e), add_retries)) if add_retries == 10: raise e source_id = source.id - self.logger.debug("before queue + source id " + repr(source_id)) - self.logger.info("Tweet count: %d - current rate : %.2f - running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime))) + self.logger.debug("SourceProcess : before queue + source id " + repr(source_id)) + self.logger.info("SourceProcess : Tweet count: %d - current rate : %.2f - running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime))) session.commit() self.queue.put((source_id, tweet), False) except Exception as e: - self.logger.error("Error when processing tweet " + repr(e)) + self.logger.error("SourceProcess : Error when processing tweet " + repr(e)) finally: session.rollback() stream.close() @@ -208,16 +230,16 @@ -class TweetProcess(Process): +class TweetProcess(BaseProcess): - def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue): + def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid): self.session_maker = session_maker self.queue = queue self.stop_event = stop_event self.options = options self.access_token = access_token self.logger_queue = logger_queue - super(TweetProcess, self).__init__() + super(TweetProcess, self).__init__(parent_pid) def run(self): @@ -225,7 +247,7 @@ self.logger = set_logging_process(self.options, self.logger_queue) session = self.session_maker() try: - while not self.stop_event.is_set(): + while not self.stop_event.is_set() and self.parent_is_alive(): try: source_id, tweet_txt = queue.get(True, 3) self.logger.debug("Processing source id " + repr(source_id)) @@ -273,7 +295,11 @@ def get_options(): - parser = OptionParser() + + usage = "usage: %prog [options]" + + parser = OptionParser(usage=usage) + parser.add_option("-f", "--file", dest="conn_str", help="write tweet to DATABASE. This is a connection string", metavar="CONNECTION_STR", default="enmi2010_twitter.db") parser.add_option("-u", "--user", dest="username", @@ -293,30 +319,35 @@ parser.add_option("-N", "--nb-process", dest="process_nb", help="number of process.\nIf 0, only the lefovers of the database are processed.\nIf 1, no postprocessing is done on the tweets.", metavar="PROCESS_NB", default=2, type='int') - - utils.set_logging_options(parser) return parser.parse_args() +def add_process_event(type, args, session_maker): + session = session_maker() + try: + evt = ProcessEvent(args=None if args is None else anyjson.serialize(args), type=type) + session.add(evt) + session.commit() + finally: + session.close() + if __name__ == '__main__': + stop_args = {} (options, args) = get_options() set_logging(options) - - if options.debug: - print "OPTIONS : " - print repr(options) + utils.get_logger().debug("OPTIONS : " + repr(options)) conn_str = options.conn_str.strip() if not re.match("^\w+://.+", conn_str): conn_str = 'sqlite:///' + options.conn_str if conn_str.startswith("sqlite") and options.new: - filepath = conn_str[conn_str.find(":///")+4:] + filepath = conn_str[conn_str.find(":///") + 4:] if os.path.exists(filepath): i = 1 basename, extension = os.path.splitext(filepath) @@ -340,6 +371,8 @@ metadata.create_all(engine) + add_process_event(type="start", args={'options':options.__dict__, 'args': args, 'command_line': sys.argv}, session_maker=Session) + access_token = None if not options.username or not options.password: access_token = utils.get_oauth_token(options.token_filename) @@ -354,6 +387,7 @@ if options.process_nb <= 0: utils.get_logger().debug("Leftovers processed. Exiting.") + add_process_event(type="shutdown", args=None, session_maker=Session) sys.exit() queue = mQueue() @@ -378,7 +412,8 @@ process_engines.append(engine_process) lqueue = mQueue(1) logger_queues.append(lqueue) - sprocess = SourceProcess(SessionProcess, queue, options, access_token, stop_event, lqueue) + pid = os.getpid() + sprocess = SourceProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid) tweet_processes = [] @@ -387,11 +422,13 @@ process_engines.append(engine_process) lqueue = mQueue(1) logger_queues.append(lqueue) - cprocess = TweetProcess(SessionProcess, queue, options, access_token, stop_event, lqueue) + cprocess = TweetProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid) tweet_processes.append(cprocess) def interupt_handler(signum, frame): - utils.get_logger().debug("shutdown asked " + repr(signum) + " " + repr(frame)) + global stop_args + utils.get_logger().debug("shutdown asked " + repr(signum) + " " + repr(inspect.getframeinfo(frame, 9))) + stop_args = {'signum':signum, 'frameinfo':inspect.getframeinfo(frame, 9)} stop_event.set() signal.signal(signal.SIGINT , interupt_handler) @@ -399,14 +436,16 @@ signal.signal(signal.SIGALRM, interupt_handler) signal.signal(signal.SIGTERM, interupt_handler) - log_thread = threading.Thread(target=process_log, name="loggingThread", args=(logger_queues,stop_event,)) + log_thread = threading.Thread(target=process_log, name="loggingThread", args=(logger_queues, stop_event,)) log_thread.daemon = True + log_thread.start() + sprocess.start() for cprocess in tweet_processes: cprocess.start() - log_thread.start() + add_process_event("pid", {"main":os.getpid(), 'source':sprocess.pid, 'consumers':[p.pid for p in tweet_processes]}, Session) if options.duration >= 0: end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration) @@ -460,5 +499,6 @@ for pengine in process_engines: pengine.dispose() + add_process_event(type="shutdown", args=stop_args, session_maker=Session) utils.get_logger().debug("Done. Exiting.")