diff -r 33cf0231e253 -r 6671e9a4c9c5 script/stream/recorder_tweetstream.py --- a/script/stream/recorder_tweetstream.py Thu Aug 25 02:22:19 2011 +0200 +++ b/script/stream/recorder_tweetstream.py Thu Aug 25 18:23:53 2011 +0200 @@ -111,11 +111,27 @@ # Don't listen to auth error, since we can't reasonably reconnect # when we get one. +def add_process_event(type, args, session_maker): + session = session_maker() + try: + evt = ProcessEvent(args=None if args is None else anyjson.serialize(args), type=type) + session.add(evt) + session.commit() + finally: + session.close() + class BaseProcess(Process): - def __init__(self, parent_pid): + def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid): self.parent_pid = parent_pid + self.session_maker = session_maker + self.queue = queue + self.options = options + self.logger_queue = logger_queue + self.stop_event = stop_event + self.access_token = access_token + super(BaseProcess, self).__init__() # @@ -131,23 +147,32 @@ else: # *ring* Hi mom! 
return True + + + def __get_process_event_args(self): + return {'name':self.name, 'pid':self.pid, 'parent_pid':self.parent_pid, 'options':self.options.__dict__, 'access_token':self.access_token} + + def run(self): + try: + add_process_event("start_worker", self.__get_process_event_args(), self.session_maker) + self.do_run() + finally: + add_process_event("stop_worker",self.__get_process_event_args(), self.session_maker) + + def do_run(self): + raise NotImplementedError() + class SourceProcess(BaseProcess): def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid): - self.session_maker = session_maker - self.queue = queue self.track = options.track self.reconnects = options.reconnects self.token_filename = options.token_filename - self.stop_event = stop_event - self.options = options - self.access_token = access_token - self.logger_queue = logger_queue - super(SourceProcess, self).__init__(parent_pid) + super(SourceProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid) - def run(self): + def do_run(self): #import pydevd #pydevd.settrace(suspend=False) @@ -218,11 +243,13 @@ processor.process() except Exception as e: message = u"Error %s processing tweet %s" % (repr(e), tweet) - logger.error(message) + logger.exception(message) output = StringIO.StringIO() - traceback.print_exception(Exception, e, None, None, output) - error_stack = output.getvalue() - output.close() + try: + traceback.print_exc(file=output) + error_stack = output.getvalue() + finally: + output.close() session.rollback() tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['ERROR'], error=message, error_stack=error_stack) session.add(tweet_log) @@ -233,23 +260,17 @@ class TweetProcess(BaseProcess): def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid): - self.session_maker = session_maker - self.queue = queue - self.stop_event = 
stop_event - self.options = options - self.access_token = access_token - self.logger_queue = logger_queue - super(TweetProcess, self).__init__(parent_pid) + super(TweetProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid) - def run(self): + def do_run(self): self.logger = set_logging_process(self.options, self.logger_queue) session = self.session_maker() try: while not self.stop_event.is_set() and self.parent_is_alive(): try: - source_id, tweet_txt = queue.get(True, 3) + source_id, tweet_txt = self.queue.get(True, 3) self.logger.debug("Processing source id " + repr(source_id)) except Exception as e: self.logger.debug('Process tweet exception in loop : ' + repr(e)) @@ -322,20 +343,143 @@ utils.set_logging_options(parser) return parser.parse_args() + + +def do_run(options, session_maker): + + stop_args = {} + + access_token = None + if not options.username or not options.password: + access_token = utils.get_oauth_token(options.token_filename) -def add_process_event(type, args, session_maker): session = session_maker() try: - evt = ProcessEvent(args=None if args is None else anyjson.serialize(args), type=type) - session.add(evt) + process_leftovers(session, access_token, utils.get_logger()) session.commit() finally: + session.rollback() session.close() + + if options.process_nb <= 0: + utils.get_logger().debug("Leftovers processed. 
Exiting.") + return None + + queue = mQueue() + stop_event = Event() + + #workaround for bug on using urllib2 and multiprocessing + req = urllib2.Request('http://localhost') + conn = None + try: + conn = urllib2.urlopen(req) + except: + utils.get_logger().debug("could not open localhost") + #donothing + finally: + if conn is not None: + conn.close() + + process_engines = [] + logger_queues = [] + + SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str) + process_engines.append(engine_process) + lqueue = mQueue(1) + logger_queues.append(lqueue) + pid = os.getpid() + sprocess = SourceProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid) + + tweet_processes = [] + + for i in range(options.process_nb - 1): + SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str) + process_engines.append(engine_process) + lqueue = mQueue(1) + logger_queues.append(lqueue) + cprocess = TweetProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid) + tweet_processes.append(cprocess) + + def interupt_handler(signum, frame): + utils.get_logger().debug("shutdown asked " + repr(signum) + " " + repr(inspect.getframeinfo(frame, 9))) + stop_args.update({'message': 'interupt', 'signum':signum, 'frameinfo':inspect.getframeinfo(frame, 9)}) + stop_event.set() + + signal.signal(signal.SIGINT , interupt_handler) + signal.signal(signal.SIGHUP , interupt_handler) + signal.signal(signal.SIGALRM, interupt_handler) + signal.signal(signal.SIGTERM, interupt_handler) + + log_thread = threading.Thread(target=process_log, name="loggingThread", args=(logger_queues, stop_event,)) + log_thread.daemon = True + + log_thread.start() + + sprocess.start() + for cprocess in tweet_processes: + cprocess.start() + + add_process_event("pid", {'main':os.getpid(), 'source':(sprocess.name,sprocess.pid), 'consumers':dict([(p.name,p.pid) for p in tweet_processes])}, session_maker) + + if options.duration >= 0: + end_ts = 
datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration) + + + while not stop_event.is_set(): + if options.duration >= 0 and datetime.datetime.utcnow() >= end_ts: + stop_args.update({'message': 'duration', 'duration' : options.duration, 'end_ts' : end_ts}) + stop_event.set() + break + if sprocess.is_alive(): + time.sleep(1) + else: + stop_args.update({'message': 'Source process killed'}) + stop_event.set() + break + utils.get_logger().debug("Joining Source Process") + try: + sprocess.join(10) + except: + utils.get_logger().debug("Pb joining Source Process - terminating") + sprocess.terminate() + + for i, cprocess in enumerate(tweet_processes): + utils.get_logger().debug("Joining consumer process Nb %d" % (i + 1)) + try: + cprocess.join(3) + except: + utils.get_logger().debug("Pb joining consumer process Nb %d - terminating" % (i + 1)) + cprocess.terminate() + + + utils.get_logger().debug("Close queues") + try: + queue.close() + for lqueue in logger_queues: + lqueue.close() + except Exception as e: + utils.get_logger().error("error when closing queues %s", repr(e)) + #do nothing + + + if options.process_nb > 1: + utils.get_logger().debug("Processing leftovers") + session = session_maker() + try: + process_leftovers(session, access_token, utils.get_logger()) + session.commit() + finally: + session.rollback() + session.close() + + for pengine in process_engines: + pengine.dispose() + + return stop_args if __name__ == '__main__': - stop_args = {} (options, args) = get_options() set_logging(options) @@ -370,135 +514,21 @@ sys.exit(message) metadata.create_all(engine) - - add_process_event(type="start", args={'options':options.__dict__, 'args': args, 'command_line': sys.argv}, session_maker=Session) - - access_token = None - if not options.username or not options.password: - access_token = utils.get_oauth_token(options.token_filename) - - session = Session() - try: - process_leftovers(session, access_token, utils.get_logger()) - session.commit() - 
finally: - session.rollback() - session.close() - - if options.process_nb <= 0: - utils.get_logger().debug("Leftovers processed. Exiting.") - add_process_event(type="shutdown", args=None, session_maker=Session) - sys.exit() - - queue = mQueue() - stop_event = Event() - - #workaround for bug on using urllib2 and multiprocessing - req = urllib2.Request('http://localhost') - conn = None + stop_args = {} try: - conn = urllib2.urlopen(req) - except: - pass - #donothing - finally: - if conn is not None: - conn.close() - - process_engines = [] - logger_queues = [] - - SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str) - process_engines.append(engine_process) - lqueue = mQueue(1) - logger_queues.append(lqueue) - pid = os.getpid() - sprocess = SourceProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid) - - tweet_processes = [] - - for i in range(options.process_nb - 1): - SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str) - process_engines.append(engine_process) - lqueue = mQueue(1) - logger_queues.append(lqueue) - cprocess = TweetProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid) - tweet_processes.append(cprocess) + add_process_event(type="start", args={'options':options.__dict__, 'args': args, 'command_line': sys.argv}, session_maker=Session) + stop_args = do_run(options, Session) + except Exception as e: + utils.get_logger().exception("Error in main thread") + outfile = StringIO.StringIO() + try: + traceback.print_exc(file=outfile) + stop_args = {'error': repr(e), 'message': getattr(e, 'message', ''), 'stacktrace':outfile.getvalue()} + finally: + outfile.close() + raise + finally: + add_process_event(type="shutdown", args=stop_args, session_maker=Session) - def interupt_handler(signum, frame): - global stop_args - utils.get_logger().debug("shutdown asked " + repr(signum) + " " + repr(inspect.getframeinfo(frame, 9))) - stop_args = {'signum':signum, 
'frameinfo':inspect.getframeinfo(frame, 9)} - stop_event.set() - - signal.signal(signal.SIGINT , interupt_handler) - signal.signal(signal.SIGHUP , interupt_handler) - signal.signal(signal.SIGALRM, interupt_handler) - signal.signal(signal.SIGTERM, interupt_handler) - - log_thread = threading.Thread(target=process_log, name="loggingThread", args=(logger_queues, stop_event,)) - log_thread.daemon = True - - log_thread.start() - - sprocess.start() - for cprocess in tweet_processes: - cprocess.start() - - add_process_event("pid", {"main":os.getpid(), 'source':sprocess.pid, 'consumers':[p.pid for p in tweet_processes]}, Session) - - if options.duration >= 0: - end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration) - - - while not stop_event.is_set(): - if options.duration >= 0 and datetime.datetime.utcnow() >= end_ts: - stop_event.set() - break - if sprocess.is_alive(): - time.sleep(1) - else: - stop_event.set() - break - utils.get_logger().debug("Joining Source Process") - try: - sprocess.join(10) - except: - utils.get_logger().debug("Pb joining Source Process - terminating") - sprocess.terminate() - - for i, cprocess in enumerate(tweet_processes): - utils.get_logger().debug("Joining consumer process Nb %d" % (i + 1)) - try: - cprocess.join(3) - except: - utils.get_logger().debug("Pb joining consumer process Nb %d - terminating" % (i + 1)) - cprocess.terminate() - - - utils.get_logger().debug("Close queues") - try: - queue.close() - for lqueue in logger_queues: - lqueue.close() - except exception as e: - utils.get_logger().error("error when closing queues %s", repr(e)) - #do nothing - - - if options.process_nb > 1: - utils.get_logger().debug("Processing leftovers") - session = Session() - try: - process_leftovers(session, access_token, utils.get_logger()) - session.commit() - finally: - session.rollback() - session.close() - - for pengine in process_engines: - pengine.dispose() - - add_process_event(type="shutdown", args=stop_args, 
session_maker=Session) utils.get_logger().debug("Done. Exiting.")