diff -r 2497c7f38e0a -r 350ffcb7ae4d script/stream/recorder_tweetstream.py --- a/script/stream/recorder_tweetstream.py Mon Dec 10 12:53:09 2012 +0100 +++ b/script/stream/recorder_tweetstream.py Tue Dec 11 10:46:35 2012 +0100 @@ -60,7 +60,7 @@ else: consumer_key = models.CONSUMER_KEY consumer_secret = models.CONSUMER_SECRET - auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=False) + auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=True) auth.set_access_token(*access_token) return auth @@ -142,7 +142,7 @@ track_list = [k.strip() for k in track_list.split(',')] self.logger.debug("SourceProcess : before connecting to stream " + repr(track_list)) - self.stream = tweetstream.FilterStream(self.auth, track=track_list, raw=True, url=self.options.url, catchup=self.catchup, timeout=self.timeout, chunk_size=1) + self.stream = tweetstream.FilterStream(self.auth, track=track_list, raw=True, url=self.options.url, catchup=self.catchup, timeout=self.timeout, chunk_size=1, logger=self.logger) self.logger.debug("SourceProcess : after connecting to stream") self.stream.muststop = lambda: self.stop_event.is_set() @@ -202,7 +202,7 @@ source_stream_iter_thread.start() while not self.stop_event.is_set(): - self.logger.info("SourceProcess : In while after start") + self.logger.debug("SourceProcess : In while after start") self.stop_event.wait(DEFAULT_TIMEOUT) if self.stop_event.is_set() and self.stream: self.stream.close() @@ -279,7 +279,6 @@ session.commit() finally: session.rollback() - self.stop_event.set() session.close() @@ -411,16 +410,6 @@ cprocess = TweetProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid) tweet_processes.append(cprocess) - def interupt_handler(signum, frame): - utils.get_logger().debug("shutdown asked " + repr(signum) + " " + repr(inspect.getframeinfo(frame, 9))) - stop_args.update({'message': 'interupt', 'signum':signum, 'frameinfo':inspect.getframeinfo(frame, 9)}) - stop_event.set() - - signal.signal(signal.SIGINT , interupt_handler) - signal.signal(signal.SIGHUP , interupt_handler) - signal.signal(signal.SIGALRM, interupt_handler) - signal.signal(signal.SIGTERM, interupt_handler) - log_thread = threading.Thread(target=process_log, name="loggingThread", args=(logger_queues, stop_event,)) log_thread.daemon = True @@ -434,6 +423,16 @@ if options.duration >= 0: end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration) + + def interupt_handler(signum, frame): + utils.get_logger().debug("shutdown asked " + repr(signum) + " " + repr(inspect.getframeinfo(frame, 9))) + stop_args.update({'message': 'interupt', 'signum':signum, 'frameinfo':inspect.getframeinfo(frame, 9)}) + stop_event.set() + + signal.signal(signal.SIGINT , interupt_handler) + signal.signal(signal.SIGHUP , interupt_handler) + signal.signal(signal.SIGALRM, interupt_handler) + signal.signal(signal.SIGTERM, interupt_handler) while not stop_event.is_set(): @@ -441,7 +440,8 @@ stop_args.update({'message': 'duration', 'duration' : options.duration, 'end_ts' : end_ts}) stop_event.set() break - if sprocess.is_alive(): + if sprocess.is_alive(): + utils.get_logger().debug("Source process alive") time.sleep(1) else: stop_args.update({'message': 'Source process killed'})