--- a/script/lib/iri_tweet/iri_tweet/utils.py Tue May 07 19:33:35 2013 +0200
+++ b/script/lib/iri_tweet/iri_tweet/utils.py Wed May 08 01:24:19 2013 +0200
@@ -266,19 +266,17 @@
def get_logger():
global logger_name
- return logging.getLogger(logger_name) #@UndefinedVariable
+ return logging.getLogger(logger_name)
-# Next two import lines for this demo only
-
-class QueueHandler(logging.Handler): #@UndefinedVariable
+class QueueHandler(logging.Handler):
"""
This is a logging handler which sends events to a multiprocessing queue.
"""
def __init__(self, queue, ignore_full):
"""
- Initialise an instance, using the passed queue.
+ Initialize an instance, using the passed queue.
"""
logging.Handler.__init__(self) #@UndefinedVariable
self.queue = queue
@@ -293,7 +291,7 @@
try:
ei = record.exc_info
if ei:
- dummy = self.format(record) # just to get traceback text into record.exc_text
+ _ = self.format(record) # just to get traceback text into record.exc_text
record.exc_info = None # not needed any more
if not self.ignore_full or (not self.queue.full()):
self.queue.put_nowait(record)
--- a/script/stream/recorder_stream.py Tue May 07 19:33:35 2013 +0200
+++ b/script/stream/recorder_stream.py Wed May 08 01:24:19 2013 +0200
@@ -197,8 +197,6 @@
finally:
session.rollback()
session.close()
- self.logger_queue.close()
- self.queue.close()
self.stream.close()
self.stream = None
if not self.stop_event.is_set():
@@ -225,7 +223,9 @@
self.stream.close()
elif not self.stop_event.is_set() and not source_stream_iter_thread.is_alive:
self.stop_event.set()
-
+
+ self.queue.cancel_join_thread()
+ self.logger_queue.cancel_join_thread()
self.logger.info("SourceProcess : join")
source_stream_iter_thread.join(30)
@@ -330,8 +330,6 @@
session.commit()
- # get tweet source that do not match any message
- # select * from tweet_tweet_source ts left join tweet_tweet_log tl on ts.id = tl.tweet_source_id where tl.id isnull;
def process_log(logger_queues, stop_event):
while not stop_event.is_set():
for lqueue in logger_queues:
@@ -484,6 +482,7 @@
sprocess.join(10)
except:
utils.get_logger().debug("Pb joining Source Process - terminating")
+ finally:
sprocess.terminate()
for i, cprocess in enumerate(tweet_processes):
@@ -495,7 +494,15 @@
cprocess.terminate()
- utils.get_logger().debug("Close queues")
+ utils.get_logger().debug("Close queues")
+ try:
+ queue.close()
+ for lqueue in logger_queues:
+ lqueue.close()
+ except Exception as e:
+ utils.get_logger().error("error when closing queues %s", repr(e))
+ # do nothing
+
if options.process_nb > 1:
utils.get_logger().debug("Processing leftovers")
@@ -510,14 +517,6 @@
for pengine in process_engines:
pengine.dispose()
- try:
- queue.close()
- for lqueue in logger_queues:
- lqueue.close()
- except Exception as e:
- utils.get_logger().error("error when closing queues %s", repr(e))
- # do nothing
-
return stop_args