diff -r 2209e66bb50b -r 500cd0405c7a script/stream/recorder_tweetstream.py --- a/script/stream/recorder_tweetstream.py Fri Aug 12 18:17:27 2011 +0200 +++ b/script/stream/recorder_tweetstream.py Wed Aug 17 18:32:07 2011 +0200 @@ -1,10 +1,12 @@ from getpass import getpass from iri_tweet import models, utils from iri_tweet.models import TweetSource, TweetLog -from multiprocessing import Queue, JoinableQueue, Process, Event, get_logger +from multiprocessing import (Queue as mQueue, JoinableQueue, Process, Event, + get_logger) from optparse import OptionParser from sqlalchemy.exc import OperationalError from sqlalchemy.orm import scoped_session, sessionmaker +import Queue import StringIO import anyjson import datetime @@ -16,6 +18,7 @@ import socket import sqlalchemy.schema import sys +import threading import time import traceback import tweepy.auth @@ -34,7 +37,7 @@ def set_logging(options): - utils.set_logging(options, logging.getLogger('iri_tweet')) + utils.set_logging(options, logging.getLogger('iri.tweet')) utils.set_logging(options, logging.getLogger('multiprocessing')) if options.debug >= 2: utils.set_logging(options, logging.getLogger('sqlalchemy.engine')) @@ -42,6 +45,11 @@ #utils.set_logging(options, logging.getLogger('sqlalchemy.pool')) #utils.set_logging(options, logging.getLogger('sqlalchemy.orm')) +def set_logging_process(options, queue): + qlogger = utils.set_logging(options, logging.getLogger('iri.tweet.p'), queue) + qlogger.propagate = 0 + return qlogger + def get_auth(options, access_token): if options.username and options.password: auth = tweepy.auth.BasicAuthHandler(options.username, options.password) @@ -107,7 +115,7 @@ class SourceProcess(Process): - def __init__(self, session_maker, queue, options, access_token, stop_event): + def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue): self.session_maker = session_maker self.queue = queue self.track = options.track @@ -116,31 +124,33 @@ self.stop_event = stop_event 
self.options = options self.access_token = access_token + self.logger_queue = logger_queue super(SourceProcess, self).__init__() def run(self): + #import pydevd #pydevd.settrace(suspend=False) - set_logging(self.options) + self.logger = set_logging_process(self.options, self.logger_queue) self.auth = get_auth(self.options, self.access_token) - utils.get_logger().debug("SourceProcess : run") + self.logger.debug("SourceProcess : run") track_list = self.track # or raw_input('Keywords to track (comma seperated): ').strip() track_list = [k for k in track_list.split(',')] - utils.get_logger().debug("SourceProcess : before connecting to stream " + repr(track_list)) + self.logger.debug("SourceProcess : before connecting to stream " + repr(track_list)) stream = ReconnectingTweetStream(self.auth, track_list, reconnects=self.reconnects, as_text=True) - utils.get_logger().debug("SourceProcess : after connecting to stream") + self.logger.debug("SourceProcess : after connecting to stream") stream.muststop = lambda: self.stop_event.is_set() session = self.session_maker() try: for tweet in stream: - utils.get_logger().debug("tweet " + repr(tweet)) + self.logger.debug("tweet " + repr(tweet)) source = TweetSource(original_json=tweet) - utils.get_logger().debug("source created") + self.logger.debug("source created") add_retries = 0 while add_retries < 10: try: @@ -150,18 +160,18 @@ break except OperationalError as e: session.rollback() - utils.get_logger().debug("Operational Error %s nb %d" % (repr(e), add_retries)) + self.logger.debug("Operational Error %s nb %d" % (repr(e), add_retries)) if add_retries == 10: raise e source_id = source.id - utils.get_logger().debug("before queue + source id " + repr(source_id)) - utils.get_logger().info("Tweet count: %d - current rate : %.2f - running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime))) + self.logger.debug("before queue + source id " + repr(source_id)) + self.logger.info("Tweet count: %d - current rate : %.2f - 
running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime))) session.commit() self.queue.put((source_id, tweet), False) except Exception as e: - utils.get_logger().error("Error when processing tweet " + repr(e)) + self.logger.error("Error when processing tweet " + repr(e)) finally: session.rollback() stream.close() @@ -170,19 +180,23 @@ self.stop_event.set() -def process_tweet(tweet, source_id, session, access_token): +def process_tweet(tweet, source_id, session, access_token, logger): try: tweet_obj = anyjson.deserialize(tweet) + if 'text' not in tweet_obj: + tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET']) + session.add(tweet_log) + return screen_name = "" if 'user' in tweet_obj and 'screen_name' in tweet_obj['user']: screen_name = tweet_obj['user']['screen_name'] - utils.get_logger().debug(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text'])) - utils.get_logger().debug(u"Process_tweet :" + repr(tweet)) + logger.info(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text'])) + logger.debug(u"Process_tweet :" + repr(tweet)) processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, access_token, None) processor.process() except Exception as e: message = u"Error %s processing tweet %s" % (repr(e), tweet) - utils.get_logger().error(message) + logger.error(message) output = StringIO.StringIO() traceback.print_exception(Exception, e, None, None, output) error_stack = output.getvalue() @@ -196,28 +210,29 @@ class TweetProcess(Process): - def __init__(self, session_maker, queue, options, access_token, stop_event): + def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue): self.session_maker = session_maker self.queue = queue self.stop_event = stop_event self.options = options self.access_token = access_token + self.logger_queue = logger_queue super(TweetProcess, self).__init__() def run(self): - set_logging(self.options) + self.logger = 
set_logging_process(self.options, self.logger_queue) session = self.session_maker() try: while not self.stop_event.is_set(): try: source_id, tweet_txt = queue.get(True, 3) - utils.get_logger().debug("Processing source id " + repr(source_id)) + self.logger.debug("Processing source id " + repr(source_id)) except Exception as e: - utils.get_logger().debug('Process tweet exception in loop : ' + repr(e)) + self.logger.debug('Process tweet exception in loop : ' + repr(e)) continue - process_tweet(tweet_txt, source_id, session, self.access_token) + process_tweet(tweet_txt, source_id, session, self.access_token, self.logger) session.commit() finally: session.rollback() @@ -231,19 +246,28 @@ return Session, engine, metadata -def process_leftovers(session, access_token): +def process_leftovers(session, access_token, logger): sources = session.query(TweetSource).outerjoin(TweetLog).filter(TweetLog.id == None) for src in sources: tweet_txt = src.original_json - process_tweet(tweet_txt, src.id, session, access_token) + process_tweet(tweet_txt, src.id, session, access_token, logger) session.commit() #get tweet source that do not match any message #select * from tweet_tweet_source ts left join tweet_tweet_log tl on ts.id = tl.tweet_source_id where tl.id isnull; +def process_log(logger_queues, stop_event): + while not stop_event.is_set(): + for lqueue in logger_queues: + try: + record = lqueue.get_nowait() + logging.getLogger(record.name).handle(record) + except Queue.Empty: + continue + time.sleep(0.1) def get_options(): @@ -275,7 +299,7 @@ if __name__ == '__main__': - + (options, args) = get_options() set_logging(options) @@ -287,10 +311,10 @@ conn_str = options.conn_str.strip() if not re.match("^\w+://.+", conn_str): - conn_str = 'sqlite://' + options.conn_str + conn_str = 'sqlite:///' + options.conn_str if conn_str.startswith("sqlite") and options.new: - filepath = conn_str[conn_str.find("://"):] + filepath = conn_str[conn_str.find(":///")+4:] if os.path.exists(filepath): i = 
1 basename, extension = os.path.splitext(filepath) @@ -320,7 +344,7 @@ session = Session() try: - process_leftovers(session, access_token) + process_leftovers(session, access_token, utils.get_logger()) session.commit() finally: session.rollback() @@ -330,7 +354,7 @@ utils.get_logger().debug("Leftovers processed. Exiting.") sys.exit() - queue = JoinableQueue() + queue = mQueue() stop_event = Event() #workaround for bug on using urllib2 and multiprocessing @@ -344,15 +368,24 @@ finally: if conn is not None: conn.close() - + + process_engines = [] + logger_queues = [] - sprocess = SourceProcess(Session, queue, options, access_token, stop_event) + SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str) + process_engines.append(engine_process) + lqueue = mQueue(1) + logger_queues.append(lqueue) + sprocess = SourceProcess(SessionProcess, queue, options, access_token, stop_event, lqueue) tweet_processes = [] for i in range(options.process_nb - 1): - Session, engine, metadata = get_sessionmaker(conn_str) - cprocess = TweetProcess(Session, queue, options, access_token, stop_event) + SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str) + process_engines.append(engine_process) + lqueue = mQueue(1) + logger_queues.append(lqueue) + cprocess = TweetProcess(SessionProcess, queue, options, access_token, stop_event, lqueue) tweet_processes.append(cprocess) def interupt_handler(signum, frame): @@ -360,23 +393,28 @@ signal.signal(signal.SIGINT, interupt_handler) + log_thread = threading.Thread(target=process_log, name="loggingThread", args=(logger_queues,stop_event,)) + log_thread.daemon = True + sprocess.start() for cprocess in tweet_processes: cprocess.start() + log_thread.start() + if options.duration >= 0: - end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration) + end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration) + while not stop_event.is_set(): if options.duration >= 0 and 
datetime.datetime.utcnow() >= end_ts: stop_event.set() break - if sprocess.is_alive(): + if sprocess.is_alive(): time.sleep(1) else: stop_event.set() break - utils.get_logger().debug("Joining Source Process") try: sprocess.join(10) @@ -384,8 +422,6 @@ utils.get_logger().debug("Pb joining Source Process - terminating") sprocess.terminate() - utils.get_logger().debug("Joining Queue") - #queue.join() for i, cprocess in enumerate(tweet_processes): utils.get_logger().debug("Joining consumer process Nb %d" % (i + 1)) try: @@ -393,15 +429,30 @@ except: utils.get_logger().debug("Pb joining consumer process Nb %d - terminating" % (i + 1)) cprocess.terminate() + - utils.get_logger().debug("Processing leftovers") - session = Session() + utils.get_logger().debug("Close queues") try: - process_leftovers(session, access_token) - session.commit() - finally: - session.rollback() - session.close() + queue.close() + for lqueue in logger_queues: + lqueue.close() + except Exception as e: + utils.get_logger().error("error when closing queues %s", repr(e)) + #do nothing + + + if options.process_nb > 1: + utils.get_logger().debug("Processing leftovers") + session = Session() + try: + process_leftovers(session, access_token, utils.get_logger()) + session.commit() + finally: + session.rollback() + session.close() + for pengine in process_engines: + pengine.dispose() + utils.get_logger().debug("Done. Exiting.")