correct listener.
author Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
Tue, 11 Dec 2012 10:46:35 +0100
changeset 739 350ffcb7ae4d
parent 738 2497c7f38e0a
child 740 e11ef60262f0
correct listener. add log
script/lib/iri_tweet/iri_tweet/stream.py
script/stream/recorder_tweetstream.py
--- a/script/lib/iri_tweet/iri_tweet/stream.py	Mon Dec 10 12:53:09 2012 +0100
+++ b/script/lib/iri_tweet/iri_tweet/stream.py	Tue Dec 11 10:46:35 2012 +0100
@@ -121,7 +121,7 @@
     """
 
     def __init__(self, auth,
-                 catchup=None, raw=False, timeout=-1, url=None, compressed=False, chunk_size=4096):
+                 catchup=None, raw=False, timeout=-1, url=None, compressed=False, chunk_size=4096, logger=None):
         self._conn = None
         self._rate_ts = None
         self._rate_cnt = 0
@@ -141,6 +141,7 @@
         if url: self.url = url
         
         self.muststop = False
+        self._logger = logger
         
         self._iter = self.__iter__()
          
@@ -154,6 +155,9 @@
 
     def _init_conn(self):
         """Open the connection to the twitter server"""
+        
+        if self._logger : self._logger.debug("BaseStream Open the connection to the twitter server")
+        
         headers = {'User-Agent': self.user_agent}
         
         if self._compressed:
@@ -167,16 +171,22 @@
         if self._auth:
             self._auth.apply_auth(self.url, "POST", headers, postdata)
 
-        self._resp = requests.post(self.url, headers=headers, data=postdata)
+        if self._logger : self._logger.debug("BaseStream init connection url " + repr(self.url))
+        if self._logger : self._logger.debug("BaseStream init connection headers " + repr(headers))
+        if self._logger : self._logger.debug("BaseStream init connection data " + repr(postdata))
+        
+        self._resp = requests.post(self.url, headers=headers, data=postdata, prefetch=False)
+        if self._logger : self._logger.debug("BaseStream init connection " + repr(self._resp))
+        
         self._resp.raise_for_status()
         self.connected = True
+
         if not self._rate_ts:
             self._rate_ts = time.time()
         if not self.starttime:
             self.starttime = time.time()
 
 
-
     def _get_post_data(self):
         """Subclasses that need to add post data to the request can override
         this method and return post data. The data should be in the format
@@ -235,9 +245,12 @@
             
     def __iter__(self):
         
+        if self._logger : self._logger.debug("BaseStream __iter__")
         if not self.connected:
+            if self._logger : self._logger.debug("BaseStream __iter__ not connected, connecting")
             self._init_conn()
 
+        if self._logger : self._logger.debug("BaseStream __iter__ connected")
         for line in self._iter_object():
 
             if not line:
@@ -275,12 +288,12 @@
     url = "https://stream.twitter.com/1.1/statuses/filter.json"
 
     def __init__(self, auth, follow=None, locations=None,
-                 track=None, catchup=None, url=None, raw=False, timeout=None, compressed=False, chunk_size=4096):
+                 track=None, catchup=None, url=None, raw=False, timeout=None, compressed=False, chunk_size=4096, logger=None):
         self._follow = follow
         self._locations = locations
         self._track = track
         # remove follow, locations, track
-        BaseStream.__init__(self, auth, url=url, raw=raw, catchup=catchup, timeout=timeout, compressed=compressed, chunk_size=chunk_size)
+        BaseStream.__init__(self, auth, url=url, raw=raw, catchup=catchup, timeout=timeout, compressed=compressed, chunk_size=chunk_size, logger=logger)
 
     def _get_post_data(self):
         postdata = {}
@@ -308,7 +321,7 @@
         # raise a ConnectionError instead
         if  callable(self._error_cb):
             self._error_cb(e)
-        self._logger.info("stream sleeping for %d ms " % self._retry_wait)
+        if self._logger: self._logger.info("stream sleeping for %d ms " % self._retry_wait)
         time.sleep(float(self._retry_wait)/1000.0)
         
         
@@ -340,13 +353,14 @@
             try:
                 if self._logger: self._logger.debug("inner loop")
                 for tweet in self._stream:
+                    if self._logger: self._logger.debug("tweet : " + repr(tweet))
                     self._reconnects = 0
                     self._retry_wait = 0
                     if "warning" in tweet:
-                        self._logger.warning("Tweet warning received : %s" % repr(tweet))
+                        if self._logger: self._logger.warning("Tweet warning received : %s" % repr(tweet))
                         continue
                     if not tweet.strip():
-                        self._logger.debug("Empty Tweet received : PING")
+                        if self._logger: self._logger.debug("Empty Tweet received : PING")
                         continue
                     yield tweet
             except (ConnectionError, requests.exceptions.ConnectionError, requests.exceptions.Timeout, requests.exceptions.RequestException) as e:
--- a/script/stream/recorder_tweetstream.py	Mon Dec 10 12:53:09 2012 +0100
+++ b/script/stream/recorder_tweetstream.py	Tue Dec 11 10:46:35 2012 +0100
@@ -60,7 +60,7 @@
     else:
         consumer_key = models.CONSUMER_KEY
         consumer_secret = models.CONSUMER_SECRET
-        auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=False)
+        auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=True)
         auth.set_access_token(*access_token)
     return auth
 
@@ -142,7 +142,7 @@
         track_list = [k.strip() for k in track_list.split(',')]
 
         self.logger.debug("SourceProcess : before connecting to stream " + repr(track_list))                        
-        self.stream = tweetstream.FilterStream(self.auth, track=track_list, raw=True, url=self.options.url, catchup=self.catchup, timeout=self.timeout, chunk_size=1)
+        self.stream = tweetstream.FilterStream(self.auth, track=track_list, raw=True, url=self.options.url, catchup=self.catchup, timeout=self.timeout, chunk_size=1, logger=self.logger)
         self.logger.debug("SourceProcess : after connecting to stream")
         self.stream.muststop = lambda: self.stop_event.is_set()        
         
@@ -202,7 +202,7 @@
         source_stream_iter_thread.start()
         
         while not self.stop_event.is_set():
-            self.logger.info("SourceProcess : In while after start")
+            self.logger.debug("SourceProcess : In while after start")
             self.stop_event.wait(DEFAULT_TIMEOUT)
             if self.stop_event.is_set() and self.stream:
                 self.stream.close()
@@ -279,7 +279,6 @@
                 session.commit()
         finally:
             session.rollback()
-            self.stop_event.set()
             session.close()
 
 
@@ -411,16 +410,6 @@
         cprocess = TweetProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)
         tweet_processes.append(cprocess)
 
-    def interupt_handler(signum, frame):
-        utils.get_logger().debug("shutdown asked " + repr(signum) + "  " + repr(inspect.getframeinfo(frame, 9)))
-        stop_args.update({'message': 'interupt', 'signum':signum, 'frameinfo':inspect.getframeinfo(frame, 9)})
-        stop_event.set()
-        
-    signal.signal(signal.SIGINT , interupt_handler)
-    signal.signal(signal.SIGHUP , interupt_handler)
-    signal.signal(signal.SIGALRM, interupt_handler)
-    signal.signal(signal.SIGTERM, interupt_handler)
-
     log_thread = threading.Thread(target=process_log, name="loggingThread", args=(logger_queues, stop_event,))
     log_thread.daemon = True
 
@@ -434,6 +423,16 @@
 
     if options.duration >= 0:
         end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration)    
+
+    def interupt_handler(signum, frame):
+        utils.get_logger().debug("shutdown asked " + repr(signum) + "  " + repr(inspect.getframeinfo(frame, 9)))
+        stop_args.update({'message': 'interupt', 'signum':signum, 'frameinfo':inspect.getframeinfo(frame, 9)})
+        stop_event.set()
+        
+    signal.signal(signal.SIGINT , interupt_handler)
+    signal.signal(signal.SIGHUP , interupt_handler)
+    signal.signal(signal.SIGALRM, interupt_handler)
+    signal.signal(signal.SIGTERM, interupt_handler)
     
 
     while not stop_event.is_set():
@@ -441,7 +440,8 @@
             stop_args.update({'message': 'duration', 'duration' : options.duration, 'end_ts' : end_ts})
             stop_event.set()
             break
-        if sprocess.is_alive():            
+        if sprocess.is_alive():
+            utils.get_logger().debug("Source process alive")
             time.sleep(1)
         else:
             stop_args.update({'message': 'Source process killed'})