--- a/script/stream/recorder_tweetstream.py Mon Oct 15 16:56:57 2012 +0200
+++ b/script/stream/recorder_tweetstream.py Mon Oct 15 17:01:50 2012 +0200
@@ -23,7 +23,7 @@
import time
import traceback
import tweepy.auth
-import tweetstream
+import iri_tweet.stream as tweetstream
import urllib2
socket._fileobject.default_bufsize = 0
@@ -35,6 +35,7 @@
columns_user = [u'follow_request_sent', u'profile_use_background_image', u'id', u'verified', u'profile_sidebar_fill_color', u'profile_text_color', u'followers_count', u'protected', u'location', u'profile_background_color', u'id_str', u'utc_offset', u'statuses_count', u'description', u'friends_count', u'profile_link_color', u'profile_image_url', u'notifications', u'show_all_inline_media', u'geo_enabled', u'profile_background_image_url', u'name', u'lang', u'following', u'profile_background_tile', u'favourites_count', u'screen_name', u'url', u'created_at', u'contributors_enabled', u'time_zone', u'profile_sidebar_border_color', u'is_translator', u'listed_count']
#just put it in a sqlite3 tqble
+DEFAULT_TIMEOUT = 5
def set_logging(options):
loggers = []
@@ -124,32 +125,36 @@
self.token_filename = options.token_filename
self.catchup = options.catchup
self.timeout = options.timeout
+ self.stream = None
super(SourceProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid)
-
- def do_run(self):
+
+ def __source_stream_iter(self):
- #import pydevd
- #pydevd.settrace(suspend=True)
-
self.logger = set_logging_process(self.options, self.logger_queue)
- self.auth = get_auth(self.options, self.access_token)
+ self.logger.debug("SourceProcess : run ")
- self.logger.debug("SourceProcess : run ")
+ self.auth = get_auth(self.options, self.access_token)
+ self.logger.debug("SourceProcess : auth set ")
+
track_list = self.track # or raw_input('Keywords to track (comma seperated): ').strip()
self.logger.debug("SourceProcess : track list " + track_list)
track_list = [k.strip() for k in track_list.split(',')]
self.logger.debug("SourceProcess : before connecting to stream " + repr(track_list))
- stream = tweetstream.FilterStream(self.auth, track=track_list, raw=True, url=self.options.url, catchup=self.catchup, timeout=self.timeout)
+ self.stream = tweetstream.FilterStream(self.auth, track=track_list, raw=True, url=self.options.url, catchup=self.catchup, timeout=self.timeout, chunk_size=1)
self.logger.debug("SourceProcess : after connecting to stream")
- stream.muststop = lambda: self.stop_event.is_set()
+ self.stream.muststop = lambda: self.stop_event.is_set()
+
+ stream_wrapper = tweetstream.SafeStreamWrapper(self.stream, logger=self.logger)
session = self.session_maker()
try:
- for tweet in stream:
+ for tweet in stream_wrapper:
if not self.parent_is_alive():
+ self.stop_event.set()
+ stop_thread.join(5)
sys.exit()
self.logger.debug("SourceProcess : tweet " + repr(tweet))
source = TweetSource(original_json=tweet)
@@ -165,26 +170,56 @@
session.rollback()
self.logger.debug("SourceProcess : Operational Error %s nb %d" % (repr(e), add_retries))
if add_retries == 10:
- raise e
+ raise
source_id = source.id
self.logger.debug("SourceProcess : before queue + source id " + repr(source_id))
- self.logger.info("SourceProcess : Tweet count: %d - current rate : %.2f - running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime)))
+ self.logger.info("SourceProcess : Tweet count: %d - current rate : %.2f - running : %s" % (self.stream.count, self.stream.rate, int(time.time() - self.stream.starttime)))
session.commit()
self.queue.put((source_id, tweet), False)
except Exception as e:
self.logger.error("SourceProcess : Error when processing tweet " + repr(e))
+ raise
finally:
session.rollback()
- stream.close()
session.close()
- self.queue.close()
- self.stop_event.set()
+ with self.logger_queue.mutex:
+ self.logger_queue.clear()
+ self.logger_queue.close()
+ with self.queue.mutex:
+ self.queue.clear()
+ self.queue.close()
+ self.stream.close()
+ self.stream = None
+ if not self.stop_event.is_set():
+ self.stop_event.set()
+
+
+ def do_run(self):
+
+ import pydevd
+ pydevd.settrace(suspend=False)
+
+ source_stream_iter_thread = threading.Thread(target=self.__source_stream_iter , name="SourceStreamIterThread")
+
+ source_stream_iter_thread.start()
+
+ while not self.stop_event.is_set():
+ self.stop_event.wait(DEFAULT_TIMEOUT)
+ if self.stop_event.is_set() and self.stream:
+ self.stream.close()
+ elif not self.stop_event.is_set() and not source_stream_iter_thread.is_alive:
+ with self.stop_event.mutex:
+ self.stop_event.set()
+
+ source_stream_iter_thread.join(30)
def process_tweet(tweet, source_id, session, access_token, twitter_query_user, logger):
try:
+ if not tweet.strip():
+ return
tweet_obj = anyjson.deserialize(tweet)
if 'text' not in tweet_obj:
tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET'])
@@ -197,6 +232,17 @@
logger.debug(u"Process_tweet :" + repr(tweet))
processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, access_token, None, twitter_query_user)
processor.process()
+ except ValueError as e:
+ message = u"Value Error %s processing tweet %s" % (repr(e), tweet)
+ output = StringIO.StringIO()
+ try:
+ traceback.print_exc(file=output)
+ error_stack = output.getvalue()
+ finally:
+ output.close()
+ tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET'], error=message, error_stack=error_stack)
+ session.add(tweet_log)
+ session.commit()
except Exception as e:
message = u"Error %s processing tweet %s" % (repr(e), tweet)
logger.exception(message)
@@ -237,6 +283,10 @@
finally:
session.rollback()
self.stop_event.set()
+ with self.logger_queue.mutex:
+ self.logger_queue.clear()
+ with self.queue.mutex:
+ self.queue.clear()
session.close()
@@ -353,7 +403,7 @@
SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str)
process_engines.append(engine_process)
- lqueue = mQueue(1)
+ lqueue = mQueue(50)
logger_queues.append(lqueue)
pid = os.getpid()
sprocess = SourceProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)
@@ -363,7 +413,7 @@
for i in range(options.process_nb - 1):
SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str)
process_engines.append(engine_process)
- lqueue = mQueue(1)
+ lqueue = mQueue(50)
logger_queues.append(lqueue)
cprocess = TweetProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)
tweet_processes.append(cprocess)