# HG changeset patch # User Samuel Huron # Date 1314538948 -7200 # Node ID c7fd6a0b5b516f8ebec1c2f3785191e54f77ba7c # Parent bc17d1af15ab2c6e06ff8c487c721742a5905c11# Parent 6671e9a4c9c5a36d769f0b09c1e6bcc229c473d0 merge diff -r bc17d1af15ab -r c7fd6a0b5b51 script/lib/iri_tweet/models.py --- a/script/lib/iri_tweet/models.py Sun Aug 28 11:48:30 2011 +0200 +++ b/script/lib/iri_tweet/models.py Sun Aug 28 15:42:28 2011 +0200 @@ -1,5 +1,5 @@ -from sqlalchemy import (Boolean, Column, BigInteger, Integer, String, ForeignKey, - DateTime, create_engine) +from sqlalchemy import (Boolean, Column, Enum, BigInteger, Integer, String, + ForeignKey, DateTime, create_engine) from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import relationship import anyjson @@ -27,12 +27,34 @@ else: return anyjson.serialize(obj) +class TweetMeta(type(Base)): + + def __init__(cls, name, bases, ns): #@NoSelf + def init(self, **kwargs): + for key, value in kwargs.items(): + if hasattr(self, key): + setattr(self, key, value) + super(cls, self).__init__() + setattr(cls, '__init__', init) + super(TweetMeta, cls).__init__(name, bases, ns) + + +class ProcessEvent(Base): + __metaclass__ = TweetMeta + __tablename__ = "tweet_process_event" + id = Column(Integer, primary_key=True, autoincrement=True) + ts = Column(DateTime, default=datetime.datetime.utcnow, index=True) + type = Column(Enum("start","pid","shutdown","error", "start_worker", "stop_worker", name="process_event_type_enum"), nullable=False) + args = Column(String) + class EntityType(Base): + __metaclass__ = TweetMeta __tablename__ = "tweet_entity_type" id = Column(Integer, primary_key=True, autoincrement=True) label = Column(String) class Entity(Base): + __metaclass__ = TweetMeta __tablename__ = "tweet_entity" id = Column(Integer, primary_key=True) tweet_id = Column(BigInteger, ForeignKey('tweet_tweet.id')) @@ -44,33 +66,27 @@ source = Column(String) __mapper_args__ = {'polymorphic_on': type, 'polymorphic_identity': 'entity_entity', 'with_polymorphic':'*'} - def __init__(self, **kwargs): - for key, value in kwargs.items(): - if hasattr(self, key): - setattr(self, key, value) class TweetSource(Base): + __metaclass__ = TweetMeta __tablename__ = 'tweet_tweet_source' id = Column(Integer, primary_key=True, autoincrement=True) original_json = Column(String) - received_at = Column(DateTime, default=datetime.datetime.now(), index=True) - - def __init__(self, **kwargs): - for key, value in kwargs.items(): - if hasattr(self, key): - setattr(self, key, value) + received_at = Column(DateTime, default=datetime.datetime.utcnow, index=True) class TweetLog(Base): - + TWEET_STATUS = { 'OK' : 1, 'ERROR' : 2, 'NOT_TWEET': 3, } + __metaclass__ = TweetMeta __tablename__ = 'tweet_tweet_log' id = Column(Integer, primary_key=True, autoincrement=True) + ts = Column(DateTime, default=datetime.datetime.utcnow, index=True) tweet_source_id = Column(Integer, ForeignKey('tweet_tweet_source.id')) tweet_source = relationship("TweetSource", backref="logs") status = Column(Integer) @@ -79,6 +95,7 @@ class Tweet(Base): + __metaclass__ = TweetMeta __tablename__ = 'tweet_tweet' id = Column(BigInteger, primary_key=True, autoincrement=False) @@ -104,32 +121,31 @@ tweet_source_id = Column(Integer, ForeignKey('tweet_tweet_source.id')) tweet_source = relationship("TweetSource", backref="tweet") entity_list = relationship(Entity, backref='tweet') - received_at = Column(DateTime, default=datetime.datetime.now(), index=True) + received_at = Column(DateTime, default=datetime.datetime.utcnow, index=True) - def __init__(self, **kwargs): - for key, value in kwargs.items(): - if hasattr(self, key): - setattr(self, key, value) class UserMessage(Base): + __metaclass__ = TweetMeta __tablename__ = "tweet_user_message" id = Column(Integer, primary_key=True) user_id = Column(Integer, ForeignKey('tweet_user.id')) user = relationship("User", backref="messages") - created_at = Column(DateTime, default=datetime.datetime.now()) + created_at = Column(DateTime, default=datetime.datetime.utcnow) message_id = Column(Integer, ForeignKey('tweet_message.id')) class Message(Base): + __metaclass__ = TweetMeta __tablename__ = "tweet_message" id = Column(Integer, primary_key=True) - created_at = Column(DateTime, default=datetime.datetime.now()) + created_at = Column(DateTime, default=datetime.datetime.utcnow) text = Column(String) users = relationship(UserMessage, backref='message') class User(Base): + __metaclass__ = TweetMeta __tablename__ = "tweet_user" id = Column(BigInteger, primary_key=True, autoincrement=False) @@ -153,10 +169,12 @@ profile_background_image_url = Column(String) profile_background_tile = Column(Boolean) profile_image_url = Column(String) + profile_image_url_https = Column(String) profile_link_color = Column(String) profile_sidebar_border_color = Column(String) profile_sidebar_fill_color = Column(String) profile_text_color = Column(String) + default_profile_image = Column(String) profile_use_background_image = Column(Boolean) protected = Column(Boolean) screen_name = Column(String, index=True) @@ -166,43 +184,33 @@ url = Column(String) utc_offset = Column(Integer) verified = Column(Boolean) - - def __init__(self, **kwargs): - for key, value in kwargs.items(): - if hasattr(self, key): - setattr(self, key, value) class Hashtag(Base): + __metaclass__ = TweetMeta __tablename__ = "tweet_hashtag" id = Column(Integer, primary_key=True) text = Column(String, unique=True, index=True) - def __init__(self, **kwargs): - for key, value in kwargs.items(): - if hasattr(self, key): - setattr(self, key, value) + class Url(Base): + __metaclass__ = TweetMeta __tablename__ = "tweet_url" id = Column(Integer, primary_key=True) url = Column(String, unique=True) expanded_url = Column(String) - def __init__(self, **kwargs): - for key, value in kwargs.items(): - if hasattr(self, key): - setattr(self, key, value) + class MediaType(Base): + __metaclass__ = TweetMeta __tablename__ = "tweet_media_type" id = Column(Integer, primary_key=True, autoincrement=True) label = Column(String, unique=True, index=True) - def __init__(self, **kwargs): - for key, value in kwargs.items(): - if hasattr(self, key): - setattr(self, key, value) + class Media(Base): + __metaclass__ = TweetMeta __tablename__ = "tweet_media" id = Column(BigInteger, primary_key=True, autoincrement=False) id_str = Column(String, unique=True) @@ -214,10 +222,6 @@ sizes = Column(String) type_id = Column(Integer, ForeignKey("tweet_media_type.id")) type = relationship(MediaType, primaryjoin=type_id == MediaType.id) - def __init__(self, **kwargs): - for key, value in kwargs.items(): - if hasattr(self, key): - setattr(self, key, value) @@ -227,11 +231,6 @@ id = Column(Integer, ForeignKey('tweet_entity.id'), primary_key=True) hashtag_id = Column(Integer, ForeignKey("tweet_hashtag.id")) hashtag = relationship(Hashtag, primaryjoin=hashtag_id == Hashtag.id) - def __init__(self, **kwargs): - super(EntityHashtag, self).__init__(**kwargs) - for key, value in kwargs.items(): - if hasattr(self, key): - setattr(self, key, value) class EntityUrl(Entity): @@ -240,11 +239,6 @@ id = Column(Integer, ForeignKey('tweet_entity.id'), primary_key=True) url_id = Column(Integer, ForeignKey("tweet_url.id")) url = relationship(Url, primaryjoin=url_id == Url.id) - def __init__(self, **kwargs): - super(EntityUrl, self).__init__(**kwargs) - for key, value in kwargs.items(): - if hasattr(self, key): - setattr(self, key, value) class EntityUser(Entity): __tablename__ = "tweet_entity_user" @@ -253,11 +247,6 @@ user_id = Column(BigInteger, ForeignKey('tweet_user.id')) user = relationship(User, primaryjoin=(user_id == User.id)) - def __init__(self, **kwargs): - super(EntityUser, self).__init__(**kwargs) - for key, value in kwargs.items(): - if hasattr(self, key): - setattr(self, key, value) class EntityMedia(Entity): __tablename__ = "tweet_entity_media" @@ -266,12 +255,6 @@ media_id = Column(BigInteger, ForeignKey('tweet_media.id')) media = relationship(Media, primaryjoin=(media_id == Media.id)) - def __init__(self, **kwargs): - super(EntityMedia, self).__init__(**kwargs) - for key, value in kwargs.items(): - if hasattr(self, key): - setattr(self, key, value) - def setup_database(*args, **kwargs): @@ -288,118 +271,3 @@ return (engine, metadata) -rest_tweet_tweet = { - u'iso_language_code': 'unicode', - u'text': 'unicode', - u'from_user_id_str': 'unicode', - u'profile_image_url': 'unicode', - u'to_user_id_str': 'NoneType', - u'created_at': 'unicode', - u'source': 'unicode', - u'to_user': 'unicode', - u'id_str': 'unicode', - u'from_user': 'unicode', - u'place': {u'type': 'unicode', u'id': 'unicode', u'full_name': 'unicode'}, - u'from_user_id': 'int', - u'to_user_id': 'NoneType', - u'geo': 'NoneType', - u'id': 'int', - u'metadata': {u'result_type': 'unicode'} -} - -tweet_tweet = { - 'contributors': None, - 'coordinates': None, - 'created_at': 'date', - 'entities': "tweet_entity", - 'favorited': "bool", - 'geo': None, - 'id': "long", - 'id_str': "string", - 'in_reply_to_screen_name': "string", - 'in_reply_to_status_id': "long", - 'in_reply_to_status_id_str': "string", - 'in_reply_to_user_id': "int", - 'in_reply_to_user_id_str': "string", - 'place': "string", - 'retweet_count': "int", - 'retweeted': "bool", - 'source': "string", - 'text': "string", - 'truncated': "bool", - 'user': "tweet_user" -} -tweet_user = { - 'contributors_enabled': 'bool', - 'created_at': 'str', - 'description': 'str', - 'favourites_count': 'int', - 'follow_request_sent': None, - 'followers_count': 'int', - 'following': None, - 'friends_count': 'int', - 'geo_enabled': 'bool', - 'id': 'int', - 'id_str': 'str', - 'is_translator': 'bool', - 'lang': 'str', - 'listed_count': 'int', - 'location': 'str', - 'name': 'str', - 'notifications': 'NoneType', - 'profile_background_color': 'str', - 'profile_background_image_url': 'str', - 'profile_background_tile': 'bool', - 'profile_image_url': 'str', - 'profile_link_color': 'str', - 'profile_sidebar_border_color': 'str', - 'profile_sidebar_fill_color': 'str', - 'profile_text_color': 'str', - 'profile_use_background_image': 'bool', - 'protected': 'bool', - 'screen_name': 'str', - 'show_all_inline_media': 'bool', - 'statuses_count': 'int', - 'time_zone': 'str', - 'url': 'str', - 'utc_offset': 'int', - 'verified': 'bool', -} - - -tweet_entity_hashtag = { - 'hashtag' : 'tweet_hashtag', - 'indice_start' : 'int', - 'indice_end' : 'int', - 'tweet':'tweet_tweet' -} - -tweet_entity_url = { - 'url' : 'tweet_url', - 'indice_start' : 'int', - 'indice_end' : 'int', - 'tweet':'tweet_tweet' -} - -tweet_entity_user = { - 'user' : 'tweet_user', - 'indice_start' : 'int', - 'indice_end' : 'int', - 'tweet':'tweet_tweet' -} - -#id int -#id_str str -#indices list -#name str -#screen_name str - -tweet_hashtag = { - "text": "string" -} - -tweet_url = { - "url": "string", - "expanded_url" : "string", -} - diff -r bc17d1af15ab -r c7fd6a0b5b51 script/stream/recorder_tweetstream.py --- a/script/stream/recorder_tweetstream.py Sun Aug 28 11:48:30 2011 +0200 +++ b/script/stream/recorder_tweetstream.py Sun Aug 28 15:42:28 2011 +0200 @@ -1,6 +1,6 @@ from getpass import getpass from iri_tweet import models, utils -from iri_tweet.models import TweetSource, TweetLog +from iri_tweet.models import TweetSource, TweetLog, ProcessEvent from multiprocessing import (Queue as mQueue, JoinableQueue, Process, Event, get_logger) from optparse import OptionParser @@ -10,6 +10,7 @@ import StringIO import anyjson import datetime +import inspect import logging import os import re @@ -110,24 +111,68 @@ # Don't listen to auth error, since we can't reasonably reconnect # when we get one. +def add_process_event(type, args, session_maker): + session = session_maker() + try: + evt = ProcessEvent(args=None if args is None else anyjson.serialize(args), type=type) + session.add(evt) + session.commit() + finally: + session.close() + + +class BaseProcess(Process): + + def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid): + self.parent_pid = parent_pid + self.session_maker = session_maker + self.queue = queue + self.options = options + self.logger_queue = logger_queue + self.stop_event = stop_event + self.access_token = access_token + + super(BaseProcess, self).__init__() + + # + # from http://stackoverflow.com/questions/2542610/python-daemon-doesnt-kill-its-kids + # + def parent_is_alive(self): + try: + # try to call Parent + os.kill(self.parent_pid, 0) + except OSError: + # *beeep* oh no! The phone's disconnected! + return False + else: + # *ring* Hi mom! + return True + + + def __get_process_event_args(self): + return {'name':self.name, 'pid':self.pid, 'parent_pid':self.parent_pid, 'options':self.options.__dict__, 'access_token':self.access_token} + + def run(self): + try: + add_process_event("start_worker", self.__get_process_event_args(), self.session_maker) + self.do_run() + finally: + add_process_event("stop_worker",self.__get_process_event_args(), self.session_maker) + + def do_run(self): + raise NotImplementedError() -class SourceProcess(Process): +class SourceProcess(BaseProcess): - def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue): - self.session_maker = session_maker - self.queue = queue + def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid): self.track = options.track self.reconnects = options.reconnects self.token_filename = options.token_filename - self.stop_event = stop_event - self.options = options - self.access_token = access_token - self.logger_queue = logger_queue - super(SourceProcess, self).__init__() - - def run(self): + super(SourceProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid) + + def do_run(self): #import pydevd #pydevd.settrace(suspend=False) @@ -148,9 +193,11 @@ try: for tweet in stream: - self.logger.debug("tweet " + repr(tweet)) + if not self.parent_is_alive(): + sys.exit() + self.logger.debug("SourceProcess : tweet " + repr(tweet)) source = TweetSource(original_json=tweet) - self.logger.debug("source created") + self.logger.debug("SourceProcess : source created") add_retries = 0 while add_retries < 10: try: @@ -160,18 +207,18 @@ break except OperationalError as e: session.rollback() - self.logger.debug("Operational Error %s nb %d" % (repr(e), add_retries)) + self.logger.debug("SourceProcess : Operational Error %s nb %d" % (repr(e), add_retries)) if add_retries == 10: raise e source_id = source.id - self.logger.debug("before queue + source id " + repr(source_id)) - self.logger.info("Tweet count: %d - current rate : %.2f - running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime))) + self.logger.debug("SourceProcess : before queue + source id " + repr(source_id)) + self.logger.info("SourceProcess : Tweet count: %d - current rate : %.2f - running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime))) session.commit() self.queue.put((source_id, tweet), False) except Exception as e: - self.logger.error("Error when processing tweet " + repr(e)) + self.logger.error("SourceProcess : Error when processing tweet " + repr(e)) finally: session.rollback() stream.close() @@ -196,11 +243,13 @@ processor.process() except Exception as e: message = u"Error %s processing tweet %s" % (repr(e), tweet) - logger.error(message) + logger.exception(message) output = StringIO.StringIO() - traceback.print_exception(Exception, e, None, None, output) - error_stack = output.getvalue() - output.close() + try: + traceback.print_exc(file=output) + error_stack = output.getvalue() + finally: + output.close() session.rollback() tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['ERROR'], error=message, error_stack=error_stack) session.add(tweet_log) @@ -208,26 +257,20 @@ -class TweetProcess(Process): +class TweetProcess(BaseProcess): - def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue): - self.session_maker = session_maker - self.queue = queue - self.stop_event = stop_event - self.options = options - self.access_token = access_token - self.logger_queue = logger_queue - super(TweetProcess, self).__init__() + def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid): + super(TweetProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid) - def run(self): + def do_run(self): self.logger = set_logging_process(self.options, self.logger_queue) session = self.session_maker() try: - while not self.stop_event.is_set(): + while not self.stop_event.is_set() and self.parent_is_alive(): try: - source_id, tweet_txt = queue.get(True, 3) + source_id, tweet_txt = self.queue.get(True, 3) self.logger.debug("Processing source id " + repr(source_id)) except Exception as e: self.logger.debug('Process tweet exception in loop : ' + repr(e)) @@ -273,7 +316,11 @@ def get_options(): - parser = OptionParser() + + usage = "usage: %prog [options]" + + parser = OptionParser(usage=usage) + parser.add_option("-f", "--file", dest="conn_str", help="write tweet to DATABASE. This is a connection string", metavar="CONNECTION_STR", default="enmi2010_twitter.db") parser.add_option("-u", "--user", dest="username", @@ -293,58 +340,20 @@ parser.add_option("-N", "--nb-process", dest="process_nb", help="number of process.\nIf 0, only the lefovers of the database are processed.\nIf 1, no postprocessing is done on the tweets.", metavar="PROCESS_NB", default=2, type='int') - - utils.set_logging_options(parser) return parser.parse_args() - -if __name__ == '__main__': - (options, args) = get_options() - - set_logging(options) - - if options.debug: - print "OPTIONS : " - print repr(options) - - - conn_str = options.conn_str.strip() - if not re.match("^\w+://.+", conn_str): - conn_str = 'sqlite:///' + options.conn_str - - if conn_str.startswith("sqlite") and options.new: - filepath = conn_str[conn_str.find(":///")+4:] - if os.path.exists(filepath): - i = 1 - basename, extension = os.path.splitext(filepath) - new_path = '%s.%d%s' % (basename, i, extension) - while i < 1000000 and os.path.exists(new_path): - i += 1 - new_path = '%s.%d%s' % (basename, i, extension) - if i >= 1000000: - raise Exception("Unable to find new filename for " + filepath) - else: - shutil.move(filepath, new_path) +def do_run(options, session_maker): - Session, engine, metadata = get_sessionmaker(conn_str) - - if options.new: - check_metadata = sqlalchemy.schema.MetaData(bind=engine, reflect=True) - if len(check_metadata.sorted_tables) > 0: - message = "Database %s not empty exiting" % conn_str - utils.get_logger().error(message) - sys.exit(message) - - metadata.create_all(engine) - + stop_args = {} + access_token = None if not options.username or not options.password: access_token = utils.get_oauth_token(options.token_filename) - session = Session() + session = session_maker() try: process_leftovers(session, access_token, utils.get_logger()) session.commit() @@ -354,7 +363,7 @@ if options.process_nb <= 0: utils.get_logger().debug("Leftovers processed. Exiting.") - sys.exit() + return None queue = mQueue() stop_event = Event() @@ -365,7 +374,7 @@ try: conn = urllib2.urlopen(req) except: - pass + utils.get_logger().debug("could not open localhost") #donothing finally: if conn is not None: @@ -378,7 +387,8 @@ process_engines.append(engine_process) lqueue = mQueue(1) logger_queues.append(lqueue) - sprocess = SourceProcess(SessionProcess, queue, options, access_token, stop_event, lqueue) + pid = os.getpid() + sprocess = SourceProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid) tweet_processes = [] @@ -387,22 +397,29 @@ process_engines.append(engine_process) lqueue = mQueue(1) logger_queues.append(lqueue) - cprocess = TweetProcess(SessionProcess, queue, options, access_token, stop_event, lqueue) + cprocess = TweetProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid) tweet_processes.append(cprocess) def interupt_handler(signum, frame): + utils.get_logger().debug("shutdown asked " + repr(signum) + " " + repr(inspect.getframeinfo(frame, 9))) + stop_args.update({'message': 'interupt', 'signum':signum, 'frameinfo':inspect.getframeinfo(frame, 9)}) stop_event.set() - signal.signal(signal.SIGINT, interupt_handler) + signal.signal(signal.SIGINT , interupt_handler) + signal.signal(signal.SIGHUP , interupt_handler) + signal.signal(signal.SIGALRM, interupt_handler) + signal.signal(signal.SIGTERM, interupt_handler) - log_thread = threading.Thread(target=process_log, name="loggingThread", args=(logger_queues,stop_event,)) + log_thread = threading.Thread(target=process_log, name="loggingThread", args=(logger_queues, stop_event,)) log_thread.daemon = True + log_thread.start() + sprocess.start() for cprocess in tweet_processes: cprocess.start() - log_thread.start() + add_process_event("pid", {'main':os.getpid(), 'source':(sprocess.name,sprocess.pid), 'consumers':dict([(p.name,p.pid) for p in tweet_processes])}, session_maker) if options.duration >= 0: end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration) @@ -410,11 +427,13 @@ while not stop_event.is_set(): if options.duration >= 0 and datetime.datetime.utcnow() >= end_ts: + stop_args.update({'message': 'duration', 'duration' : options.duration, 'end_ts' : end_ts}) stop_event.set() break if sprocess.is_alive(): time.sleep(1) else: + stop_args.update({'message': 'Source process killed'}) stop_event.set() break utils.get_logger().debug("Joining Source Process") @@ -445,7 +464,7 @@ if options.process_nb > 1: utils.get_logger().debug("Processing leftovers") - session = Session() + session = session_maker() try: process_leftovers(session, access_token, utils.get_logger()) session.commit() @@ -455,6 +474,61 @@ for pengine in process_engines: pengine.dispose() + + return stop_args + + +if __name__ == '__main__': + + (options, args) = get_options() + + set_logging(options) + + utils.get_logger().debug("OPTIONS : " + repr(options)) + + conn_str = options.conn_str.strip() + if not re.match("^\w+://.+", conn_str): + conn_str = 'sqlite:///' + options.conn_str + if conn_str.startswith("sqlite") and options.new: + filepath = conn_str[conn_str.find(":///") + 4:] + if os.path.exists(filepath): + i = 1 + basename, extension = os.path.splitext(filepath) + new_path = '%s.%d%s' % (basename, i, extension) + while i < 1000000 and os.path.exists(new_path): + i += 1 + new_path = '%s.%d%s' % (basename, i, extension) + if i >= 1000000: + raise Exception("Unable to find new filename for " + filepath) + else: + shutil.move(filepath, new_path) + + Session, engine, metadata = get_sessionmaker(conn_str) + + if options.new: + check_metadata = sqlalchemy.schema.MetaData(bind=engine, reflect=True) + if len(check_metadata.sorted_tables) > 0: + message = "Database %s not empty exiting" % conn_str + utils.get_logger().error(message) + sys.exit(message) + + metadata.create_all(engine) + stop_args = {} + try: + add_process_event(type="start", args={'options':options.__dict__, 'args': args, 'command_line': sys.argv}, session_maker=Session) + stop_args = do_run(options, Session) + except Exception as e: + utils.get_logger().exception("Error in main thread") + outfile = StringIO.StringIO() + try: + traceback.print_exc(file=outfile) + stop_args = {'error': repr(e), 'message': getattr(e, 'message', ''), 'stacktrace':outfile.getvalue()} + finally: + outfile.close() + raise + finally: + add_process_event(type="shutdown", args=stop_args, session_maker=Session) + utils.get_logger().debug("Done. Exiting.")