script/stream/recorder_tweetstream.py
changeset 738 2497c7f38e0a
parent 737 8f6a4d6dfe14
child 739 350ffcb7ae4d
--- a/script/stream/recorder_tweetstream.py	Mon Dec 10 11:00:57 2012 +0100
+++ b/script/stream/recorder_tweetstream.py	Mon Dec 10 12:53:09 2012 +0100
@@ -184,12 +184,8 @@
         finally:
             session.rollback()
             session.close()
-            with self.logger_queue.mutex:
-                self.logger_queue.clear()
-                self.logger_queue.close()
-            with self.queue.mutex:
-                self.queue.clear()
-                self.queue.close()
+            self.logger_queue.close()
+            self.queue.close()
             self.stream.close()
             self.stream = None
             if not self.stop_event.is_set():
@@ -205,14 +201,15 @@
         
         source_stream_iter_thread.start()
         
-        while not self.stop_event.is_set():        
+        while not self.stop_event.is_set():
+            self.logger.info("SourceProcess : In while after start")
             self.stop_event.wait(DEFAULT_TIMEOUT)
             if self.stop_event.is_set() and self.stream:
                 self.stream.close()
             elif not self.stop_event.is_set() and not source_stream_iter_thread.is_alive:
-                with self.stop_event.mutex:
-                    self.stop_event.set()
+                self.stop_event.set()
 
+        self.logger.info("SourceProcess : join")
         source_stream_iter_thread.join(30)
 
 
@@ -283,10 +280,6 @@
         finally:
             session.rollback()
             self.stop_event.set()
-            with self.logger_queue.mutex:
-                self.logger_queue.clear()
-            with self.queue.mutex:
-                self.queue.clear()
             session.close()