script/stream/recorder_tweetstream.py
changeset 739 350ffcb7ae4d
parent 738 2497c7f38e0a
--- a/script/stream/recorder_tweetstream.py	Mon Dec 10 12:53:09 2012 +0100
+++ b/script/stream/recorder_tweetstream.py	Tue Dec 11 10:46:35 2012 +0100
@@ -60,7 +60,7 @@
     else:
         consumer_key = models.CONSUMER_KEY
         consumer_secret = models.CONSUMER_SECRET
-        auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=False)
+        auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=True)
         auth.set_access_token(*access_token)
     return auth
 
@@ -142,7 +142,7 @@
         track_list = [k.strip() for k in track_list.split(',')]
 
         self.logger.debug("SourceProcess : before connecting to stream " + repr(track_list))                        
-        self.stream = tweetstream.FilterStream(self.auth, track=track_list, raw=True, url=self.options.url, catchup=self.catchup, timeout=self.timeout, chunk_size=1)
+        self.stream = tweetstream.FilterStream(self.auth, track=track_list, raw=True, url=self.options.url, catchup=self.catchup, timeout=self.timeout, chunk_size=1, logger=self.logger)
         self.logger.debug("SourceProcess : after connecting to stream")
         self.stream.muststop = lambda: self.stop_event.is_set()        
         
@@ -202,7 +202,7 @@
         source_stream_iter_thread.start()
         
         while not self.stop_event.is_set():
-            self.logger.info("SourceProcess : In while after start")
+            self.logger.debug("SourceProcess : In while after start")
             self.stop_event.wait(DEFAULT_TIMEOUT)
             if self.stop_event.is_set() and self.stream:
                 self.stream.close()
@@ -279,7 +279,6 @@
                 session.commit()
         finally:
             session.rollback()
-            self.stop_event.set()
             session.close()
 
 
@@ -411,16 +410,6 @@
         cprocess = TweetProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)
         tweet_processes.append(cprocess)
 
-    def interupt_handler(signum, frame):
-        utils.get_logger().debug("shutdown asked " + repr(signum) + "  " + repr(inspect.getframeinfo(frame, 9)))
-        stop_args.update({'message': 'interupt', 'signum':signum, 'frameinfo':inspect.getframeinfo(frame, 9)})
-        stop_event.set()
-        
-    signal.signal(signal.SIGINT , interupt_handler)
-    signal.signal(signal.SIGHUP , interupt_handler)
-    signal.signal(signal.SIGALRM, interupt_handler)
-    signal.signal(signal.SIGTERM, interupt_handler)
-
     log_thread = threading.Thread(target=process_log, name="loggingThread", args=(logger_queues, stop_event,))
     log_thread.daemon = True
 
@@ -434,6 +423,16 @@
 
     if options.duration >= 0:
         end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration)    
+
+    def interupt_handler(signum, frame):
+        utils.get_logger().debug("shutdown asked " + repr(signum) + "  " + repr(inspect.getframeinfo(frame, 9)))
+        stop_args.update({'message': 'interupt', 'signum':signum, 'frameinfo':inspect.getframeinfo(frame, 9)})
+        stop_event.set()
+        
+    signal.signal(signal.SIGINT , interupt_handler)
+    signal.signal(signal.SIGHUP , interupt_handler)
+    signal.signal(signal.SIGALRM, interupt_handler)
+    signal.signal(signal.SIGTERM, interupt_handler)
     
 
     while not stop_event.is_set():
@@ -441,7 +440,8 @@
             stop_args.update({'message': 'duration', 'duration' : options.duration, 'end_ts' : end_ts})
             stop_event.set()
             break
-        if sprocess.is_alive():            
+        if sprocess.is_alive():
+            utils.get_logger().debug("Source process alive")
             time.sleep(1)
         else:
             stop_args.update({'message': 'Source process killed'})