--- a/script/stream/recorder_tweetstream.py Mon Dec 10 12:53:09 2012 +0100
+++ b/script/stream/recorder_tweetstream.py Tue Dec 11 10:46:35 2012 +0100
@@ -60,7 +60,7 @@
else:
consumer_key = models.CONSUMER_KEY
consumer_secret = models.CONSUMER_SECRET
- auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=False)
+ auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=True)
auth.set_access_token(*access_token)
return auth
@@ -142,7 +142,7 @@
track_list = [k.strip() for k in track_list.split(',')]
self.logger.debug("SourceProcess : before connecting to stream " + repr(track_list))
- self.stream = tweetstream.FilterStream(self.auth, track=track_list, raw=True, url=self.options.url, catchup=self.catchup, timeout=self.timeout, chunk_size=1)
+ self.stream = tweetstream.FilterStream(self.auth, track=track_list, raw=True, url=self.options.url, catchup=self.catchup, timeout=self.timeout, chunk_size=1, logger=self.logger)
self.logger.debug("SourceProcess : after connecting to stream")
self.stream.muststop = lambda: self.stop_event.is_set()
@@ -202,7 +202,7 @@
source_stream_iter_thread.start()
while not self.stop_event.is_set():
- self.logger.info("SourceProcess : In while after start")
+ self.logger.debug("SourceProcess : In while after start")
self.stop_event.wait(DEFAULT_TIMEOUT)
if self.stop_event.is_set() and self.stream:
self.stream.close()
@@ -279,7 +279,6 @@
session.commit()
finally:
session.rollback()
- self.stop_event.set()
session.close()
@@ -411,16 +410,6 @@
cprocess = TweetProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)
tweet_processes.append(cprocess)
- def interupt_handler(signum, frame):
- utils.get_logger().debug("shutdown asked " + repr(signum) + " " + repr(inspect.getframeinfo(frame, 9)))
- stop_args.update({'message': 'interupt', 'signum':signum, 'frameinfo':inspect.getframeinfo(frame, 9)})
- stop_event.set()
-
- signal.signal(signal.SIGINT , interupt_handler)
- signal.signal(signal.SIGHUP , interupt_handler)
- signal.signal(signal.SIGALRM, interupt_handler)
- signal.signal(signal.SIGTERM, interupt_handler)
-
log_thread = threading.Thread(target=process_log, name="loggingThread", args=(logger_queues, stop_event,))
log_thread.daemon = True
@@ -434,6 +423,16 @@
if options.duration >= 0:
end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration)
+
+ def interupt_handler(signum, frame):
+ utils.get_logger().debug("shutdown asked " + repr(signum) + " " + repr(inspect.getframeinfo(frame, 9)))
+ stop_args.update({'message': 'interupt', 'signum':signum, 'frameinfo':inspect.getframeinfo(frame, 9)})
+ stop_event.set()
+
+ signal.signal(signal.SIGINT , interupt_handler)
+ signal.signal(signal.SIGHUP , interupt_handler)
+ signal.signal(signal.SIGALRM, interupt_handler)
+ signal.signal(signal.SIGTERM, interupt_handler)
while not stop_event.is_set():
@@ -441,7 +440,8 @@
stop_args.update({'message': 'duration', 'duration' : options.duration, 'end_ts' : end_ts})
stop_event.set()
break
- if sprocess.is_alive():
+ if sprocess.is_alive():
+ utils.get_logger().debug("Source process alive")
time.sleep(1)
else:
stop_args.update({'message': 'Source process killed'})