Correct stopping process
authorYves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
Wed, 08 May 2013 01:24:19 +0200
changeset 890 9c57883dbb9d
parent 889 c774bdf7d3dd
child 891 8628c590f608
Correct stopping process
script/lib/iri_tweet/iri_tweet/utils.py
script/stream/recorder_stream.py
--- a/script/lib/iri_tweet/iri_tweet/utils.py	Tue May 07 19:33:35 2013 +0200
+++ b/script/lib/iri_tweet/iri_tweet/utils.py	Wed May 08 01:24:19 2013 +0200
@@ -266,19 +266,17 @@
 
 def get_logger():
     global logger_name
-    return logging.getLogger(logger_name) #@UndefinedVariable
+    return logging.getLogger(logger_name)
 
 
-# Next two import lines for this demo only
-
-class QueueHandler(logging.Handler): #@UndefinedVariable
+class QueueHandler(logging.Handler):
     """
     This is a logging handler which sends events to a multiprocessing queue.    
     """
 
     def __init__(self, queue, ignore_full):
         """
-        Initialise an instance, using the passed queue.
+        Initialize an instance, using the passed queue.
         """
         logging.Handler.__init__(self) #@UndefinedVariable
         self.queue = queue
@@ -293,7 +291,7 @@
         try:
             ei = record.exc_info
             if ei:
-                dummy = self.format(record) # just to get traceback text into record.exc_text
+                _ = self.format(record) # just to get traceback text into record.exc_text
                 record.exc_info = None  # not needed any more
             if not self.ignore_full or (not self.queue.full()):
                 self.queue.put_nowait(record)
--- a/script/stream/recorder_stream.py	Tue May 07 19:33:35 2013 +0200
+++ b/script/stream/recorder_stream.py	Wed May 08 01:24:19 2013 +0200
@@ -197,8 +197,6 @@
         finally:
             session.rollback()
             session.close()
-            self.logger_queue.close()
-            self.queue.close()
             self.stream.close()
             self.stream = None
             if not self.stop_event.is_set():
@@ -225,7 +223,9 @@
             self.stream.close()
         elif not self.stop_event.is_set() and not source_stream_iter_thread.is_alive:
             self.stop_event.set()
-    
+
+        self.queue.cancel_join_thread()
+        self.logger_queue.cancel_join_thread()
         self.logger.info("SourceProcess : join")
         source_stream_iter_thread.join(30)
 
@@ -330,8 +330,6 @@
         session.commit()
         
     
-    # get tweet source that do not match any message
-    # select * from tweet_tweet_source ts left join tweet_tweet_log tl on ts.id = tl.tweet_source_id where tl.id isnull;
 def process_log(logger_queues, stop_event):
     while not stop_event.is_set():
         for lqueue in logger_queues:
@@ -484,6 +482,7 @@
         sprocess.join(10)
     except:
         utils.get_logger().debug("Pb joining Source Process - terminating")
+    finally:
         sprocess.terminate()
         
     for i, cprocess in enumerate(tweet_processes):
@@ -495,7 +494,15 @@
             cprocess.terminate()
 
     
-    utils.get_logger().debug("Close queues")        
+    utils.get_logger().debug("Close queues")
+    try:
+        queue.close()
+        for lqueue in logger_queues:
+            lqueue.close()
+    except Exception as e:
+        utils.get_logger().error("error when closing queues %s", repr(e))
+        # do nothing
+        
     
     if options.process_nb > 1:
         utils.get_logger().debug("Processing leftovers")
@@ -510,14 +517,6 @@
     for pengine in process_engines:
         pengine.dispose()
     
-    try:
-        queue.close()
-        for lqueue in logger_queues:
-            lqueue.close()
-    except Exception as e:
-        utils.get_logger().error("error when closing queues %s", repr(e))
-        # do nothing
-
     return stop_args