diff -r 51072e5e6ea9 -r 2ef837069108 script/stream/recorder_tweetstream.py --- a/script/stream/recorder_tweetstream.py Mon Oct 15 16:56:57 2012 +0200 +++ b/script/stream/recorder_tweetstream.py Mon Oct 15 17:01:50 2012 +0200 @@ -23,7 +23,7 @@ import time import traceback import tweepy.auth -import tweetstream +import iri_tweet.stream as tweetstream import urllib2 socket._fileobject.default_bufsize = 0 @@ -35,6 +35,7 @@ columns_user = [u'follow_request_sent', u'profile_use_background_image', u'id', u'verified', u'profile_sidebar_fill_color', u'profile_text_color', u'followers_count', u'protected', u'location', u'profile_background_color', u'id_str', u'utc_offset', u'statuses_count', u'description', u'friends_count', u'profile_link_color', u'profile_image_url', u'notifications', u'show_all_inline_media', u'geo_enabled', u'profile_background_image_url', u'name', u'lang', u'following', u'profile_background_tile', u'favourites_count', u'screen_name', u'url', u'created_at', u'contributors_enabled', u'time_zone', u'profile_sidebar_border_color', u'is_translator', u'listed_count'] #just put it in a sqlite3 tqble +DEFAULT_TIMEOUT = 5 def set_logging(options): loggers = [] @@ -124,32 +125,36 @@ self.token_filename = options.token_filename self.catchup = options.catchup self.timeout = options.timeout + self.stream = None super(SourceProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid) - - def do_run(self): + + def __source_stream_iter(self): - #import pydevd - #pydevd.settrace(suspend=True) - self.logger = set_logging_process(self.options, self.logger_queue) - self.auth = get_auth(self.options, self.access_token) + self.logger.debug("SourceProcess : run ") - self.logger.debug("SourceProcess : run ") + self.auth = get_auth(self.options, self.access_token) + self.logger.debug("SourceProcess : auth set ") + track_list = self.track # or raw_input('Keywords to track (comma seperated): ').strip() self.logger.debug("SourceProcess : track list " + track_list) track_list = [k.strip() for k in track_list.split(',')] self.logger.debug("SourceProcess : before connecting to stream " + repr(track_list)) - stream = tweetstream.FilterStream(self.auth, track=track_list, raw=True, url=self.options.url, catchup=self.catchup, timeout=self.timeout) + self.stream = tweetstream.FilterStream(self.auth, track=track_list, raw=True, url=self.options.url, catchup=self.catchup, timeout=self.timeout, chunk_size=1) self.logger.debug("SourceProcess : after connecting to stream") - stream.muststop = lambda: self.stop_event.is_set() + self.stream.muststop = lambda: self.stop_event.is_set() + + stream_wrapper = tweetstream.SafeStreamWrapper(self.stream, logger=self.logger) session = self.session_maker() try: - for tweet in stream: + for tweet in stream_wrapper: if not self.parent_is_alive(): + self.stop_event.set() + stop_thread.join(5) sys.exit() self.logger.debug("SourceProcess : tweet " + repr(tweet)) source = TweetSource(original_json=tweet) @@ -165,26 +170,56 @@ session.rollback() self.logger.debug("SourceProcess : Operational Error %s nb %d" % (repr(e), add_retries)) if add_retries == 10: - raise e + raise source_id = source.id self.logger.debug("SourceProcess : before queue + source id " + repr(source_id)) - self.logger.info("SourceProcess : Tweet count: %d - current rate : %.2f - running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime))) + self.logger.info("SourceProcess : Tweet count: %d - current rate : %.2f - running : %s" % (self.stream.count, self.stream.rate, int(time.time() - self.stream.starttime))) session.commit() self.queue.put((source_id, tweet), False) except Exception as e: self.logger.error("SourceProcess : Error when processing tweet " + repr(e)) + raise finally: session.rollback() - stream.close() session.close() - self.queue.close() - self.stop_event.set() + with self.logger_queue.mutex: + self.logger_queue.clear() + self.logger_queue.close() + with self.queue.mutex: + self.queue.clear() + self.queue.close() + self.stream.close() + self.stream = None + if not self.stop_event.is_set(): + self.stop_event.set() + + + def do_run(self): + + import pydevd + pydevd.settrace(suspend=False) + + source_stream_iter_thread = threading.Thread(target=self.__source_stream_iter , name="SourceStreamIterThread") + + source_stream_iter_thread.start() + + while not self.stop_event.is_set(): + self.stop_event.wait(DEFAULT_TIMEOUT) + if self.stop_event.is_set() and self.stream: + self.stream.close() + elif not self.stop_event.is_set() and not source_stream_iter_thread.is_alive: + with self.stop_event.mutex: + self.stop_event.set() + + source_stream_iter_thread.join(30) def process_tweet(tweet, source_id, session, access_token, twitter_query_user, logger): try: + if not tweet.strip(): + return tweet_obj = anyjson.deserialize(tweet) if 'text' not in tweet_obj: tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET']) @@ -197,6 +232,17 @@ logger.debug(u"Process_tweet :" + repr(tweet)) processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, access_token, None, twitter_query_user) processor.process() + except ValueError as e: + message = u"Value Error %s processing tweet %s" % (repr(e), tweet) + output = StringIO.StringIO() + try: + traceback.print_exc(file=output) + error_stack = output.getvalue() + finally: + output.close() + tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET'], error=message, error_stack=error_stack) + session.add(tweet_log) + session.commit() except Exception as e: message = u"Error %s processing tweet %s" % (repr(e), tweet) logger.exception(message) @@ -237,6 +283,10 @@ finally: session.rollback() self.stop_event.set() + with self.logger_queue.mutex: + self.logger_queue.clear() + with self.queue.mutex: + self.queue.clear() session.close() @@ -353,7 +403,7 @@ SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str) process_engines.append(engine_process) - lqueue = mQueue(1) + lqueue = mQueue(50) logger_queues.append(lqueue) pid = os.getpid() sprocess = SourceProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid) @@ -363,7 +413,7 @@ for i in range(options.process_nb - 1): SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str) process_engines.append(engine_process) - lqueue = mQueue(1) + lqueue = mQueue(50) logger_queues.append(lqueue) cprocess = TweetProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid) tweet_processes.append(cprocess)