script/stream/recorder_tweetstream.py
changeset 883 8ae3d91ea4ae
parent 882 b46cfa1d188b
child 884 07f1c6854df9
equal deleted inserted replaced
882:b46cfa1d188b 883:8ae3d91ea4ae
     1 from getpass import getpass
       
     2 from iri_tweet import models, utils
       
     3 from iri_tweet.models import TweetSource, TweetLog, ProcessEvent
       
     4 from multiprocessing import (Queue as mQueue, JoinableQueue, Process, Event, 
       
     5     get_logger)
       
     6 from optparse import OptionParser
       
     7 from sqlalchemy.exc import OperationalError
       
     8 from sqlalchemy.orm import scoped_session
       
     9 import Queue
       
    10 import StringIO
       
    11 import anyjson
       
    12 import datetime
       
    13 import inspect
       
    14 import logging
       
    15 import os
       
    16 import re
       
    17 import shutil
       
    18 import signal
       
    19 import socket
       
    20 import sqlalchemy.schema
       
    21 import sys
       
    22 import threading
       
    23 import time
       
    24 import traceback
       
    25 import tweepy.auth
       
    26 import iri_tweet.stream as tweetstream
       
    27 import urllib2
       
# Disable buffering on socket file objects (CPython 2 internal attribute) so
# data from the streaming HTTP connection is delivered as soon as it arrives.
socket._fileobject.default_bufsize = 0




# Tweet/user attribute names found in the twitter streaming payloads.
# The commented-out lists below are kept as a record of an older API schema.
#columns_tweet = [u'favorited', u'truncated', u'text', u'created_at', u'source', u'in_reply_to_status_id', u'in_reply_to_screen_name', u'in_reply_to_user_id', u'geo', u'id', u'user']
columns_tweet = [u'user', u'favorited', u'contributors', u'truncated', u'text', u'created_at', u'retweeted', u'in_reply_to_status_id_str', u'coordinates', u'in_reply_to_user_id_str', u'entities', u'in_reply_to_status_id', u'place', u'in_reply_to_user_id', u'id', u'in_reply_to_screen_name', u'retweet_count', u'geo', u'id_str', u'source']
#columns_user = [u'id', u'verified', u'profile_sidebar_fill_color', u'profile_text_color', u'followers_count', u'protected', u'location', u'profile_background_color', u'utc_offset', u'statuses_count', u'description', u'friends_count', u'profile_link_color', u'profile_image_url', u'notifications', u'geo_enabled', u'profile_background_image_url', u'screen_name', u'profile_background_tile', u'favourites_count', u'name', u'url', u'created_at', u'time_zone', u'profile_sidebar_border_color', u'following']
columns_user = [u'follow_request_sent', u'profile_use_background_image', u'id', u'verified', u'profile_sidebar_fill_color', u'profile_text_color', u'followers_count', u'protected', u'location', u'profile_background_color', u'id_str', u'utc_offset', u'statuses_count', u'description', u'friends_count', u'profile_link_color', u'profile_image_url', u'notifications', u'show_all_inline_media', u'geo_enabled', u'profile_background_image_url', u'name', u'lang', u'following', u'profile_background_tile', u'favourites_count', u'screen_name', u'url', u'created_at', u'contributors_enabled', u'time_zone', u'profile_sidebar_border_color', u'is_translator', u'listed_count']
# Tweets are simply stored in a sqlite3 (or other SQLAlchemy-backed) table.

# Default wait interval, in seconds, used by the supervision loops below.
DEFAULT_TIMEOUT = 5
       
    39 
       
    40 def set_logging(options):
       
    41     loggers = []
       
    42     
       
    43     loggers.append(utils.set_logging(options, logging.getLogger('iri.tweet')))
       
    44     loggers.append(utils.set_logging(options, logging.getLogger('multiprocessing')))
       
    45     if options.debug >= 2:
       
    46         loggers.append(utils.set_logging(options, logging.getLogger('sqlalchemy.engine')))
       
    47     #utils.set_logging(options, logging.getLogger('sqlalchemy.dialects'))
       
    48     #utils.set_logging(options, logging.getLogger('sqlalchemy.pool'))
       
    49     #utils.set_logging(options, logging.getLogger('sqlalchemy.orm'))
       
    50     return loggers
       
    51 
       
    52 def set_logging_process(options, queue):
       
    53     qlogger = utils.set_logging(options, logging.getLogger('iri.tweet.p'), queue)
       
    54     qlogger.propagate = 0
       
    55     return qlogger
       
    56 
       
    57 def get_auth(options, access_token):
       
    58     if options.username and options.password:
       
    59         auth = tweepy.auth.BasicAuthHandler(options.username, options.password)        
       
    60     else:
       
    61         consumer_key = models.CONSUMER_KEY
       
    62         consumer_secret = models.CONSUMER_SECRET
       
    63         auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=True)
       
    64         auth.set_access_token(*access_token)
       
    65     return auth
       
    66 
       
    67 
       
    68 def add_process_event(type, args, session_maker):
       
    69     session = session_maker()
       
    70     try:
       
    71         evt = ProcessEvent(args=None if args is None else anyjson.serialize(args), type=type)
       
    72         session.add(evt)
       
    73         session.commit()
       
    74     finally:
       
    75         session.close()
       
    76 
       
    77 
       
    78 class BaseProcess(Process):
       
    79 
       
    80     def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
       
    81         self.parent_pid = parent_pid
       
    82         self.session_maker = session_maker
       
    83         self.queue = queue
       
    84         self.options = options
       
    85         self.logger_queue = logger_queue
       
    86         self.stop_event = stop_event
       
    87         self.access_token = access_token
       
    88 
       
    89         super(BaseProcess, self).__init__()
       
    90 
       
    91     #
       
    92     # from http://stackoverflow.com/questions/2542610/python-daemon-doesnt-kill-its-kids
       
    93     #
       
    94     def parent_is_alive(self):
       
    95         try:
       
    96             # try to call Parent
       
    97             os.kill(self.parent_pid, 0)
       
    98         except OSError:
       
    99             # *beeep* oh no! The phone's disconnected!
       
   100             return False
       
   101         else:
       
   102             # *ring* Hi mom!
       
   103             return True
       
   104     
       
   105 
       
   106     def __get_process_event_args(self):
       
   107         return {'name':self.name, 'pid':self.pid, 'parent_pid':self.parent_pid, 'options':self.options.__dict__, 'access_token':self.access_token}
       
   108 
       
   109     def run(self):
       
   110         try:
       
   111             add_process_event("start_worker", self.__get_process_event_args(), self.session_maker)
       
   112             self.do_run()
       
   113         finally:
       
   114             add_process_event("stop_worker", self.__get_process_event_args(), self.session_maker)
       
   115         
       
   116     def do_run(self):
       
   117         raise NotImplementedError()
       
   118 
       
   119 
       
   120 
       
   121 class SourceProcess(BaseProcess):
       
   122     
       
   123     def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
       
   124         self.track = options.track
       
   125         self.token_filename = options.token_filename
       
   126         self.catchup = options.catchup
       
   127         self.timeout = options.timeout
       
   128         self.stream = None
       
   129         super(SourceProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid)
       
   130                     
       
   131     def __source_stream_iter(self):
       
   132         
       
   133         self.logger = set_logging_process(self.options, self.logger_queue)
       
   134         self.logger.debug("SourceProcess : run ")
       
   135         
       
   136         self.auth = get_auth(self.options, self.access_token) 
       
   137         self.logger.debug("SourceProcess : auth set ")
       
   138         
       
   139         track_list = self.track # or raw_input('Keywords to track (comma seperated): ').strip()
       
   140         self.logger.debug("SourceProcess : track list " + track_list)
       
   141         
       
   142         track_list = [k.strip() for k in track_list.split(',')]
       
   143 
       
   144         self.logger.debug("SourceProcess : before connecting to stream " + repr(track_list))                        
       
   145         self.stream = tweetstream.FilterStream(self.auth, track=track_list, raw=True, url=self.options.url, catchup=self.catchup, timeout=self.timeout, chunk_size=1, logger=self.logger)
       
   146         self.logger.debug("SourceProcess : after connecting to stream")
       
   147         self.stream.muststop = lambda: self.stop_event.is_set()        
       
   148         
       
   149         stream_wrapper = tweetstream.SafeStreamWrapper(self.stream, logger=self.logger)
       
   150         
       
   151         session = self.session_maker()
       
   152         
       
   153         try:
       
   154             for tweet in stream_wrapper:
       
   155                 if not self.parent_is_alive():
       
   156                     self.stop_event.set()
       
   157                     stop_thread.join(5)
       
   158                     sys.exit()
       
   159                 self.logger.debug("SourceProcess : tweet " + repr(tweet))
       
   160                 source = TweetSource(original_json=tweet)
       
   161                 self.logger.debug("SourceProcess : source created")
       
   162                 add_retries = 0
       
   163                 while add_retries < 10:
       
   164                     try:
       
   165                         add_retries += 1
       
   166                         session.add(source)
       
   167                         session.flush()
       
   168                         break
       
   169                     except OperationalError as e:
       
   170                         session.rollback()
       
   171                         self.logger.debug("SourceProcess : Operational Error %s nb %d" % (repr(e), add_retries))
       
   172                         if add_retries == 10:
       
   173                             raise
       
   174                      
       
   175                 source_id = source.id
       
   176                 self.logger.debug("SourceProcess : before queue + source id " + repr(source_id))
       
   177                 self.logger.info("SourceProcess : Tweet count: %d - current rate : %.2f - running : %s" % (self.stream.count, self.stream.rate, int(time.time() - self.stream.starttime)))
       
   178                 session.commit()
       
   179                 self.queue.put((source_id, tweet), False)
       
   180 
       
   181         except Exception as e:
       
   182             self.logger.error("SourceProcess : Error when processing tweet " + repr(e))
       
   183             raise
       
   184         finally:
       
   185             session.rollback()
       
   186             session.close()
       
   187             self.logger_queue.close()
       
   188             self.queue.close()
       
   189             self.stream.close()
       
   190             self.stream = None
       
   191             if not self.stop_event.is_set():
       
   192                 self.stop_event.set()
       
   193 
       
   194 
       
   195     def do_run(self):
       
   196         
       
   197         #import pydevd
       
   198         #pydevd.settrace(suspend=False)
       
   199         
       
   200         source_stream_iter_thread = threading.Thread(target=self.__source_stream_iter , name="SourceStreamIterThread")
       
   201         
       
   202         source_stream_iter_thread.start()
       
   203         
       
   204         while not self.stop_event.is_set():
       
   205             self.logger.debug("SourceProcess : In while after start")
       
   206             self.stop_event.wait(DEFAULT_TIMEOUT)
       
   207             if self.stop_event.is_set() and self.stream:
       
   208                 self.stream.close()
       
   209             elif not self.stop_event.is_set() and not source_stream_iter_thread.is_alive:
       
   210                 self.stop_event.set()
       
   211 
       
   212         self.logger.info("SourceProcess : join")
       
   213         source_stream_iter_thread.join(30)
       
   214 
       
   215 
       
   216 def process_tweet(tweet, source_id, session, access_token, twitter_query_user, logger):
       
   217     try:
       
   218         if not tweet.strip():
       
   219             return
       
   220         tweet_obj = anyjson.deserialize(tweet)
       
   221         if 'text' not in tweet_obj:
       
   222             tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET'])
       
   223             session.add(tweet_log)
       
   224             return
       
   225         screen_name = ""
       
   226         if 'user' in tweet_obj and 'screen_name' in tweet_obj['user']:
       
   227             screen_name = tweet_obj['user']['screen_name']
       
   228         logger.info(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text']))
       
   229         logger.debug(u"Process_tweet :" + repr(tweet))
       
   230         processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, access_token, None, twitter_query_user)
       
   231         processor.process()
       
   232     except ValueError as e:
       
   233         message = u"Value Error %s processing tweet %s" % (repr(e), tweet)
       
   234         output = StringIO.StringIO()
       
   235         try:
       
   236             traceback.print_exc(file=output)
       
   237             error_stack = output.getvalue()
       
   238         finally:
       
   239             output.close()
       
   240         tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET'], error=message, error_stack=error_stack)
       
   241         session.add(tweet_log)
       
   242         session.commit()        
       
   243     except Exception as e:
       
   244         message = u"Error %s processing tweet %s" % (repr(e), tweet)
       
   245         logger.exception(message)
       
   246         output = StringIO.StringIO()
       
   247         try:
       
   248             traceback.print_exc(file=output)
       
   249             error_stack = output.getvalue()
       
   250         finally:
       
   251             output.close()
       
   252         session.rollback()
       
   253         tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['ERROR'], error=message, error_stack=error_stack)
       
   254         session.add(tweet_log)
       
   255         session.commit()
       
   256 
       
   257     
       
   258         
       
   259 class TweetProcess(BaseProcess):
       
   260     
       
   261     def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
       
   262         super(TweetProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid)
       
   263         self.twitter_query_user = options.twitter_query_user
       
   264 
       
   265 
       
   266     def do_run(self):
       
   267         
       
   268         self.logger = set_logging_process(self.options, self.logger_queue)
       
   269         session = self.session_maker()
       
   270         try:
       
   271             while not self.stop_event.is_set() and self.parent_is_alive():
       
   272                 try:
       
   273                     source_id, tweet_txt = self.queue.get(True, 3)
       
   274                     self.logger.debug("Processing source id " + repr(source_id))
       
   275                 except Exception as e:
       
   276                     self.logger.debug('Process tweet exception in loop : ' + repr(e))
       
   277                     continue
       
   278                 process_tweet(tweet_txt, source_id, session, self.access_token, self.twitter_query_user, self.logger)
       
   279                 session.commit()
       
   280         finally:
       
   281             session.rollback()
       
   282             session.close()
       
   283 
       
   284 
       
   285 def get_sessionmaker(conn_str):
       
   286     engine, metadata, Session = models.setup_database(conn_str, echo=False, create_all=False, autocommit=False)
       
   287     Session = scoped_session(Session)
       
   288     return Session, engine, metadata
       
   289 
       
   290             
       
   291 def process_leftovers(session, access_token, twitter_query_user, logger):
       
   292     
       
   293     sources = session.query(TweetSource).outerjoin(TweetLog).filter(TweetLog.id == None)
       
   294     
       
   295     for src in sources:
       
   296         tweet_txt = src.original_json
       
   297         process_tweet(tweet_txt, src.id, session, access_token, twitter_query_user, logger)
       
   298         session.commit()
       
   299 
       
   300         
       
   301     
       
   302     #get tweet source that do not match any message
       
   303     #select * from tweet_tweet_source ts left join tweet_tweet_log tl on ts.id = tl.tweet_source_id where tl.id isnull;
       
   304 def process_log(logger_queues, stop_event):
       
   305     while not stop_event.is_set():
       
   306         for lqueue in logger_queues:
       
   307             try:
       
   308                 record = lqueue.get_nowait()
       
   309                 logging.getLogger(record.name).handle(record)
       
   310             except Queue.Empty:
       
   311                 continue
       
   312             except IOError:
       
   313                 continue
       
   314         time.sleep(0.1)
       
   315 
       
   316         
       
def get_options():
    """Build the command-line parser and return (options, args).

    Defines the recorder's options (database connection string, twitter
    credentials, track keywords, process count, stream url, duration...) and
    registers the shared logging options via utils.set_logging_options.
    """

    usage = "usage: %prog [options]"

    parser = OptionParser(usage=usage)

    parser.add_option("-f", "--file", dest="conn_str",
                      help="write tweet to DATABASE. This is a connection string", metavar="CONNECTION_STR", default="enmi2010_twitter.db")
    parser.add_option("-u", "--user", dest="username",
                      help="Twitter user", metavar="USER", default=None)
    parser.add_option("-w", "--password", dest="password",
                      help="Twitter password", metavar="PASSWORD", default=None)
    parser.add_option("-T", "--track", dest="track",
                      help="Twitter track", metavar="TRACK")
    parser.add_option("-n", "--new", dest="new", action="store_true",
                      help="new database", default=False)
    parser.add_option("-D", "--daemon", dest="daemon", action="store_true",
                      help="launch daemon", default=False)
    parser.add_option("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token",
                      help="Token file name")
    parser.add_option("-d", "--duration", dest="duration",
                      help="Duration of recording in seconds", metavar="DURATION", default= -1, type='int')
    # process_nb semantics: 0 => only process database leftovers; 1 => record
    # only (no consumer post-processing); N>1 => one source + N-1 consumers.
    parser.add_option("-N", "--nb-process", dest="process_nb",
                      help="number of process.\nIf 0, only the lefovers of the database are processed.\nIf 1, no postprocessing is done on the tweets.", metavar="PROCESS_NB", default=2, type='int')
    parser.add_option("--url", dest="url",
                      help="The twitter url to connect to.", metavar="URL", default=tweetstream.FilterStream.url)
    parser.add_option("--query-user", dest="twitter_query_user", action="store_true",
                      help="Query twitter for users", default=False, metavar="QUERY_USER")
    parser.add_option("--catchup", dest="catchup",
                      help="catchup count for tweets", default=None, metavar="CATCHUP", type='int')
    parser.add_option("--timeout", dest="timeout",
                      help="timeout for connecting in seconds", default=60, metavar="TIMEOUT", type='int')

    utils.set_logging_options(parser)

    return parser.parse_args()
       
   356 
       
   357 
       
   358 def do_run(options, session_maker):
       
   359 
       
   360     stop_args = {}
       
   361 
       
   362     access_token = None
       
   363     if not options.username or not options.password:
       
   364         access_token = utils.get_oauth_token(options.token_filename)
       
   365     
       
   366     session = session_maker()
       
   367     try:
       
   368         process_leftovers(session, access_token, options.twitter_query_user, utils.get_logger())
       
   369         session.commit()
       
   370     finally:
       
   371         session.rollback()
       
   372         session.close()
       
   373     
       
   374     if options.process_nb <= 0:
       
   375         utils.get_logger().debug("Leftovers processed. Exiting.")
       
   376         return None
       
   377 
       
   378     queue = mQueue()
       
   379     stop_event = Event()
       
   380     
       
   381     #workaround for bug on using urllib2 and multiprocessing
       
   382     req = urllib2.Request('http://localhost')
       
   383     conn = None
       
   384     try:
       
   385         conn = urllib2.urlopen(req)
       
   386     except:
       
   387         utils.get_logger().debug("could not open localhost")
       
   388         #donothing
       
   389     finally:
       
   390         if conn is not None:
       
   391             conn.close()
       
   392     
       
   393     process_engines = []
       
   394     logger_queues = []
       
   395     
       
   396     SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str)
       
   397     process_engines.append(engine_process)
       
   398     lqueue = mQueue(50)
       
   399     logger_queues.append(lqueue)
       
   400     pid = os.getpid()
       
   401     sprocess = SourceProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)    
       
   402     
       
   403     tweet_processes = []
       
   404     
       
   405     for i in range(options.process_nb - 1):
       
   406         SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str)
       
   407         process_engines.append(engine_process)
       
   408         lqueue = mQueue(50)
       
   409         logger_queues.append(lqueue)
       
   410         cprocess = TweetProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)
       
   411         tweet_processes.append(cprocess)
       
   412 
       
   413     log_thread = threading.Thread(target=process_log, name="loggingThread", args=(logger_queues, stop_event,))
       
   414     log_thread.daemon = True
       
   415 
       
   416     log_thread.start()
       
   417 
       
   418     sprocess.start()
       
   419     for cprocess in tweet_processes:
       
   420         cprocess.start()
       
   421 
       
   422     add_process_event("pid", {'main':os.getpid(), 'source':(sprocess.name, sprocess.pid), 'consumers':dict([(p.name, p.pid) for p in tweet_processes])}, session_maker)
       
   423 
       
   424     if options.duration >= 0:
       
   425         end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration)    
       
   426 
       
   427     def interupt_handler(signum, frame):
       
   428         utils.get_logger().debug("shutdown asked " + repr(signum) + "  " + repr(inspect.getframeinfo(frame, 9)))
       
   429         stop_args.update({'message': 'interupt', 'signum':signum, 'frameinfo':inspect.getframeinfo(frame, 9)})
       
   430         stop_event.set()
       
   431         
       
   432     signal.signal(signal.SIGINT , interupt_handler)
       
   433     signal.signal(signal.SIGHUP , interupt_handler)
       
   434     signal.signal(signal.SIGALRM, interupt_handler)
       
   435     signal.signal(signal.SIGTERM, interupt_handler)
       
   436     
       
   437 
       
   438     while not stop_event.is_set():
       
   439         if options.duration >= 0 and  datetime.datetime.utcnow() >= end_ts:
       
   440             stop_args.update({'message': 'duration', 'duration' : options.duration, 'end_ts' : end_ts})
       
   441             stop_event.set()
       
   442             break
       
   443         if sprocess.is_alive():
       
   444             utils.get_logger().debug("Source process alive")
       
   445             time.sleep(1)
       
   446         else:
       
   447             stop_args.update({'message': 'Source process killed'})
       
   448             stop_event.set()
       
   449             break
       
   450     utils.get_logger().debug("Joining Source Process")
       
   451     try:
       
   452         sprocess.join(10)
       
   453     except:
       
   454         utils.get_logger().debug("Pb joining Source Process - terminating")
       
   455         sprocess.terminate()
       
   456         
       
   457     for i, cprocess in enumerate(tweet_processes):
       
   458         utils.get_logger().debug("Joining consumer process Nb %d" % (i + 1))
       
   459         try:
       
   460             cprocess.join(3)
       
   461         except:
       
   462             utils.get_logger().debug("Pb joining consumer process Nb %d - terminating" % (i + 1))
       
   463             cprocess.terminate()
       
   464 
       
   465     
       
   466     utils.get_logger().debug("Close queues")
       
   467     try:
       
   468         queue.close()
       
   469         for lqueue in logger_queues:
       
   470             lqueue.close()
       
   471     except exception as e:
       
   472         utils.get_logger().error("error when closing queues %s", repr(e))
       
   473         #do nothing
       
   474         
       
   475     
       
   476     if options.process_nb > 1:
       
   477         utils.get_logger().debug("Processing leftovers")
       
   478         session = session_maker()
       
   479         try:
       
   480             process_leftovers(session, access_token, options.twitter_query_user, utils.get_logger())
       
   481             session.commit()
       
   482         finally:
       
   483             session.rollback()
       
   484             session.close()
       
   485 
       
   486     for pengine in process_engines:
       
   487         pengine.dispose()
       
   488 
       
   489     return stop_args
       
   490 
       
   491 
       
def main(options, args):
    """Prepare the database and run the recorder.

    Normalizes the connection string (a bare path becomes a sqlite URL),
    optionally rotates an existing sqlite file aside when --new is given,
    creates the schema, then delegates to do_run(). Start and shutdown
    ProcessEvent rows are always written, even on failure.
    """

    # do_run() and the per-process session factories read this global.
    global conn_str

    conn_str = options.conn_str.strip()
    if not re.match("^\w+://.+", conn_str):
        # No scheme given: treat the value as a sqlite file path.
        conn_str = 'sqlite:///' + options.conn_str

    if conn_str.startswith("sqlite") and options.new:
        # --new with sqlite: move any existing db file to <name>.<i><ext>,
        # picking the first free suffix (bounded at one million attempts).
        filepath = conn_str[conn_str.find(":///") + 4:]
        if os.path.exists(filepath):
            i = 1
            basename, extension = os.path.splitext(filepath)
            new_path = '%s.%d%s' % (basename, i, extension)
            while i < 1000000 and os.path.exists(new_path):
                i += 1
                new_path = '%s.%d%s' % (basename, i, extension)
            if i >= 1000000:
                raise Exception("Unable to find new filename for " + filepath)
            else:
                shutil.move(filepath, new_path)

    Session, engine, metadata = get_sessionmaker(conn_str)

    if options.new:
        # Refuse to run --new against a non-sqlite database that has tables.
        check_metadata = sqlalchemy.schema.MetaData(bind=engine, reflect=True)
        if len(check_metadata.sorted_tables) > 0:
            message = "Database %s not empty exiting" % conn_str
            utils.get_logger().error(message)
            sys.exit(message)

    metadata.create_all(engine)
    session = Session()
    try:
        models.add_model_version(session)
    finally:
        session.close()

    stop_args = {}
    try:
        add_process_event(type="start", args={'options':options.__dict__, 'args': args, 'command_line': sys.argv}, session_maker=Session)
        stop_args = do_run(options, Session)
    except Exception as e:
        # Capture the traceback so the shutdown event records why we died,
        # then re-raise for the caller.
        utils.get_logger().exception("Error in main thread")
        outfile = StringIO.StringIO()
        try:
            traceback.print_exc(file=outfile)
            stop_args = {'error': repr(e), 'message': getattr(e, 'message', ''), 'stacktrace':outfile.getvalue()}
        finally:
            outfile.close()
        raise
    finally:
        add_process_event(type="shutdown", args=stop_args, session_maker=Session)

    utils.get_logger().debug("Done. Exiting. " + repr(stop_args))
       
   547 
       
   548 
       
   549 
       
if __name__ == '__main__':

    (options, args) = get_options()

    loggers = set_logging(options)

    utils.get_logger().debug("OPTIONS : " + repr(options))

    if options.daemon:
        # Daemon mode: imports are local so the dependencies are only needed
        # when -D/--daemon is actually requested.
        import daemon
        import lockfile

        # Keep the already-open log handler streams alive across the
        # daemonization fork (DaemonContext closes every other fd).
        hdlr_preserve = []
        for logger in loggers:
            hdlr_preserve.extend([h.stream for h in logger.handlers])

        context = daemon.DaemonContext(working_directory=os.getcwd(), files_preserve=hdlr_preserve)
        with context:
            main(options, args)
    else:
        main(options, args)
       
   571