diff -r c774bdf7d3dd -r 9c57883dbb9d script/stream/recorder_stream.py --- a/script/stream/recorder_stream.py Tue May 07 19:33:35 2013 +0200 +++ b/script/stream/recorder_stream.py Wed May 08 01:24:19 2013 +0200 @@ -197,8 +197,6 @@ finally: session.rollback() session.close() - self.logger_queue.close() - self.queue.close() self.stream.close() self.stream = None if not self.stop_event.is_set(): @@ -225,7 +223,9 @@ self.stream.close() elif not self.stop_event.is_set() and not source_stream_iter_thread.is_alive: self.stop_event.set() - + + self.queue.cancel_join_thread() + self.logger_queue.cancel_join_thread() self.logger.info("SourceProcess : join") source_stream_iter_thread.join(30) @@ -330,8 +330,6 @@ session.commit() - # get tweet source that do not match any message - # select * from tweet_tweet_source ts left join tweet_tweet_log tl on ts.id = tl.tweet_source_id where tl.id isnull; def process_log(logger_queues, stop_event): while not stop_event.is_set(): for lqueue in logger_queues: @@ -484,6 +482,7 @@ sprocess.join(10) except: utils.get_logger().debug("Pb joining Source Process - terminating") + finally: sprocess.terminate() for i, cprocess in enumerate(tweet_processes): @@ -495,7 +494,15 @@ cprocess.terminate() - utils.get_logger().debug("Close queues") + utils.get_logger().debug("Close queues") + try: + queue.close() + for lqueue in logger_queues: + lqueue.close() + except Exception as e: + utils.get_logger().error("error when closing queues %s", repr(e)) + # do nothing + if options.process_nb > 1: utils.get_logger().debug("Processing leftovers") @@ -510,14 +517,6 @@ for pengine in process_engines: pengine.dispose() - try: - queue.close() - for lqueue in logger_queues: - lqueue.close() - except Exception as e: - utils.get_logger().error("error when closing queues %s", repr(e)) - # do nothing - return stop_args