script/stream/recorder_stream.py
changeset 890 9c57883dbb9d
parent 888 6fc6637d8403
child 893 10a19dd4e1c9
--- a/script/stream/recorder_stream.py	Tue May 07 19:33:35 2013 +0200
+++ b/script/stream/recorder_stream.py	Wed May 08 01:24:19 2013 +0200
@@ -197,8 +197,6 @@
         finally:
             session.rollback()
             session.close()
-            self.logger_queue.close()
-            self.queue.close()
             self.stream.close()
             self.stream = None
             if not self.stop_event.is_set():
@@ -225,7 +223,9 @@
             self.stream.close()
         elif not self.stop_event.is_set() and not source_stream_iter_thread.is_alive:
             self.stop_event.set()
-    
+
+        self.queue.cancel_join_thread()
+        self.logger_queue.cancel_join_thread()
         self.logger.info("SourceProcess : join")
         source_stream_iter_thread.join(30)
 
@@ -330,8 +330,6 @@
         session.commit()
         
     
-    # get tweet source that do not match any message
-    # select * from tweet_tweet_source ts left join tweet_tweet_log tl on ts.id = tl.tweet_source_id where tl.id isnull;
 def process_log(logger_queues, stop_event):
     while not stop_event.is_set():
         for lqueue in logger_queues:
@@ -484,6 +482,7 @@
         sprocess.join(10)
     except:
         utils.get_logger().debug("Pb joining Source Process - terminating")
+    finally:
         sprocess.terminate()
         
     for i, cprocess in enumerate(tweet_processes):
@@ -495,7 +494,15 @@
             cprocess.terminate()
 
     
-    utils.get_logger().debug("Close queues")        
+    utils.get_logger().debug("Close queues")
+    try:
+        queue.close()
+        for lqueue in logger_queues:
+            lqueue.close()
+    except Exception as e:
+        utils.get_logger().error("error when closing queues %s", repr(e))
+        # do nothing
+        
     
     if options.process_nb > 1:
         utils.get_logger().debug("Processing leftovers")
@@ -510,14 +517,6 @@
     for pengine in process_engines:
         pengine.dispose()
     
-    try:
-        queue.close()
-        for lqueue in logger_queues:
-            lqueue.close()
-    except Exception as e:
-        utils.get_logger().error("error when closing queues %s", repr(e))
-        # do nothing
-
     return stop_args