# HG changeset patch # User Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com> # Date 1313598727 -7200 # Node ID 500cd0405c7acfc4679cbcb6bd298f74d5b158fd # Parent 2209e66bb50bc6d937843bc2729774d68b486851 improve multi processing architecture diff -r 2209e66bb50b -r 500cd0405c7a script/lib/iri_tweet/export_twitter_alchemy.py --- a/script/lib/iri_tweet/export_twitter_alchemy.py Fri Aug 12 18:17:27 2011 +0200 +++ b/script/lib/iri_tweet/export_twitter_alchemy.py Wed Aug 17 18:32:07 2011 +0200 @@ -5,7 +5,7 @@ from optparse import OptionParser #@UnresolvedImport from sqlalchemy import Table, Column, BigInteger, MetaData from sqlalchemy.orm import sessionmaker -from utils import parse_date, set_logging_options, set_logging, get_filter_query, logger +from utils import parse_date, set_logging_options, set_logging, get_filter_query, get_logger from models import setup_database import datetime import os.path @@ -100,13 +100,17 @@ set_logging(options) - logger.debug("OPTIONS : " + repr(options)) #@UndefinedVariable + get_logger().debug("OPTIONS : " + repr(options)) #@UndefinedVariable if len(sys.argv) == 1 or options.database is None: parser.print_help() sys.exit(1) - engine, metadata = setup_database('sqlite:///'+options.database, echo=((options.verbose-options.quiet)>0), create_all = False) + conn_str = options.database.strip() + if not re.match("^\w+://.+", conn_str): + conn_str = 'sqlite:///' + conn_str + + engine, metadata = setup_database(conn_str, echo=((options.verbose-options.quiet)>0), create_all = False) Session = sessionmaker() conn = engine.connect() @@ -158,7 +162,7 @@ for params in parameters: - logger.debug("PARAMETERS " + repr(params)) #@UndefinedVariable + get_logger().debug("PARAMETERS " + repr(params)) #@UndefinedVariable start_date_str = params.get("start_date",None) end_date_str = params.get("end_date", None) @@ -191,12 +195,12 @@ if content_file and content_file.find("http") == 0: - logger.debug("url : " + content_file) #@UndefinedVariable + 
get_logger().debug("url : " + content_file) #@UndefinedVariable h = httplib2.Http() resp, content = h.request(content_file) - logger.debug("url response " + repr(resp) + " content " + repr(content)) #@UndefinedVariable + get_logger().debug("url response " + repr(resp) + " content " + repr(content)) #@UndefinedVariable project = anyjson.deserialize(content) root = etree.fromstring(project["ldt"]) @@ -253,7 +257,7 @@ if ensemble_parent is None: - logger.error("Can not process file") #@UndefinedVariable + get_logger().error("Can not process file") #@UndefinedVariable sys.exit() if options.replace: @@ -308,18 +312,18 @@ project["ldt"] = output_data body = anyjson.serialize(project) - logger.debug("write http " + content_file) #@UndefinedVariable - logger.debug("write http " + repr(body)) #@UndefinedVariable + get_logger().debug("write http " + content_file) #@UndefinedVariable + get_logger().debug("write http " + repr(body)) #@UndefinedVariable h = httplib2.Http() resp, content = h.request(content_file, "PUT", headers={'content-type':'application/json'}, body=body) - logger.debug("write http " + repr(resp) + " content " + content) #@UndefinedVariable + get_logger().debug("write http " + repr(resp) + " content " + content) #@UndefinedVariable else: if content_file and os.path.exists(content_file): dest_file_name = content_file else: dest_file_name = options.filename - logger.debug("WRITE : " + dest_file_name) #@UndefinedVariable + get_logger().debug("WRITE : " + dest_file_name) #@UndefinedVariable output = open(dest_file_name, "w") output.write(output_data) output.flush() diff -r 2209e66bb50b -r 500cd0405c7a script/lib/iri_tweet/models.py --- a/script/lib/iri_tweet/models.py Fri Aug 12 18:17:27 2011 +0200 +++ b/script/lib/iri_tweet/models.py Wed Aug 17 18:32:07 2011 +0200 @@ -66,6 +66,7 @@ TWEET_STATUS = { 'OK' : 1, 'ERROR' : 2, + 'NOT_TWEET': 3, } __tablename__ = 'tweet_tweet_log' @@ -158,7 +159,7 @@ profile_text_color = Column(String) profile_use_background_image = 
Column(Boolean) protected = Column(Boolean) - screen_name = Column(String, index=True, unique=True) + screen_name = Column(String, index=True) show_all_inline_media = Column(Boolean) statuses_count = Column(Integer) time_zone = Column(String) diff -r 2209e66bb50b -r 500cd0405c7a script/lib/iri_tweet/tweet_twitter_user.py --- a/script/lib/iri_tweet/tweet_twitter_user.py Fri Aug 12 18:17:27 2011 +0200 +++ b/script/lib/iri_tweet/tweet_twitter_user.py Wed Aug 17 18:32:07 2011 +0200 @@ -1,6 +1,6 @@ from iri_tweet.models import setup_database, Message, UserMessage, User from iri_tweet.utils import (get_oauth_token, get_user_query, set_logging_options, - set_logging, parse_date, logger) + set_logging, parse_date, get_logger) from optparse import OptionParser #@UnresolvedImport from sqlalchemy import BigInteger from sqlalchemy.orm import sessionmaker @@ -10,6 +10,7 @@ import sys import time import twitter +import re APPLICATION_NAME = "Tweet recorder user" CONSUMER_KEY = "Vdr5ZcsjI1G3esTPI8yDg" @@ -58,7 +59,11 @@ if not options.message or len(options.message) == 0: sys.exit() - engine, metadata = setup_database('sqlite:///'+options.database, echo=((options.verbose-options.quiet)>0), create_all = True) + conn_str = options.database.strip() + if not re.match("^\w+://.+", conn_str): + conn_str = 'sqlite:///' + conn_str + + engine, metadata = setup_database(conn_str, echo=((options.verbose-options.quiet)>0), create_all = False) Session = sessionmaker() conn = engine.connect() @@ -107,7 +112,7 @@ screen_name = user.screen_name message = u"@%s: %s" % (screen_name, base_message) - logger.debug("new status : " + message) #@UndefinedVariable + get_logger().debug("new status : " + message) #@UndefinedVariable if not options.simulate: t.statuses.update(status=message) user_message = UserMessage(user_id=user.id, message_id=message_obj.id) diff -r 2209e66bb50b -r 500cd0405c7a script/lib/iri_tweet/utils.py --- a/script/lib/iri_tweet/utils.py Fri Aug 12 18:17:27 2011 +0200 +++
b/script/lib/iri_tweet/utils.py Wed Aug 17 18:32:07 2011 +0200 @@ -3,10 +3,11 @@ ACCESS_TOKEN_SECRET, adapt_date, adapt_json, TweetSource, TweetLog, MediaType, Media, EntityMedia, Entity, EntityType) from sqlalchemy.sql import select, or_ #@UnresolvedImport +import Queue #@UnresolvedImport import anyjson #@UnresolvedImport import datetime import email.utils -import logging #@UnresolvedImport +import logging import os.path import sys import twitter.oauth #@UnresolvedImport @@ -14,7 +15,6 @@ import twitter_text #@UnresolvedImport - CACHE_ACCESS_TOKEN = {} def get_oauth_token(token_file_path=None, check_access_token=True, application_name=APPLICATION_NAME, consumer_key=CONSUMER_KEY, consumer_secret=CONSUMER_SECRET): @@ -448,7 +448,7 @@ self.obj_buffer.persists(self.session) -def set_logging(options, plogger=None): +def set_logging(options, plogger=None, queue=None): logging_config = { "format" : '%(asctime)s %(levelname)s:%(name)s:%(message)s', @@ -466,14 +466,17 @@ if logger is None: logger = get_logger() #@UndefinedVariable - if len(logger.handlers) == 0: + if len(logger.handlers) == 0: filename = logging_config.get("filename") - if filename: + if queue is not None: + hdlr = QueueHandler(queue, True) + elif filename: mode = logging_config.get("filemode", 'a') hdlr = logging.FileHandler(filename, mode) #@UndefinedVariable else: stream = logging_config.get("stream") hdlr = logging.StreamHandler(stream) #@UndefinedVariable + fs = logging_config.get("format", logging.BASIC_FORMAT) #@UndefinedVariable dfs = logging_config.get("datefmt", None) fmt = logging.Formatter(fs, dfs) #@UndefinedVariable @@ -484,6 +487,7 @@ logger.setLevel(level) options.debug = (options.verbose-options.quiet > 0) + return logger def set_logging_options(parser): parser.add_option("-l", "--log", dest="logfile", @@ -536,5 +540,47 @@ return query.distinct() +logger_name = "iri.tweet" + def get_logger(): - return logging.getLogger("iri_tweet") #@UndefinedVariable + global logger_name + return 
logging.getLogger(logger_name) #@UndefinedVariable + + +# Next two import lines for this demo only + +class QueueHandler(logging.Handler): #@UndefinedVariable + """ + This is a logging handler which sends events to a multiprocessing queue. + """ + + def __init__(self, queue, ignore_full): + """ + Initialise an instance, using the passed queue. + """ + logging.Handler.__init__(self) #@UndefinedVariable + self.queue = queue + self.ignore_full = ignore_full + + def emit(self, record): + """ + Emit a record. + + Writes the LogRecord to the queue. + """ + try: + ei = record.exc_info + if ei: + dummy = self.format(record) # just to get traceback text into record.exc_text + record.exc_info = None # not needed any more + if not self.ignore_full or not self.queue.full(): + self.queue.put_nowait(record) + except Queue.Full: + if self.ignore_full: + pass + else: + raise + except (KeyboardInterrupt, SystemExit): + raise + except: + self.handleError(record) diff -r 2209e66bb50b -r 500cd0405c7a script/rest/search_twitter.py --- a/script/rest/search_twitter.py Fri Aug 12 18:17:27 2011 +0200 +++ b/script/rest/search_twitter.py Wed Aug 17 18:32:07 2011 +0200 @@ -36,7 +36,13 @@ (options, args) = get_option() twitter = twitter.Twitter(domain="search.twitter.com") - engine, metadata = models.setup_database('sqlite:///'+args[0], echo=((options.verbose-options.quiet)>0)) + + conn_str = args[0].strip() + if not re.match("^\w+://.+", conn_str): + conn_str = 'sqlite:///' + conn_str + + + engine, metadata = models.setup_database(conn_str, echo=((options.verbose-options.quiet)>0), create_all=True) Session = sessionmaker(bind=engine) session = Session() try: @@ -54,7 +60,7 @@ print tweet tweet_str = anyjson.serialize(tweet) #invalidate user id - processor = utils.TwitterProcessor(tweet, tweet_str, None, session, options.token_filename) + processor = utils.TwitterProcessor(tweet, tweet_str, None, session, None, options.token_filename) processor.process() session.flush() session.commit() diff -r
2209e66bb50b -r 500cd0405c7a script/stream/recorder_tweetstream.py --- a/script/stream/recorder_tweetstream.py Fri Aug 12 18:17:27 2011 +0200 +++ b/script/stream/recorder_tweetstream.py Wed Aug 17 18:32:07 2011 +0200 @@ -1,10 +1,12 @@ from getpass import getpass from iri_tweet import models, utils from iri_tweet.models import TweetSource, TweetLog -from multiprocessing import Queue, JoinableQueue, Process, Event, get_logger +from multiprocessing import (Queue as mQueue, JoinableQueue, Process, Event, + get_logger) from optparse import OptionParser from sqlalchemy.exc import OperationalError from sqlalchemy.orm import scoped_session, sessionmaker +import Queue import StringIO import anyjson import datetime @@ -16,6 +18,7 @@ import socket import sqlalchemy.schema import sys +import threading import time import traceback import tweepy.auth @@ -34,7 +37,7 @@ def set_logging(options): - utils.set_logging(options, logging.getLogger('iri_tweet')) + utils.set_logging(options, logging.getLogger('iri.tweet')) utils.set_logging(options, logging.getLogger('multiprocessing')) if options.debug >= 2: utils.set_logging(options, logging.getLogger('sqlalchemy.engine')) @@ -42,6 +45,11 @@ #utils.set_logging(options, logging.getLogger('sqlalchemy.pool')) #utils.set_logging(options, logging.getLogger('sqlalchemy.orm')) +def set_logging_process(options, queue): + qlogger = utils.set_logging(options, logging.getLogger('iri.tweet.p'), queue) + qlogger.propagate = 0 + return qlogger + def get_auth(options, access_token): if options.username and options.password: auth = tweepy.auth.BasicAuthHandler(options.username, options.password) @@ -107,7 +115,7 @@ class SourceProcess(Process): - def __init__(self, session_maker, queue, options, access_token, stop_event): + def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue): self.session_maker = session_maker self.queue = queue self.track = options.track @@ -116,31 +124,33 @@ self.stop_event = stop_event 
self.options = options self.access_token = access_token + self.logger_queue = logger_queue super(SourceProcess, self).__init__() def run(self): + #import pydevd #pydevd.settrace(suspend=False) - set_logging(self.options) + self.logger = set_logging_process(self.options, self.logger_queue) self.auth = get_auth(self.options, self.access_token) - utils.get_logger().debug("SourceProcess : run") + self.logger.debug("SourceProcess : run") track_list = self.track # or raw_input('Keywords to track (comma seperated): ').strip() track_list = [k for k in track_list.split(',')] - utils.get_logger().debug("SourceProcess : before connecting to stream " + repr(track_list)) + self.logger.debug("SourceProcess : before connecting to stream " + repr(track_list)) stream = ReconnectingTweetStream(self.auth, track_list, reconnects=self.reconnects, as_text=True) - utils.get_logger().debug("SourceProcess : after connecting to stream") + self.logger.debug("SourceProcess : after connecting to stream") stream.muststop = lambda: self.stop_event.is_set() session = self.session_maker() try: for tweet in stream: - utils.get_logger().debug("tweet " + repr(tweet)) + self.logger.debug("tweet " + repr(tweet)) source = TweetSource(original_json=tweet) - utils.get_logger().debug("source created") + self.logger.debug("source created") add_retries = 0 while add_retries < 10: try: @@ -150,18 +160,18 @@ break except OperationalError as e: session.rollback() - utils.get_logger().debug("Operational Error %s nb %d" % (repr(e), add_retries)) + self.logger.debug("Operational Error %s nb %d" % (repr(e), add_retries)) if add_retries == 10: raise e source_id = source.id - utils.get_logger().debug("before queue + source id " + repr(source_id)) - utils.get_logger().info("Tweet count: %d - current rate : %.2f - running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime))) + self.logger.debug("before queue + source id " + repr(source_id)) + self.logger.info("Tweet count: %d - current rate : %.2f - 
running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime))) session.commit() self.queue.put((source_id, tweet), False) except Exception as e: - utils.get_logger().error("Error when processing tweet " + repr(e)) + self.logger.error("Error when processing tweet " + repr(e)) finally: session.rollback() stream.close() @@ -170,19 +180,23 @@ self.stop_event.set() -def process_tweet(tweet, source_id, session, access_token): +def process_tweet(tweet, source_id, session, access_token, logger): try: tweet_obj = anyjson.deserialize(tweet) + if 'text' not in tweet_obj: + tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET']) + session.add(tweet_log) + return screen_name = "" if 'user' in tweet_obj and 'screen_name' in tweet_obj['user']: screen_name = tweet_obj['user']['screen_name'] - utils.get_logger().debug(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text'])) - utils.get_logger().debug(u"Process_tweet :" + repr(tweet)) + logger.info(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text'])) + logger.debug(u"Process_tweet :" + repr(tweet)) processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, access_token, None) processor.process() except Exception as e: message = u"Error %s processing tweet %s" % (repr(e), tweet) - utils.get_logger().error(message) + logger.error(message) output = StringIO.StringIO() traceback.print_exception(Exception, e, None, None, output) error_stack = output.getvalue() @@ -196,28 +210,29 @@ class TweetProcess(Process): - def __init__(self, session_maker, queue, options, access_token, stop_event): + def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue): self.session_maker = session_maker self.queue = queue self.stop_event = stop_event self.options = options self.access_token = access_token + self.logger_queue = logger_queue super(TweetProcess, self).__init__() def run(self): - set_logging(self.options) + self.logger = 
set_logging_process(self.options, self.logger_queue) session = self.session_maker() try: while not self.stop_event.is_set(): try: source_id, tweet_txt = queue.get(True, 3) - utils.get_logger().debug("Processing source id " + repr(source_id)) + self.logger.debug("Processing source id " + repr(source_id)) except Exception as e: - utils.get_logger().debug('Process tweet exception in loop : ' + repr(e)) + self.logger.debug('Process tweet exception in loop : ' + repr(e)) continue - process_tweet(tweet_txt, source_id, session, self.access_token) + process_tweet(tweet_txt, source_id, session, self.access_token, self.logger) session.commit() finally: session.rollback() @@ -231,19 +246,28 @@ return Session, engine, metadata -def process_leftovers(session, access_token): +def process_leftovers(session, access_token, logger): sources = session.query(TweetSource).outerjoin(TweetLog).filter(TweetLog.id == None) for src in sources: tweet_txt = src.original_json - process_tweet(tweet_txt, src.id, session, access_token) + process_tweet(tweet_txt, src.id, session, access_token, logger) session.commit() #get tweet source that do not match any message #select * from tweet_tweet_source ts left join tweet_tweet_log tl on ts.id = tl.tweet_source_id where tl.id isnull; +def process_log(logger_queues, stop_event): + while not stop_event.is_set(): + for lqueue in logger_queues: + try: + record = lqueue.get_nowait() + logging.getLogger(record.name).handle(record) + except Queue.Empty: + continue + time.sleep(0.1) def get_options(): @@ -275,7 +299,7 @@ if __name__ == '__main__': - + (options, args) = get_options() set_logging(options) @@ -287,10 +311,10 @@ conn_str = options.conn_str.strip() if not re.match("^\w+://.+", conn_str): - conn_str = 'sqlite://' + options.conn_str + conn_str = 'sqlite:///' + options.conn_str if conn_str.startswith("sqlite") and options.new: - filepath = conn_str[conn_str.find("://"):] + filepath = conn_str[conn_str.find(":///")+4:] if os.path.exists(filepath): i = 
1 basename, extension = os.path.splitext(filepath) @@ -320,7 +344,7 @@ session = Session() try: - process_leftovers(session, access_token) + process_leftovers(session, access_token, utils.get_logger()) session.commit() finally: session.rollback() @@ -330,7 +354,7 @@ utils.get_logger().debug("Leftovers processed. Exiting.") sys.exit() - queue = JoinableQueue() + queue = mQueue() stop_event = Event() #workaround for bug on using urllib2 and multiprocessing @@ -344,15 +368,24 @@ finally: if conn is not None: conn.close() - + + process_engines = [] + logger_queues = [] - sprocess = SourceProcess(Session, queue, options, access_token, stop_event) + SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str) + process_engines.append(engine_process) + lqueue = mQueue(1) + logger_queues.append(lqueue) + sprocess = SourceProcess(SessionProcess, queue, options, access_token, stop_event, lqueue) tweet_processes = [] for i in range(options.process_nb - 1): - Session, engine, metadata = get_sessionmaker(conn_str) - cprocess = TweetProcess(Session, queue, options, access_token, stop_event) + SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str) + process_engines.append(engine_process) + lqueue = mQueue(1) + logger_queues.append(lqueue) + cprocess = TweetProcess(SessionProcess, queue, options, access_token, stop_event, lqueue) tweet_processes.append(cprocess) def interupt_handler(signum, frame): @@ -360,23 +393,28 @@ signal.signal(signal.SIGINT, interupt_handler) + log_thread = threading.Thread(target=process_log, name="loggingThread", args=(logger_queues,stop_event,)) + log_thread.daemon = True + sprocess.start() for cprocess in tweet_processes: cprocess.start() + log_thread.start() + if options.duration >= 0: - end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration) + end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration) + while not stop_event.is_set(): if options.duration >= 0 and 
datetime.datetime.utcnow() >= end_ts: stop_event.set() break - if sprocess.is_alive(): + if sprocess.is_alive(): time.sleep(1) else: stop_event.set() break - utils.get_logger().debug("Joining Source Process") try: sprocess.join(10) @@ -384,8 +422,6 @@ utils.get_logger().debug("Pb joining Source Process - terminating") sprocess.terminate() - utils.get_logger().debug("Joining Queue") - #queue.join() for i, cprocess in enumerate(tweet_processes): utils.get_logger().debug("Joining consumer process Nb %d" % (i + 1)) try: @@ -393,15 +429,30 @@ except: utils.get_logger().debug("Pb joining consumer process Nb %d - terminating" % (i + 1)) cprocess.terminate() + - utils.get_logger().debug("Processing leftovers") - session = Session() + utils.get_logger().debug("Close queues") try: - process_leftovers(session, access_token) - session.commit() - finally: - session.rollback() - session.close() + queue.close() + for lqueue in logger_queues: + lqueue.close() + except Exception as e: + utils.get_logger().error("error when closing queues %s", repr(e)) + #do nothing + + + if options.process_nb > 1: + utils.get_logger().debug("Processing leftovers") + session = Session() + try: + process_leftovers(session, access_token, utils.get_logger()) + session.commit() + finally: + session.rollback() + session.close() + for pengine in process_engines: + pengine.dispose() + utils.get_logger().debug("Done. Exiting.") diff -r 2209e66bb50b -r 500cd0405c7a script/virtualenv/res/psycopg2-2.4.2.tar.gz Binary file script/virtualenv/res/psycopg2-2.4.2.tar.gz has changed