# HG changeset patch # User Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com> # Date 1355219195 -3600 # Node ID 350ffcb7ae4d83cbb949e370524ee075ea376f64 # Parent 2497c7f38e0a4509c617d11c82bea408247a3168 correct listener. add log diff -r 2497c7f38e0a -r 350ffcb7ae4d script/lib/iri_tweet/iri_tweet/stream.py --- a/script/lib/iri_tweet/iri_tweet/stream.py Mon Dec 10 12:53:09 2012 +0100 +++ b/script/lib/iri_tweet/iri_tweet/stream.py Tue Dec 11 10:46:35 2012 +0100 @@ -121,7 +121,7 @@ """ def __init__(self, auth, - catchup=None, raw=False, timeout=-1, url=None, compressed=False, chunk_size=4096): + catchup=None, raw=False, timeout=-1, url=None, compressed=False, chunk_size=4096, logger=None): self._conn = None self._rate_ts = None self._rate_cnt = 0 @@ -141,6 +141,7 @@ if url: self.url = url self.muststop = False + self._logger = logger self._iter = self.__iter__() @@ -154,6 +155,9 @@ def _init_conn(self): """Open the connection to the twitter server""" + + if self._logger : self._logger.debug("BaseStream Open the connection to the twitter server") + headers = {'User-Agent': self.user_agent} if self._compressed: @@ -167,16 +171,22 @@ if self._auth: self._auth.apply_auth(self.url, "POST", headers, postdata) - self._resp = requests.post(self.url, headers=headers, data=postdata) + if self._logger : self._logger.debug("BaseStream init connection url " + repr(self.url)) + if self._logger : self._logger.debug("BaseStream init connection headers " + repr(headers)) + if self._logger : self._logger.debug("BaseStream init connection data " + repr(postdata)) + + self._resp = requests.post(self.url, headers=headers, data=postdata, prefetch=False) + if self._logger : self._logger.debug("BaseStream init connection " + repr(self._resp)) + self._resp.raise_for_status() self.connected = True + if not self._rate_ts: self._rate_ts = time.time() if not self.starttime: self.starttime = time.time() - def _get_post_data(self): """Subclasses that need to add post data to the request can override this method and return post data. The data should be in the format @@ -235,9 +245,12 @@ def __iter__(self): + if self._logger : self._logger.debug("BaseStream __iter__") if not self.connected: + if self._logger : self._logger.debug("BaseStream __iter__ not connected, connecting") self._init_conn() + if self._logger : self._logger.debug("BaseStream __iter__ connected") for line in self._iter_object(): if not line: @@ -275,12 +288,12 @@ url = "https://stream.twitter.com/1.1/statuses/filter.json" def __init__(self, auth, follow=None, locations=None, - track=None, catchup=None, url=None, raw=False, timeout=None, compressed=False, chunk_size=4096): + track=None, catchup=None, url=None, raw=False, timeout=None, compressed=False, chunk_size=4096, logger=None): self._follow = follow self._locations = locations self._track = track # remove follow, locations, track - BaseStream.__init__(self, auth, url=url, raw=raw, catchup=catchup, timeout=timeout, compressed=compressed, chunk_size=chunk_size) + BaseStream.__init__(self, auth, url=url, raw=raw, catchup=catchup, timeout=timeout, compressed=compressed, chunk_size=chunk_size, logger=logger) def _get_post_data(self): postdata = {} @@ -308,7 +321,7 @@ # raise a ConnectionError instead if callable(self._error_cb): self._error_cb(e) - self._logger.info("stream sleeping for %d ms " % self._retry_wait) + if self._logger: self._logger.info("stream sleeping for %d ms " % self._retry_wait) time.sleep(float(self._retry_wait)/1000.0) @@ -340,13 +353,14 @@ try: if self._logger: self._logger.debug("inner loop") for tweet in self._stream: + if self._logger: self._logger.debug("tweet : " + repr(tweet)) self._reconnects = 0 self._retry_wait = 0 if "warning" in tweet: - self._logger.warning("Tweet warning received : %s" % repr(tweet)) + if self._logger: self._logger.warning("Tweet warning received : %s" % repr(tweet)) continue if not tweet.strip(): - self._logger.debug("Empty Tweet received : PING") + if self._logger: self._logger.debug("Empty Tweet received : PING") continue yield tweet except (ConnectionError, requests.exceptions.ConnectionError, requests.exceptions.Timeout, requests.exceptions.RequestException) as e: diff -r 2497c7f38e0a -r 350ffcb7ae4d script/stream/recorder_tweetstream.py --- a/script/stream/recorder_tweetstream.py Mon Dec 10 12:53:09 2012 +0100 +++ b/script/stream/recorder_tweetstream.py Tue Dec 11 10:46:35 2012 +0100 @@ -60,7 +60,7 @@ else: consumer_key = models.CONSUMER_KEY consumer_secret = models.CONSUMER_SECRET - auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=False) + auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=True) auth.set_access_token(*access_token) return auth @@ -142,7 +142,7 @@ track_list = [k.strip() for k in track_list.split(',')] self.logger.debug("SourceProcess : before connecting to stream " + repr(track_list)) - self.stream = tweetstream.FilterStream(self.auth, track=track_list, raw=True, url=self.options.url, catchup=self.catchup, timeout=self.timeout, chunk_size=1) + self.stream = tweetstream.FilterStream(self.auth, track=track_list, raw=True, url=self.options.url, catchup=self.catchup, timeout=self.timeout, chunk_size=1, logger=self.logger) self.logger.debug("SourceProcess : after connecting to stream") self.stream.muststop = lambda: self.stop_event.is_set() @@ -202,7 +202,7 @@ source_stream_iter_thread.start() while not self.stop_event.is_set(): - self.logger.info("SourceProcess : In while after start") + self.logger.debug("SourceProcess : In while after start") self.stop_event.wait(DEFAULT_TIMEOUT) if self.stop_event.is_set() and self.stream: self.stream.close() @@ -279,7 +279,6 @@ session.commit() finally: session.rollback() - self.stop_event.set() session.close() @@ -411,16 +410,6 @@ cprocess = TweetProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid) tweet_processes.append(cprocess) - def interupt_handler(signum, frame): - utils.get_logger().debug("shutdown asked " + repr(signum) + " " + repr(inspect.getframeinfo(frame, 9))) - stop_args.update({'message': 'interupt', 'signum':signum, 'frameinfo':inspect.getframeinfo(frame, 9)}) - stop_event.set() - - signal.signal(signal.SIGINT , interupt_handler) - signal.signal(signal.SIGHUP , interupt_handler) - signal.signal(signal.SIGALRM, interupt_handler) - signal.signal(signal.SIGTERM, interupt_handler) - log_thread = threading.Thread(target=process_log, name="loggingThread", args=(logger_queues, stop_event,)) log_thread.daemon = True @@ -434,6 +423,16 @@ if options.duration >= 0: end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration) + + def interupt_handler(signum, frame): + utils.get_logger().debug("shutdown asked " + repr(signum) + " " + repr(inspect.getframeinfo(frame, 9))) + stop_args.update({'message': 'interupt', 'signum':signum, 'frameinfo':inspect.getframeinfo(frame, 9)}) + stop_event.set() + + signal.signal(signal.SIGINT , interupt_handler) + signal.signal(signal.SIGHUP , interupt_handler) + signal.signal(signal.SIGALRM, interupt_handler) + signal.signal(signal.SIGTERM, interupt_handler) while not stop_event.is_set(): @@ -441,7 +440,8 @@ stop_args.update({'message': 'duration', 'duration' : options.duration, 'end_ts' : end_ts}) stop_event.set() break - if sprocess.is_alive(): + if sprocess.is_alive(): + utils.get_logger().debug("Source process alive") time.sleep(1) else: stop_args.update({'message': 'Source process killed'})