--- a/script/lib/iri_tweet/iri_tweet/stream.py Mon Dec 10 12:53:09 2012 +0100
+++ b/script/lib/iri_tweet/iri_tweet/stream.py Tue Dec 11 10:46:35 2012 +0100
@@ -121,7 +121,7 @@
"""
def __init__(self, auth,
- catchup=None, raw=False, timeout=-1, url=None, compressed=False, chunk_size=4096):
+ catchup=None, raw=False, timeout=-1, url=None, compressed=False, chunk_size=4096, logger=None):
self._conn = None
self._rate_ts = None
self._rate_cnt = 0
@@ -141,6 +141,7 @@
if url: self.url = url
self.muststop = False
+ self._logger = logger
self._iter = self.__iter__()
@@ -154,6 +155,9 @@
def _init_conn(self):
"""Open the connection to the twitter server"""
+
+ if self._logger : self._logger.debug("BaseStream Open the connection to the twitter server")
+
headers = {'User-Agent': self.user_agent}
if self._compressed:
@@ -167,16 +171,22 @@
if self._auth:
self._auth.apply_auth(self.url, "POST", headers, postdata)
- self._resp = requests.post(self.url, headers=headers, data=postdata)
+ if self._logger : self._logger.debug("BaseStream init connection url " + repr(self.url))
+ if self._logger : self._logger.debug("BaseStream init connection headers " + repr(headers))
+ if self._logger : self._logger.debug("BaseStream init connection data " + repr(postdata))
+
+ self._resp = requests.post(self.url, headers=headers, data=postdata, prefetch=False)
+ if self._logger : self._logger.debug("BaseStream init connection " + repr(self._resp))
+
self._resp.raise_for_status()
self.connected = True
+
if not self._rate_ts:
self._rate_ts = time.time()
if not self.starttime:
self.starttime = time.time()
-
def _get_post_data(self):
"""Subclasses that need to add post data to the request can override
this method and return post data. The data should be in the format
@@ -235,9 +245,12 @@
def __iter__(self):
+ if self._logger : self._logger.debug("BaseStream __iter__")
if not self.connected:
+ if self._logger : self._logger.debug("BaseStream __iter__ not connected, connecting")
self._init_conn()
+ if self._logger : self._logger.debug("BaseStream __iter__ connected")
for line in self._iter_object():
if not line:
@@ -275,12 +288,12 @@
url = "https://stream.twitter.com/1.1/statuses/filter.json"
def __init__(self, auth, follow=None, locations=None,
- track=None, catchup=None, url=None, raw=False, timeout=None, compressed=False, chunk_size=4096):
+ track=None, catchup=None, url=None, raw=False, timeout=None, compressed=False, chunk_size=4096, logger=None):
self._follow = follow
self._locations = locations
self._track = track
# remove follow, locations, track
- BaseStream.__init__(self, auth, url=url, raw=raw, catchup=catchup, timeout=timeout, compressed=compressed, chunk_size=chunk_size)
+ BaseStream.__init__(self, auth, url=url, raw=raw, catchup=catchup, timeout=timeout, compressed=compressed, chunk_size=chunk_size, logger=logger)
def _get_post_data(self):
postdata = {}
@@ -308,7 +321,7 @@
# raise a ConnectionError instead
if callable(self._error_cb):
self._error_cb(e)
- self._logger.info("stream sleeping for %d ms " % self._retry_wait)
+ if self._logger: self._logger.info("stream sleeping for %d ms " % self._retry_wait)
time.sleep(float(self._retry_wait)/1000.0)
@@ -340,13 +353,14 @@
try:
if self._logger: self._logger.debug("inner loop")
for tweet in self._stream:
+ if self._logger: self._logger.debug("tweet : " + repr(tweet))
self._reconnects = 0
self._retry_wait = 0
if "warning" in tweet:
- self._logger.warning("Tweet warning received : %s" % repr(tweet))
+ if self._logger: self._logger.warning("Tweet warning received : %s" % repr(tweet))
continue
if not tweet.strip():
- self._logger.debug("Empty Tweet received : PING")
+ if self._logger: self._logger.debug("Empty Tweet received : PING")
continue
yield tweet
except (ConnectionError, requests.exceptions.ConnectionError, requests.exceptions.Timeout, requests.exceptions.RequestException) as e:
--- a/script/stream/recorder_tweetstream.py Mon Dec 10 12:53:09 2012 +0100
+++ b/script/stream/recorder_tweetstream.py Tue Dec 11 10:46:35 2012 +0100
@@ -60,7 +60,7 @@
else:
consumer_key = models.CONSUMER_KEY
consumer_secret = models.CONSUMER_SECRET
- auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=False)
+ auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=True)
auth.set_access_token(*access_token)
return auth
@@ -142,7 +142,7 @@
track_list = [k.strip() for k in track_list.split(',')]
self.logger.debug("SourceProcess : before connecting to stream " + repr(track_list))
- self.stream = tweetstream.FilterStream(self.auth, track=track_list, raw=True, url=self.options.url, catchup=self.catchup, timeout=self.timeout, chunk_size=1)
+ self.stream = tweetstream.FilterStream(self.auth, track=track_list, raw=True, url=self.options.url, catchup=self.catchup, timeout=self.timeout, chunk_size=1, logger=self.logger)
self.logger.debug("SourceProcess : after connecting to stream")
self.stream.muststop = lambda: self.stop_event.is_set()
@@ -202,7 +202,7 @@
source_stream_iter_thread.start()
while not self.stop_event.is_set():
- self.logger.info("SourceProcess : In while after start")
+ self.logger.debug("SourceProcess : In while after start")
self.stop_event.wait(DEFAULT_TIMEOUT)
if self.stop_event.is_set() and self.stream:
self.stream.close()
@@ -279,7 +279,6 @@
session.commit()
finally:
session.rollback()
- self.stop_event.set()
session.close()
@@ -411,16 +410,6 @@
cprocess = TweetProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)
tweet_processes.append(cprocess)
- def interupt_handler(signum, frame):
- utils.get_logger().debug("shutdown asked " + repr(signum) + " " + repr(inspect.getframeinfo(frame, 9)))
- stop_args.update({'message': 'interupt', 'signum':signum, 'frameinfo':inspect.getframeinfo(frame, 9)})
- stop_event.set()
-
- signal.signal(signal.SIGINT , interupt_handler)
- signal.signal(signal.SIGHUP , interupt_handler)
- signal.signal(signal.SIGALRM, interupt_handler)
- signal.signal(signal.SIGTERM, interupt_handler)
-
log_thread = threading.Thread(target=process_log, name="loggingThread", args=(logger_queues, stop_event,))
log_thread.daemon = True
@@ -434,6 +423,16 @@
if options.duration >= 0:
end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration)
+
+ def interupt_handler(signum, frame):
+ utils.get_logger().debug("shutdown asked " + repr(signum) + " " + repr(inspect.getframeinfo(frame, 9)))
+ stop_args.update({'message': 'interupt', 'signum':signum, 'frameinfo':inspect.getframeinfo(frame, 9)})
+ stop_event.set()
+
+ signal.signal(signal.SIGINT , interupt_handler)
+ signal.signal(signal.SIGHUP , interupt_handler)
+ signal.signal(signal.SIGALRM, interupt_handler)
+ signal.signal(signal.SIGTERM, interupt_handler)
while not stop_event.is_set():
@@ -441,7 +440,8 @@
stop_args.update({'message': 'duration', 'duration' : options.duration, 'end_ts' : end_ts})
stop_event.set()
break
- if sprocess.is_alive():
+ if sprocess.is_alive():
+ utils.get_logger().debug("Source process alive")
time.sleep(1)
else:
stop_args.update({'message': 'Source process killed'})