diff -r cdd7d3c0549c -r 9213a63fa34a script/stream/recorder_tweetstream.py
--- a/script/stream/recorder_tweetstream.py	Wed Jul 27 00:04:55 2011 +0200
+++ b/script/stream/recorder_tweetstream.py	Mon Aug 08 09:01:40 2011 +0200
@@ -1,14 +1,13 @@
 from getpass import getpass
 from iri_tweet import models, utils
 from iri_tweet.models import TweetSource, TweetLog
-from multiprocessing import Queue, JoinableQueue, Process, Event
+from multiprocessing import Queue, JoinableQueue, Process, Event, get_logger
 from optparse import OptionParser
 from sqlalchemy.orm import sessionmaker
-from sqlite3 import *
 import StringIO
+import logging
 import anyjson
 import datetime
-import logging
 import os
 import shutil
 import signal
@@ -18,6 +17,8 @@
 import traceback
 import tweepy.auth
 import tweetstream
+from iri_tweet.utils import logger
+from sqlalchemy.exc import OperationalError
 
 socket._fileobject.default_bufsize = 0
 
@@ -82,7 +83,6 @@
 class SourceProcess(Process):
 
     def __init__(self, session_maker, queue, auth, track, debug, reconnects, token_filename, stop_event):
-        super(SourceProcess, self).__init__()
         self.session_maker = session_maker
         self.queue = queue
         self.auth = auth
@@ -91,47 +91,93 @@
         self.reconnects = reconnects
         self.token_filename = token_filename
         self.stop_event = stop_event
+        super(SourceProcess, self).__init__()
 #        self.stop_event =
 
     def run(self):
+        get_logger().debug("SourceProcess : run")
         track_list = self.track # or raw_input('Keywords to track (comma seperated): ').strip()
         track_list = [k for k in track_list.split(',')]
-
+
+        get_logger().debug("SourceProcess : before connecting to stream " + repr(track_list))
         stream = ReconnectingTweetStream(self.auth, track_list, reconnects=self.reconnects, as_text=True)
+        get_logger().debug("SourceProcess : after connecting to stream")
         stream.muststop = lambda: self.stop_event.is_set()
 
         session = self.session_maker()
         try:
             for tweet in stream:
+                get_logger().debug("tweet " + repr(tweet))
                 source = TweetSource(original_json=tweet)
-                session.add(source)
-                session.flush()
+                get_logger().debug("source created")
+                add_retries = 0
+                while add_retries < 10:
+                    try:
+                        add_retries += 1
+                        session.add(source)
+                        session.flush()
+                        break
+                    except OperationalError as e:
+                        session.rollback()
+                        get_logger().debug("Operational Error %s nb %d" % (repr(e), add_retries))
+                        if add_retries==10:
+                            raise e
+
                 source_id = source.id
-                queue.put((source_id, tweet), False)
+                get_logger().debug("before queue + source id " + repr(source_id))
+                self.queue.put((source_id, tweet), False)
                 #process_tweet(tweet, source_id, self.session, self.debug, self.token_filename)
-                logging.info("Tweet count: %d - current rate : %.2f - running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime)))
+                get_logger().info("Tweet count: %d - current rate : %.2f - running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime)))
                 session.commit()
 #                if duration >= 0 and datetime.datetime.utcnow() >= end_ts:
 #                    print "Stop recording after %d seconds." % (duration)
 #                    break
-        except:
+        except Exception as e:
+            get_logger().error("Error when processing tweet " + repr(e))
+        finally:
             session.rollback()
-        finally:
             stream.close()
             session.close()
-
+            self.queue.close()
+            self.stop_event.set()
+
+
+def process_tweet(tweet, source_id, session, token_filename):
+    try:
+        tweet_obj = anyjson.deserialize(tweet)
+        screen_name = ""
+        if 'user' in tweet_obj and 'screen_name' in tweet_obj['user']:
+            screen_name = tweet_obj['user']['screen_name']
+        get_logger().info(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text']))
+        get_logger().debug(u"Process_tweet :" + repr(tweet))
+        processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, token_filename)
+        processor.process()
+    except Exception as e:
+        message = u"Error %s processing tweet %s" % (repr(e), tweet)
+        get_logger().error(message)
+        output = StringIO.StringIO()
+        traceback.print_exception(Exception, e, None, None, output)
+        error_stack = output.getvalue()
+        output.close()
+        session.rollback()
+        tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['ERROR'], error=message, error_stack=error_stack)
+        session.add(tweet_log)
+        session.commit()
+
+
 class TweetProcess(Process):
 
     def __init__(self, session_maker, queue, debug, token_filename, stop_event):
-        super(TweetProcess, self).__init__()
         self.session_maker = session_maker
         self.queue = queue
         self.debug = debug
         self.stop_event = stop_event
         self.token_filename = token_filename
+        super(TweetProcess, self).__init__()
+

     def run(self):
@@ -139,74 +185,33 @@
         try:
             while not self.stop_event.is_set():
                 try:
-                    source_id, tweet_txt = queue.get(True, 30)
-                except:
+                    source_id, tweet_txt = queue.get(True, 10)
+                    get_logger().debug("Processing source id " + repr(source_id))
+                except Exception as e:
+                    get_logger().debug('Process tweet exception in loop : ' + repr(e))
                     continue
-                process_tweet(tweet_txt, source_id, session)
+                process_tweet(tweet_txt, source_id, session, self.token_filename)
                 session.commit()
-                self.queue.task_done()
         except:
-            session.rollback()
             raise
         finally:
+            session.rollback()
+            self.stop_event.set()
            session.close()
-
-    def process_tweet(tweet, source_id, session):
-
-        try:
-            tweet_obj = anyjson.deserialize(tweet)
-            screen_name = ""
-            if 'user' in tweet_obj and 'screen_name' in tweet_obj['user']:
-                screen_name = tweet_obj['user']['screen_name']
-            logging.info(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text']))
-            logging.debug(u"Process_tweet :" + repr(tweet))
-            processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, self.token_filename)
-            processor.process()
-        except Exception, e:
-            message = u"Error %e processing tweet %s" % (unicode(e), tweet)
-            logging.error(message)
-            output = StringIO.StringIO()
-            traceback.print_exception(Exception, e, None, None, output)
-            tweet_log = TweetLog(status=TweetLog.TWEET_STATUS['ERROR'], error=message, error_stack=output.getvalue())
-            output.close()
-
-
-
-#def main_source(username, password, track, session, debug, reconnects, token_filename, duration):
-
-    #username = username or raw_input('Twitter username: ')
-    #password = password or getpass('Twitter password: ')
+def process_leftovers(session, token_filename):
+
+    sources = session.query(TweetSource).outerjoin(TweetLog).filter(TweetLog.id == None)
+
+    for src in sources:
+        tweet_txt = src.original_json
+        process_tweet(tweet_txt, src.id, session, token_filename)

-#    track_list = track or raw_input('Keywords to track (comma seperated): ').strip()
-#    track_list = [k for k in track_list.split(',')]
-
-#    if username and password:
-#        auth = tweepy.auth.BasicAuthHandler(username, password)
-#    else:
-#        consumer_key = models.CONSUMER_KEY
-#        consumer_secret = models.CONSUMER_SECRET
-#        auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=False)
-#        auth.set_access_token(*(utils.get_oauth_token(token_filename)))
-#    if duration >= 0:
-#        end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=duration)
+
-#    stream = ReconnectingTweetStream(auth, track_list, reconnects=reconnects, as_text=True)
-#    try:
-#        for tweet in stream:
-#            source = TweetSource(original_json=tweet)
-#            session.add(source)
-#            session.flush()
-#            source_id = source.id
-#            process_tweet(tweet, source_id, session, debug, token_filename)
-#            logging.info("Tweet count: %d - current rate : %.2f - running : %s"%(stream.count, stream.rate, int(time.time()-stream.starttime)))
-#            session.commit()
-#            if duration >= 0 and datetime.datetime.utcnow() >= end_ts:
-#                print "Stop recording after %d seconds." % (duration)
-#                break
-#    finally:
-#        stream.close()
+    #get tweet source that do not match any message
+    #select * from tweet_tweet_source ts left join tweet_tweet_log tl on ts.id = tl.tweet_source_id where tl.id isnull;
+

 def get_options():
     parser = OptionParser()
@@ -240,7 +245,7 @@

     (options, args) = get_options()

-    utils.set_logging(options)
+    utils.set_logging(options, get_logger())

     if options.debug:
         print "OPTIONS : "
@@ -249,18 +254,16 @@
     if options.new and os.path.exists(options.filename):
         i = 1
         basename, extension = os.path.splitext(options.filename)
-        new_path = '$s.%d.%s' % (basename, i, extension)
+        new_path = '%s.%d%s' % (basename, i, extension)
         while i < 1000000 and os.path.exists(new_path):
             i += 1
-            new_path = '$s.%d.%s' % (basename, i, extension)
+            new_path = '%s.%d%s' % (basename, i, extension)
         if i >= 1000000:
             raise Exception("Unable to find new filename for " + options.filename)
         else:
             shutil.move(options.filename, new_path)

-    engine, metadata = models.setup_database('sqlite:///' + options.filename, echo=(options.debug >= 2))
-    Session = sessionmaker(bind=engine)

     queue = JoinableQueue()
     stop_event = Event()
@@ -272,12 +275,22 @@
         auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=False)
         auth.set_access_token(*(utils.get_oauth_token(options.token_filename)))
-
+
+    engine, metadata = models.setup_database('sqlite:///' + options.filename, echo=(options.debug >= 2))
+    Session = sessionmaker(bind=engine)
+
+    session = Session()
+    process_leftovers(session, options.token_filename)
+    session.commit()
+    session.close()
+
     sprocess = SourceProcess(Session, queue, auth, options.track, options.debug, options.reconnects, options.token_filename, stop_event)

     tweet_processes = []

     for i in range(options.consumer_nb):
+        engine, metadata = models.setup_database('sqlite:///' + options.filename, echo=(options.debug >= 2))
+        Session = sessionmaker(bind=engine)
         cprocess = TweetProcess(Session, queue, options.debug, options.token_filename, stop_event)
         tweet_processes.append(cprocess)
@@ -302,7 +315,19 @@
         else:
             break

+    get_logger().debug("Joining Source Process")
     sprocess.join()
-    queue.join()
-    for cprocess in tweet_processes:
+    get_logger().debug("Joining Queue")
+    #queue.join()
+    for i,cprocess in enumerate(tweet_processes):
+        get_logger().debug("Joining consumer process Nb %d" % (i+1))
         cprocess.join()
+
+    get_logger().debug("Processing leftovers")
+    session = Session()
+    process_leftovers(session, options.token_filename)
+    session.commit()
+    session.close()
+
+    get_logger().debug("Done. Exiting.")
\ No newline at end of file