script/stream/recorder_tweetstream.py
changeset 693 2ef837069108
parent 528 7fb5a7b0d35c
child 737 8f6a4d6dfe14
--- a/script/stream/recorder_tweetstream.py	Mon Oct 15 16:56:57 2012 +0200
+++ b/script/stream/recorder_tweetstream.py	Mon Oct 15 17:01:50 2012 +0200
@@ -23,7 +23,7 @@
 import time
 import traceback
 import tweepy.auth
-import tweetstream
+import iri_tweet.stream as tweetstream
 import urllib2
 socket._fileobject.default_bufsize = 0
 
@@ -35,6 +35,7 @@
 columns_user = [u'follow_request_sent', u'profile_use_background_image', u'id', u'verified', u'profile_sidebar_fill_color', u'profile_text_color', u'followers_count', u'protected', u'location', u'profile_background_color', u'id_str', u'utc_offset', u'statuses_count', u'description', u'friends_count', u'profile_link_color', u'profile_image_url', u'notifications', u'show_all_inline_media', u'geo_enabled', u'profile_background_image_url', u'name', u'lang', u'following', u'profile_background_tile', u'favourites_count', u'screen_name', u'url', u'created_at', u'contributors_enabled', u'time_zone', u'profile_sidebar_border_color', u'is_translator', u'listed_count']
 #just put it in a sqlite3 tqble
 
+DEFAULT_TIMEOUT = 5
 
 def set_logging(options):
     loggers = []
@@ -124,32 +125,36 @@
         self.token_filename = options.token_filename
         self.catchup = options.catchup
         self.timeout = options.timeout
+        self.stream = None
         super(SourceProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid)
-
-    def do_run(self):
+                    
+    def __source_stream_iter(self):
         
-        #import pydevd
-        #pydevd.settrace(suspend=True)
-
         self.logger = set_logging_process(self.options, self.logger_queue)
-        self.auth = get_auth(self.options, self.access_token) 
+        self.logger.debug("SourceProcess : run ")
         
-        self.logger.debug("SourceProcess : run ")
+        self.auth = get_auth(self.options, self.access_token) 
+        self.logger.debug("SourceProcess : auth set ")
+        
         track_list = self.track # or raw_input('Keywords to track (comma seperated): ').strip()
         self.logger.debug("SourceProcess : track list " + track_list)
         
         track_list = [k.strip() for k in track_list.split(',')]
 
         self.logger.debug("SourceProcess : before connecting to stream " + repr(track_list))                        
-        stream = tweetstream.FilterStream(self.auth, track=track_list, raw=True, url=self.options.url, catchup=self.catchup, timeout=self.timeout)
+        self.stream = tweetstream.FilterStream(self.auth, track=track_list, raw=True, url=self.options.url, catchup=self.catchup, timeout=self.timeout, chunk_size=1)
         self.logger.debug("SourceProcess : after connecting to stream")
-        stream.muststop = lambda: self.stop_event.is_set()
+        self.stream.muststop = lambda: self.stop_event.is_set()        
+        
+        stream_wrapper = tweetstream.SafeStreamWrapper(self.stream, logger=self.logger)
         
         session = self.session_maker()
         
         try:
-            for tweet in stream:
+            for tweet in stream_wrapper:
                 if not self.parent_is_alive():
+                    self.stop_event.set()
+                    stop_thread.join(5)
                     sys.exit()
                 self.logger.debug("SourceProcess : tweet " + repr(tweet))
                 source = TweetSource(original_json=tweet)
@@ -165,26 +170,56 @@
                         session.rollback()
                         self.logger.debug("SourceProcess : Operational Error %s nb %d" % (repr(e), add_retries))
                         if add_retries == 10:
-                            raise e
+                            raise
                      
                 source_id = source.id
                 self.logger.debug("SourceProcess : before queue + source id " + repr(source_id))
-                self.logger.info("SourceProcess : Tweet count: %d - current rate : %.2f - running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime)))
+                self.logger.info("SourceProcess : Tweet count: %d - current rate : %.2f - running : %s" % (self.stream.count, self.stream.rate, int(time.time() - self.stream.starttime)))
                 session.commit()
                 self.queue.put((source_id, tweet), False)
 
         except Exception as e:
             self.logger.error("SourceProcess : Error when processing tweet " + repr(e))
+            raise
         finally:
             session.rollback()
-            stream.close()
             session.close()
-            self.queue.close()
-            self.stop_event.set()
+            with self.logger_queue.mutex:
+                self.logger_queue.clear()
+                self.logger_queue.close()
+            with self.queue.mutex:
+                self.queue.clear()
+                self.queue.close()
+            self.stream.close()
+            self.stream = None
+            if not self.stop_event.is_set():
+                self.stop_event.set()
+
+
+    def do_run(self):
+        
+        import pydevd
+        pydevd.settrace(suspend=False)
+        
+        source_stream_iter_thread = threading.Thread(target=self.__source_stream_iter , name="SourceStreamIterThread")
+        
+        source_stream_iter_thread.start()
+        
+        while not self.stop_event.is_set():        
+            self.stop_event.wait(DEFAULT_TIMEOUT)
+            if self.stop_event.is_set() and self.stream:
+                self.stream.close()
+            elif not self.stop_event.is_set() and not source_stream_iter_thread.is_alive:
+                with self.stop_event.mutex:
+                    self.stop_event.set()
+
+        source_stream_iter_thread.join(30)
 
 
 def process_tweet(tweet, source_id, session, access_token, twitter_query_user, logger):
     try:
+        if not tweet.strip():
+            return
         tweet_obj = anyjson.deserialize(tweet)
         if 'text' not in tweet_obj:
             tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET'])
@@ -197,6 +232,17 @@
         logger.debug(u"Process_tweet :" + repr(tweet))
         processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, access_token, None, twitter_query_user)
         processor.process()
+    except ValueError as e:
+        message = u"Value Error %s processing tweet %s" % (repr(e), tweet)
+        output = StringIO.StringIO()
+        try:
+            traceback.print_exc(file=output)
+            error_stack = output.getvalue()
+        finally:
+            output.close()
+        tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET'], error=message, error_stack=error_stack)
+        session.add(tweet_log)
+        session.commit()        
     except Exception as e:
         message = u"Error %s processing tweet %s" % (repr(e), tweet)
         logger.exception(message)
@@ -237,6 +283,10 @@
         finally:
             session.rollback()
             self.stop_event.set()
+            with self.logger_queue.mutex:
+                self.logger_queue.clear()
+            with self.queue.mutex:
+                self.queue.clear()
             session.close()
 
 
@@ -353,7 +403,7 @@
     
     SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str)
     process_engines.append(engine_process)
-    lqueue = mQueue(1)
+    lqueue = mQueue(50)
     logger_queues.append(lqueue)
     pid = os.getpid()
     sprocess = SourceProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)    
@@ -363,7 +413,7 @@
     for i in range(options.process_nb - 1):
         SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str)
         process_engines.append(engine_process)
-        lqueue = mQueue(1)
+        lqueue = mQueue(50)
         logger_queues.append(lqueue)
         cprocess = TweetProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)
         tweet_processes.append(cprocess)