--- a/script/stream/recorder_stream.py Tue May 07 19:33:35 2013 +0200
+++ b/script/stream/recorder_stream.py Wed May 08 01:24:19 2013 +0200
@@ -197,8 +197,6 @@
finally:
session.rollback()
session.close()
- self.logger_queue.close()
- self.queue.close()
self.stream.close()
self.stream = None
if not self.stop_event.is_set():
@@ -225,7 +223,9 @@
self.stream.close()
elif not self.stop_event.is_set() and not source_stream_iter_thread.is_alive:
self.stop_event.set()
-
+
+ self.queue.cancel_join_thread()
+ self.logger_queue.cancel_join_thread()
self.logger.info("SourceProcess : join")
source_stream_iter_thread.join(30)
@@ -330,8 +330,6 @@
session.commit()
- # get tweet source that do not match any message
- # select * from tweet_tweet_source ts left join tweet_tweet_log tl on ts.id = tl.tweet_source_id where tl.id isnull;
def process_log(logger_queues, stop_event):
while not stop_event.is_set():
for lqueue in logger_queues:
@@ -484,6 +482,7 @@
sprocess.join(10)
except:
utils.get_logger().debug("Pb joining Source Process - terminating")
+ finally:
sprocess.terminate()
for i, cprocess in enumerate(tweet_processes):
@@ -495,7 +494,15 @@
cprocess.terminate()
- utils.get_logger().debug("Close queues")
+ utils.get_logger().debug("Close queues")
+ try:
+ queue.close()
+ for lqueue in logger_queues:
+ lqueue.close()
+ except Exception as e:
+ utils.get_logger().error("error when closing queues %s", repr(e))
+ # do nothing
+
if options.process_nb > 1:
utils.get_logger().debug("Processing leftovers")
@@ -510,14 +517,6 @@
for pengine in process_engines:
pengine.dispose()
- try:
- queue.close()
- for lqueue in logger_queues:
- lqueue.close()
- except Exception as e:
- utils.get_logger().error("error when closing queues %s", repr(e))
- # do nothing
-
return stop_args