script/stream/recorder_tweetstream.py
changeset 264 c7fd6a0b5b51
parent 263 6671e9a4c9c5
child 272 fe2efe3600ea
equal deleted inserted replaced
259:bc17d1af15ab 264:c7fd6a0b5b51
     1 from getpass import getpass
     1 from getpass import getpass
     2 from iri_tweet import models, utils
     2 from iri_tweet import models, utils
     3 from iri_tweet.models import TweetSource, TweetLog
     3 from iri_tweet.models import TweetSource, TweetLog, ProcessEvent
     4 from multiprocessing import (Queue as mQueue, JoinableQueue, Process, Event, 
     4 from multiprocessing import (Queue as mQueue, JoinableQueue, Process, Event, 
     5     get_logger)
     5     get_logger)
     6 from optparse import OptionParser
     6 from optparse import OptionParser
     7 from sqlalchemy.exc import OperationalError
     7 from sqlalchemy.exc import OperationalError
     8 from sqlalchemy.orm import scoped_session, sessionmaker
     8 from sqlalchemy.orm import scoped_session, sessionmaker
     9 import Queue
     9 import Queue
    10 import StringIO
    10 import StringIO
    11 import anyjson
    11 import anyjson
    12 import datetime
    12 import datetime
       
    13 import inspect
    13 import logging
    14 import logging
    14 import os
    15 import os
    15 import re
    16 import re
    16 import shutil
    17 import shutil
    17 import signal
    18 import signal
   108 
   109 
   109                 time.sleep(self.retry_wait)
   110                 time.sleep(self.retry_wait)
   110         # Don't listen to auth error, since we can't reasonably reconnect
   111         # Don't listen to auth error, since we can't reasonably reconnect
   111         # when we get one.
   112         # when we get one.
   112 
   113 
   113 
   114 def add_process_event(type, args, session_maker):
   114 
   115     session = session_maker()
   115 
   116     try:
   116 class SourceProcess(Process):
   117         evt = ProcessEvent(args=None if args is None else anyjson.serialize(args), type=type)
   117     
   118         session.add(evt)
   118     def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue):
   119         session.commit()
       
   120     finally:
       
   121         session.close()
       
   122 
       
   123 
       
   124 class BaseProcess(Process):
       
   125 
       
   126     def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
       
   127         self.parent_pid = parent_pid
   119         self.session_maker = session_maker
   128         self.session_maker = session_maker
   120         self.queue = queue
   129         self.queue = queue
       
   130         self.options = options
       
   131         self.logger_queue = logger_queue
       
   132         self.stop_event = stop_event
       
   133         self.access_token = access_token
       
   134 
       
   135         super(BaseProcess, self).__init__()
       
   136 
       
   137     #
       
   138     # from http://stackoverflow.com/questions/2542610/python-daemon-doesnt-kill-its-kids
       
   139     #
       
   140     def parent_is_alive(self):
       
   141         try:
       
   142             # try to call Parent
       
   143             os.kill(self.parent_pid, 0)
       
   144         except OSError:
       
   145             # *beeep* oh no! The phone's disconnected!
       
   146             return False
       
   147         else:
       
   148             # *ring* Hi mom!
       
   149             return True
       
   150     
       
   151 
       
   152     def __get_process_event_args(self):
       
   153         return {'name':self.name, 'pid':self.pid, 'parent_pid':self.parent_pid, 'options':self.options.__dict__, 'access_token':self.access_token}
       
   154 
       
   155     def run(self):
       
   156         try:
       
   157             add_process_event("start_worker", self.__get_process_event_args(), self.session_maker)
       
   158             self.do_run()
       
   159         finally:
       
   160             add_process_event("stop_worker",self.__get_process_event_args(), self.session_maker)
       
   161         
       
   162     def do_run(self):
       
   163         raise NotImplementedError()
       
   164 
       
   165 
       
   166 
       
   167 class SourceProcess(BaseProcess):
       
   168     
       
   169     def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
   121         self.track = options.track
   170         self.track = options.track
   122         self.reconnects = options.reconnects
   171         self.reconnects = options.reconnects
   123         self.token_filename = options.token_filename
   172         self.token_filename = options.token_filename
   124         self.stop_event = stop_event
   173         super(SourceProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid)
   125         self.options = options
   174 
   126         self.access_token = access_token
   175     def do_run(self):
   127         self.logger_queue = logger_queue
       
   128         super(SourceProcess, self).__init__()
       
   129     
       
   130     def run(self):
       
   131         
   176         
   132         #import pydevd
   177         #import pydevd
   133         #pydevd.settrace(suspend=False)
   178         #pydevd.settrace(suspend=False)
   134 
   179 
   135         self.logger = set_logging_process(self.options, self.logger_queue)
   180         self.logger = set_logging_process(self.options, self.logger_queue)
   146         
   191         
   147         session = self.session_maker()
   192         session = self.session_maker()
   148         
   193         
   149         try:
   194         try:
   150             for tweet in stream:
   195             for tweet in stream:
   151                 self.logger.debug("tweet " + repr(tweet))
   196                 if not self.parent_is_alive():
       
   197                     sys.exit()
       
   198                 self.logger.debug("SourceProcess : tweet " + repr(tweet))
   152                 source = TweetSource(original_json=tweet)
   199                 source = TweetSource(original_json=tweet)
   153                 self.logger.debug("source created")
   200                 self.logger.debug("SourceProcess : source created")
   154                 add_retries = 0
   201                 add_retries = 0
   155                 while add_retries < 10:
   202                 while add_retries < 10:
   156                     try:
   203                     try:
   157                         add_retries += 1
   204                         add_retries += 1
   158                         session.add(source)
   205                         session.add(source)
   159                         session.flush()
   206                         session.flush()
   160                         break
   207                         break
   161                     except OperationalError as e:
   208                     except OperationalError as e:
   162                         session.rollback()
   209                         session.rollback()
   163                         self.logger.debug("Operational Error %s nb %d" % (repr(e), add_retries))
   210                         self.logger.debug("SourceProcess : Operational Error %s nb %d" % (repr(e), add_retries))
   164                         if add_retries == 10:
   211                         if add_retries == 10:
   165                             raise e
   212                             raise e
   166                      
   213                      
   167                 source_id = source.id
   214                 source_id = source.id
   168                 self.logger.debug("before queue + source id " + repr(source_id))
   215                 self.logger.debug("SourceProcess : before queue + source id " + repr(source_id))
   169                 self.logger.info("Tweet count: %d - current rate : %.2f - running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime)))
   216                 self.logger.info("SourceProcess : Tweet count: %d - current rate : %.2f - running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime)))
   170                 session.commit()
   217                 session.commit()
   171                 self.queue.put((source_id, tweet), False)
   218                 self.queue.put((source_id, tweet), False)
   172 
   219 
   173         except Exception as e:
   220         except Exception as e:
   174             self.logger.error("Error when processing tweet " + repr(e))
   221             self.logger.error("SourceProcess : Error when processing tweet " + repr(e))
   175         finally:
   222         finally:
   176             session.rollback()
   223             session.rollback()
   177             stream.close()
   224             stream.close()
   178             session.close()
   225             session.close()
   179             self.queue.close()
   226             self.queue.close()
   194         logger.debug(u"Process_tweet :" + repr(tweet))
   241         logger.debug(u"Process_tweet :" + repr(tweet))
   195         processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, access_token, None)
   242         processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, access_token, None)
   196         processor.process()
   243         processor.process()
   197     except Exception as e:
   244     except Exception as e:
   198         message = u"Error %s processing tweet %s" % (repr(e), tweet)
   245         message = u"Error %s processing tweet %s" % (repr(e), tweet)
   199         logger.error(message)
   246         logger.exception(message)
   200         output = StringIO.StringIO()
   247         output = StringIO.StringIO()
   201         traceback.print_exception(Exception, e, None, None, output)
   248         try:
   202         error_stack = output.getvalue()
   249             traceback.print_exc(file=output)
   203         output.close()
   250             error_stack = output.getvalue()
       
   251         finally:
       
   252             output.close()
   204         session.rollback()
   253         session.rollback()
   205         tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['ERROR'], error=message, error_stack=error_stack)
   254         tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['ERROR'], error=message, error_stack=error_stack)
   206         session.add(tweet_log)
   255         session.add(tweet_log)
   207         session.commit()
   256         session.commit()
   208 
   257 
   209     
   258     
   210         
   259         
   211 class TweetProcess(Process):
   260 class TweetProcess(BaseProcess):
   212     
   261     
   213     def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue):
   262     def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
   214         self.session_maker = session_maker
   263         super(TweetProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid)
   215         self.queue = queue
   264 
   216         self.stop_event = stop_event
   265 
   217         self.options = options
   266     def do_run(self):
   218         self.access_token = access_token
       
   219         self.logger_queue = logger_queue
       
   220         super(TweetProcess, self).__init__()
       
   221 
       
   222 
       
   223     def run(self):
       
   224         
   267         
   225         self.logger = set_logging_process(self.options, self.logger_queue)
   268         self.logger = set_logging_process(self.options, self.logger_queue)
   226         session = self.session_maker()
   269         session = self.session_maker()
   227         try:
   270         try:
   228             while not self.stop_event.is_set():
   271             while not self.stop_event.is_set() and self.parent_is_alive():
   229                 try:
   272                 try:
   230                     source_id, tweet_txt = queue.get(True, 3)
   273                     source_id, tweet_txt = self.queue.get(True, 3)
   231                     self.logger.debug("Processing source id " + repr(source_id))
   274                     self.logger.debug("Processing source id " + repr(source_id))
   232                 except Exception as e:
   275                 except Exception as e:
   233                     self.logger.debug('Process tweet exception in loop : ' + repr(e))
   276                     self.logger.debug('Process tweet exception in loop : ' + repr(e))
   234                     continue
   277                     continue
   235                 process_tweet(tweet_txt, source_id, session, self.access_token, self.logger)
   278                 process_tweet(tweet_txt, source_id, session, self.access_token, self.logger)
   271                 continue
   314                 continue
   272         time.sleep(0.1)
   315         time.sleep(0.1)
   273 
   316 
   274         
   317         
   275 def get_options():
   318 def get_options():
   276     parser = OptionParser()
   319 
       
   320     usage = "usage: %prog [options]"
       
   321 
       
   322     parser = OptionParser(usage=usage)
       
   323 
   277     parser.add_option("-f", "--file", dest="conn_str",
   324     parser.add_option("-f", "--file", dest="conn_str",
   278                       help="write tweet to DATABASE. This is a connection string", metavar="CONNECTION_STR", default="enmi2010_twitter.db")
   325                       help="write tweet to DATABASE. This is a connection string", metavar="CONNECTION_STR", default="enmi2010_twitter.db")
   279     parser.add_option("-u", "--user", dest="username",
   326     parser.add_option("-u", "--user", dest="username",
   280                       help="Twitter user", metavar="USER", default=None)
   327                       help="Twitter user", metavar="USER", default=None)
   281     parser.add_option("-w", "--password", dest="password",
   328     parser.add_option("-w", "--password", dest="password",
   291     parser.add_option("-d", "--duration", dest="duration",
   338     parser.add_option("-d", "--duration", dest="duration",
   292                       help="Duration of recording in seconds", metavar="DURATION", default= -1, type='int')
   339                       help="Duration of recording in seconds", metavar="DURATION", default= -1, type='int')
   293     parser.add_option("-N", "--nb-process", dest="process_nb",
   340     parser.add_option("-N", "--nb-process", dest="process_nb",
   294                       help="number of process.\nIf 0, only the lefovers of the database are processed.\nIf 1, no postprocessing is done on the tweets.", metavar="PROCESS_NB", default=2, type='int')
   341                       help="number of process.\nIf 0, only the lefovers of the database are processed.\nIf 1, no postprocessing is done on the tweets.", metavar="PROCESS_NB", default=2, type='int')
   295 
   342 
   296 
       
   297 
       
   298     utils.set_logging_options(parser)
   343     utils.set_logging_options(parser)
   299 
   344 
   300     return parser.parse_args()
   345     return parser.parse_args()
   301     
   346 
       
   347 
       
   348 def do_run(options, session_maker):
       
   349 
       
   350     stop_args = {}
       
   351 
       
   352     access_token = None
       
   353     if not options.username or not options.password:
       
   354         access_token = utils.get_oauth_token(options.token_filename)
       
   355     
       
   356     session = session_maker()
       
   357     try:
       
   358         process_leftovers(session, access_token, utils.get_logger())
       
   359         session.commit()
       
   360     finally:
       
   361         session.rollback()
       
   362         session.close()
       
   363     
       
   364     if options.process_nb <= 0:
       
   365         utils.get_logger().debug("Leftovers processed. Exiting.")
       
   366         return None
       
   367 
       
   368     queue = mQueue()
       
   369     stop_event = Event()
       
   370     
       
   371     #workaround for bug on using urllib2 and multiprocessing
       
   372     req = urllib2.Request('http://localhost')
       
   373     conn = None
       
   374     try:
       
   375         conn = urllib2.urlopen(req)
       
   376     except:
       
   377         utils.get_logger().debug("could not open localhost")
       
   378         #donothing
       
   379     finally:
       
   380         if conn is not None:
       
   381             conn.close()
       
   382     
       
   383     process_engines = []
       
   384     logger_queues = []
       
   385     
       
   386     SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str)
       
   387     process_engines.append(engine_process)
       
   388     lqueue = mQueue(1)
       
   389     logger_queues.append(lqueue)
       
   390     pid = os.getpid()
       
   391     sprocess = SourceProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)    
       
   392     
       
   393     tweet_processes = []
       
   394     
       
   395     for i in range(options.process_nb - 1):
       
   396         SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str)
       
   397         process_engines.append(engine_process)
       
   398         lqueue = mQueue(1)
       
   399         logger_queues.append(lqueue)
       
   400         cprocess = TweetProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)
       
   401         tweet_processes.append(cprocess)
       
   402 
       
   403     def interupt_handler(signum, frame):
       
   404         utils.get_logger().debug("shutdown asked " + repr(signum) + "  " + repr(inspect.getframeinfo(frame, 9)))
       
   405         stop_args.update({'message': 'interupt', 'signum':signum, 'frameinfo':inspect.getframeinfo(frame, 9)})
       
   406         stop_event.set()
       
   407         
       
   408     signal.signal(signal.SIGINT , interupt_handler)
       
   409     signal.signal(signal.SIGHUP , interupt_handler)
       
   410     signal.signal(signal.SIGALRM, interupt_handler)
       
   411     signal.signal(signal.SIGTERM, interupt_handler)
       
   412 
       
   413     log_thread = threading.Thread(target=process_log, name="loggingThread", args=(logger_queues, stop_event,))
       
   414     log_thread.daemon = True
       
   415 
       
   416     log_thread.start()
       
   417 
       
   418     sprocess.start()
       
   419     for cprocess in tweet_processes:
       
   420         cprocess.start()
       
   421 
       
   422     add_process_event("pid", {'main':os.getpid(), 'source':(sprocess.name,sprocess.pid), 'consumers':dict([(p.name,p.pid) for p in tweet_processes])}, session_maker)
       
   423 
       
   424     if options.duration >= 0:
       
   425         end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration)    
       
   426     
       
   427 
       
   428     while not stop_event.is_set():
       
   429         if options.duration >= 0 and  datetime.datetime.utcnow() >= end_ts:
       
   430             stop_args.update({'message': 'duration', 'duration' : options.duration, 'end_ts' : end_ts})
       
   431             stop_event.set()
       
   432             break
       
   433         if sprocess.is_alive():            
       
   434             time.sleep(1)
       
   435         else:
       
   436             stop_args.update({'message': 'Source process killed'})
       
   437             stop_event.set()
       
   438             break
       
   439     utils.get_logger().debug("Joining Source Process")
       
   440     try:
       
   441         sprocess.join(10)
       
   442     except:
       
   443         utils.get_logger().debug("Pb joining Source Process - terminating")
       
   444         sprocess.terminate()
       
   445         
       
   446     for i, cprocess in enumerate(tweet_processes):
       
   447         utils.get_logger().debug("Joining consumer process Nb %d" % (i + 1))
       
   448         try:
       
   449             cprocess.join(3)
       
   450         except:
       
   451             utils.get_logger().debug("Pb joining consumer process Nb %d - terminating" % (i + 1))
       
   452             cprocess.terminate()
       
   453 
       
   454     
       
   455     utils.get_logger().debug("Close queues")
       
   456     try:
       
   457         queue.close()
       
   458         for lqueue in logger_queues:
       
   459             lqueue.close()
       
   460     except exception as e:
       
   461         utils.get_logger().error("error when closing queues %s", repr(e))
       
   462         #do nothing
       
   463         
       
   464     
       
   465     if options.process_nb > 1:
       
   466         utils.get_logger().debug("Processing leftovers")
       
   467         session = session_maker()
       
   468         try:
       
   469             process_leftovers(session, access_token, utils.get_logger())
       
   470             session.commit()
       
   471         finally:
       
   472             session.rollback()
       
   473             session.close()
       
   474 
       
   475     for pengine in process_engines:
       
   476         pengine.dispose()
       
   477 
       
   478     return stop_args
       
   479 
   302 
   480 
   303 if __name__ == '__main__':
   481 if __name__ == '__main__':
   304 
   482 
   305     (options, args) = get_options()
   483     (options, args) = get_options()
   306     
   484     
   307     set_logging(options)
   485     set_logging(options)
   308         
   486     
   309     if options.debug:
   487     utils.get_logger().debug("OPTIONS : " + repr(options))    
   310         print "OPTIONS : "
       
   311         print repr(options)
       
   312     
       
   313     
   488     
   314     conn_str = options.conn_str.strip()
   489     conn_str = options.conn_str.strip()
   315     if not re.match("^\w+://.+", conn_str):
   490     if not re.match("^\w+://.+", conn_str):
   316         conn_str = 'sqlite:///' + options.conn_str
   491         conn_str = 'sqlite:///' + options.conn_str
   317         
   492         
   318     if conn_str.startswith("sqlite") and options.new:
   493     if conn_str.startswith("sqlite") and options.new:
   319         filepath = conn_str[conn_str.find(":///")+4:]
   494         filepath = conn_str[conn_str.find(":///") + 4:]
   320         if os.path.exists(filepath):
   495         if os.path.exists(filepath):
   321             i = 1
   496             i = 1
   322             basename, extension = os.path.splitext(filepath)
   497             basename, extension = os.path.splitext(filepath)
   323             new_path = '%s.%d%s' % (basename, i, extension)
   498             new_path = '%s.%d%s' % (basename, i, extension)
   324             while i < 1000000 and os.path.exists(new_path):
   499             while i < 1000000 and os.path.exists(new_path):
   337             message = "Database %s not empty exiting" % conn_str
   512             message = "Database %s not empty exiting" % conn_str
   338             utils.get_logger().error(message)
   513             utils.get_logger().error(message)
   339             sys.exit(message)
   514             sys.exit(message)
   340     
   515     
   341     metadata.create_all(engine)
   516     metadata.create_all(engine)
   342     
   517     stop_args = {}
   343     access_token = None
   518     try:
   344     if not options.username or not options.password:
   519         add_process_event(type="start", args={'options':options.__dict__, 'args': args, 'command_line': sys.argv}, session_maker=Session)
   345         access_token = utils.get_oauth_token(options.token_filename)
   520         stop_args = do_run(options, Session)
   346     
   521     except Exception as e:
   347     session = Session()
   522         utils.get_logger().exception("Error in main thread")        
   348     try:
   523         outfile = StringIO.StringIO()
   349         process_leftovers(session, access_token, utils.get_logger())
   524         try:
   350         session.commit()
   525             traceback.print_exc(file=outfile)
   351     finally:
   526             stop_args = {'error': repr(e), 'message': getattr(e, 'message', ''), 'stacktrace':outfile.getvalue()}
   352         session.rollback()
       
   353         session.close()
       
   354     
       
   355     if options.process_nb <= 0:
       
   356         utils.get_logger().debug("Leftovers processed. Exiting.")
       
   357         sys.exit()
       
   358 
       
   359     queue = mQueue()
       
   360     stop_event = Event()
       
   361     
       
   362     #workaround for bug on using urllib2 and multiprocessing
       
   363     req = urllib2.Request('http://localhost')
       
   364     conn = None
       
   365     try:
       
   366         conn = urllib2.urlopen(req)
       
   367     except:
       
   368         pass
       
   369         #donothing
       
   370     finally:
       
   371         if conn is not None:
       
   372             conn.close()
       
   373     
       
   374     process_engines = []
       
   375     logger_queues = []
       
   376     
       
   377     SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str)
       
   378     process_engines.append(engine_process)
       
   379     lqueue = mQueue(1)
       
   380     logger_queues.append(lqueue)
       
   381     sprocess = SourceProcess(SessionProcess, queue, options, access_token, stop_event, lqueue)    
       
   382     
       
   383     tweet_processes = []
       
   384     
       
   385     for i in range(options.process_nb - 1):
       
   386         SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str)
       
   387         process_engines.append(engine_process)
       
   388         lqueue = mQueue(1)
       
   389         logger_queues.append(lqueue)
       
   390         cprocess = TweetProcess(SessionProcess, queue, options, access_token, stop_event, lqueue)
       
   391         tweet_processes.append(cprocess)
       
   392 
       
   393     def interupt_handler(signum, frame):
       
   394         stop_event.set()
       
   395         
       
   396     signal.signal(signal.SIGINT, interupt_handler)
       
   397 
       
   398     log_thread = threading.Thread(target=process_log, name="loggingThread", args=(logger_queues,stop_event,))
       
   399     log_thread.daemon = True
       
   400 
       
   401     sprocess.start()
       
   402     for cprocess in tweet_processes:
       
   403         cprocess.start()
       
   404 
       
   405     log_thread.start()
       
   406 
       
   407     if options.duration >= 0:
       
   408         end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration)    
       
   409     
       
   410 
       
   411     while not stop_event.is_set():
       
   412         if options.duration >= 0 and  datetime.datetime.utcnow() >= end_ts:
       
   413             stop_event.set()
       
   414             break
       
   415         if sprocess.is_alive():            
       
   416             time.sleep(1)
       
   417         else:
       
   418             stop_event.set()
       
   419             break
       
   420     utils.get_logger().debug("Joining Source Process")
       
   421     try:
       
   422         sprocess.join(10)
       
   423     except:
       
   424         utils.get_logger().debug("Pb joining Source Process - terminating")
       
   425         sprocess.terminate()
       
   426         
       
   427     for i, cprocess in enumerate(tweet_processes):
       
   428         utils.get_logger().debug("Joining consumer process Nb %d" % (i + 1))
       
   429         try:
       
   430             cprocess.join(3)
       
   431         except:
       
   432             utils.get_logger().debug("Pb joining consumer process Nb %d - terminating" % (i + 1))
       
   433             cprocess.terminate()
       
   434 
       
   435     
       
   436     utils.get_logger().debug("Close queues")
       
   437     try:
       
   438         queue.close()
       
   439         for lqueue in logger_queues:
       
   440             lqueue.close()
       
   441     except exception as e:
       
   442         utils.get_logger().error("error when closing queues %s", repr(e))
       
   443         #do nothing
       
   444         
       
   445     
       
   446     if options.process_nb > 1:
       
   447         utils.get_logger().debug("Processing leftovers")
       
   448         session = Session()
       
   449         try:
       
   450             process_leftovers(session, access_token, utils.get_logger())
       
   451             session.commit()
       
   452         finally:
   527         finally:
   453             session.rollback()
   528             outfile.close()
   454             session.close()
   529         raise
   455 
   530     finally:    
   456     for pengine in process_engines:
   531         add_process_event(type="shutdown", args=stop_args, session_maker=Session)
   457         pengine.dispose()
   532 
   458         
       
   459     utils.get_logger().debug("Done. Exiting.")
   533     utils.get_logger().debug("Done. Exiting.")
   460         
   534