script/stream/recorder_stream.py
changeset 883 8ae3d91ea4ae
parent 739 350ffcb7ae4d
child 884 07f1c6854df9
equal deleted inserted replaced
882:b46cfa1d188b 883:8ae3d91ea4ae
       
     1 from getpass import getpass
       
     2 from iri_tweet import models, utils
       
     3 from iri_tweet.models import TweetSource, TweetLog, ProcessEvent
       
     4 from multiprocessing import (Queue as mQueue, JoinableQueue, Process, Event, 
       
     5     get_logger)
       
     6 from optparse import OptionParser
       
     7 from sqlalchemy.exc import OperationalError
       
     8 from sqlalchemy.orm import scoped_session
       
     9 import Queue
       
    10 import StringIO
       
    11 import anyjson
       
    12 import datetime
       
    13 import inspect
       
    14 import iri_tweet.stream
       
    15 import logging
       
    16 import os
       
    17 import re
       
    18 import requests.auth
       
    19 import shutil
       
    20 import signal
       
    21 import socket
       
    22 import sqlalchemy.schema
       
    23 import sys
       
    24 import threading
       
    25 import time
       
    26 import traceback
       
    27 import urllib2
       
# Disable buffering on socket file objects so streamed tweets reach the
# reader immediately instead of sitting in a read buffer.
# NOTE(review): relies on the private CPython 2 attribute
# socket._fileobject.default_bufsize — confirm it still has an effect
# with the HTTP stack actually used for streaming.
socket._fileobject.default_bufsize = 0
       
    29 
       
    30 
       
    31 
       
# Field names expected on a tweet payload from the streaming API.
# Previous (pre-entities) field list kept for reference:
# columns_tweet = [u'favorited', u'truncated', u'text', u'created_at', u'source', u'in_reply_to_status_id', u'in_reply_to_screen_name', u'in_reply_to_user_id', u'geo', u'id', u'user']
columns_tweet = [u'user', u'favorited', u'contributors', u'truncated', u'text', u'created_at', u'retweeted', u'in_reply_to_status_id_str', u'coordinates', u'in_reply_to_user_id_str', u'entities', u'in_reply_to_status_id', u'place', u'in_reply_to_user_id', u'id', u'in_reply_to_screen_name', u'retweet_count', u'geo', u'id_str', u'source']
# Field names expected on the embedded user object.
# Previous field list kept for reference:
# columns_user = [u'id', u'verified', u'profile_sidebar_fill_color', u'profile_text_color', u'followers_count', u'protected', u'location', u'profile_background_color', u'utc_offset', u'statuses_count', u'description', u'friends_count', u'profile_link_color', u'profile_image_url', u'notifications', u'geo_enabled', u'profile_background_image_url', u'screen_name', u'profile_background_tile', u'favourites_count', u'name', u'url', u'created_at', u'time_zone', u'profile_sidebar_border_color', u'following']
columns_user = [u'follow_request_sent', u'profile_use_background_image', u'id', u'verified', u'profile_sidebar_fill_color', u'profile_text_color', u'followers_count', u'protected', u'location', u'profile_background_color', u'id_str', u'utc_offset', u'statuses_count', u'description', u'friends_count', u'profile_link_color', u'profile_image_url', u'notifications', u'show_all_inline_media', u'geo_enabled', u'profile_background_image_url', u'name', u'lang', u'following', u'profile_background_tile', u'favourites_count', u'screen_name', u'url', u'created_at', u'contributors_enabled', u'time_zone', u'profile_sidebar_border_color', u'is_translator', u'listed_count']
# Tweets are simply stored as raw json in a sqlite3 table.

# Seconds between stop-event polls in SourceProcess.do_run.
DEFAULT_TIMEOUT = 5
       
    39 
       
    40 def set_logging(options):
       
    41     loggers = []
       
    42     
       
    43     loggers.append(utils.set_logging(options, logging.getLogger('iri.tweet')))
       
    44     loggers.append(utils.set_logging(options, logging.getLogger('multiprocessing')))
       
    45     if options.debug >= 2:
       
    46         loggers.append(utils.set_logging(options, logging.getLogger('sqlalchemy.engine')))
       
    47     # utils.set_logging(options, logging.getLogger('sqlalchemy.dialects'))
       
    48     # utils.set_logging(options, logging.getLogger('sqlalchemy.pool'))
       
    49     # utils.set_logging(options, logging.getLogger('sqlalchemy.orm'))
       
    50     return loggers
       
    51 
       
    52 def set_logging_process(options, queue):
       
    53     qlogger = utils.set_logging(options, logging.getLogger('iri.tweet.p'), queue)
       
    54     qlogger.propagate = 0
       
    55     return qlogger
       
    56 
       
    57 def get_auth(options, access_token):
       
    58     if options.username and options.password:
       
    59         auth = requests.auth.BasicAuthHandler(options.username, options.password)        
       
    60     else:
       
    61         consumer_key = models.CONSUMER_KEY
       
    62         consumer_secret = models.CONSUMER_SECRET
       
    63         auth = requests.auth.OAuth1(access_token[0], access_token[1], consumer_key, consumer_secret, signature_type='auth_header')
       
    64     return auth
       
    65 
       
    66 
       
    67 def add_process_event(type, args, session_maker):
       
    68     session = session_maker()
       
    69     try:
       
    70         evt = ProcessEvent(args=None if args is None else anyjson.serialize(args), type=type)
       
    71         session.add(evt)
       
    72         session.commit()
       
    73     finally:
       
    74         session.close()
       
    75 
       
    76 
       
    77 class BaseProcess(Process):
       
    78 
       
    79     def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
       
    80         self.parent_pid = parent_pid
       
    81         self.session_maker = session_maker
       
    82         self.queue = queue
       
    83         self.options = options
       
    84         self.logger_queue = logger_queue
       
    85         self.stop_event = stop_event
       
    86         self.access_token = access_token
       
    87 
       
    88         super(BaseProcess, self).__init__()
       
    89 
       
    90     #
       
    91     # from http://stackoverflow.com/questions/2542610/python-daemon-doesnt-kill-its-kids
       
    92     #
       
    93     def parent_is_alive(self):
       
    94         try:
       
    95             # try to call Parent
       
    96             os.kill(self.parent_pid, 0)
       
    97         except OSError:
       
    98             # *beeep* oh no! The phone's disconnected!
       
    99             return False
       
   100         else:
       
   101             # *ring* Hi mom!
       
   102             return True
       
   103     
       
   104 
       
   105     def __get_process_event_args(self):
       
   106         return {'name':self.name, 'pid':self.pid, 'parent_pid':self.parent_pid, 'options':self.options.__dict__, 'access_token':self.access_token}
       
   107 
       
   108     def run(self):
       
   109         try:
       
   110             add_process_event("start_worker", self.__get_process_event_args(), self.session_maker)
       
   111             self.do_run()
       
   112         finally:
       
   113             add_process_event("stop_worker", self.__get_process_event_args(), self.session_maker)
       
   114         
       
   115     def do_run(self):
       
   116         raise NotImplementedError()
       
   117 
       
   118 
       
   119 
       
   120 class SourceProcess(BaseProcess):
       
   121     
       
   122     def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
       
   123         self.track = options.track
       
   124         self.token_filename = options.token_filename
       
   125         self.catchup = options.catchup
       
   126         self.timeout = options.timeout
       
   127         self.stream = None
       
   128         super(SourceProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid)
       
   129                     
       
   130     def __source_stream_iter(self):
       
   131         
       
   132         self.logger = set_logging_process(self.options, self.logger_queue)
       
   133         self.logger.debug("SourceProcess : run ")
       
   134         
       
   135         self.auth = get_auth(self.options, self.access_token) 
       
   136         self.logger.debug("SourceProcess : auth set ")
       
   137         
       
   138         track_list = self.track  # or raw_input('Keywords to track (comma seperated): ').strip()
       
   139         self.logger.debug("SourceProcess : track list " + track_list)
       
   140         
       
   141         track_list = [k.strip() for k in track_list.split(',')]
       
   142 
       
   143         self.logger.debug("SourceProcess : before connecting to stream " + repr(track_list))                        
       
   144         self.stream = iri_tweet.stream.FilterStream(self.auth, track=track_list, raw=True, url=self.options.url, catchup=self.catchup, timeout=self.timeout, chunk_size=1, logger=self.logger)
       
   145         self.logger.debug("SourceProcess : after connecting to stream")
       
   146         self.stream.muststop = lambda: self.stop_event.is_set()        
       
   147         
       
   148         stream_wrapper = iri_tweet.stream.SafeStreamWrapper(self.stream, logger=self.logger)
       
   149         
       
   150         session = self.session_maker()
       
   151         
       
   152         try:
       
   153             for tweet in stream_wrapper:
       
   154                 if not self.parent_is_alive():
       
   155                     self.stop_event.set()
       
   156                     stop_thread.join(5)
       
   157                     sys.exit()
       
   158                 self.logger.debug("SourceProcess : tweet " + repr(tweet))
       
   159                 source = TweetSource(original_json=tweet)
       
   160                 self.logger.debug("SourceProcess : source created")
       
   161                 add_retries = 0
       
   162                 while add_retries < 10:
       
   163                     try:
       
   164                         add_retries += 1
       
   165                         session.add(source)
       
   166                         session.flush()
       
   167                         break
       
   168                     except OperationalError as e:
       
   169                         session.rollback()
       
   170                         self.logger.debug("SourceProcess : Operational Error %s nb %d" % (repr(e), add_retries))
       
   171                         if add_retries == 10:
       
   172                             raise
       
   173                      
       
   174                 source_id = source.id
       
   175                 self.logger.debug("SourceProcess : before queue + source id " + repr(source_id))
       
   176                 self.logger.info("SourceProcess : Tweet count: %d - current rate : %.2f - running : %s" % (self.stream.count, self.stream.rate, int(time.time() - self.stream.starttime)))
       
   177                 session.commit()
       
   178                 self.queue.put((source_id, tweet), False)
       
   179 
       
   180         except Exception as e:
       
   181             self.logger.error("SourceProcess : Error when processing tweet " + repr(e))
       
   182             raise
       
   183         finally:
       
   184             session.rollback()
       
   185             session.close()
       
   186             self.logger_queue.close()
       
   187             self.queue.close()
       
   188             self.stream.close()
       
   189             self.stream = None
       
   190             if not self.stop_event.is_set():
       
   191                 self.stop_event.set()
       
   192 
       
   193 
       
   194     def do_run(self):
       
   195         
       
   196         # import pydevd
       
   197         # pydevd.settrace(suspend=False)
       
   198         
       
   199         source_stream_iter_thread = threading.Thread(target=self.__source_stream_iter , name="SourceStreamIterThread")
       
   200         
       
   201         source_stream_iter_thread.start()
       
   202         
       
   203         while not self.stop_event.is_set():
       
   204             self.logger.debug("SourceProcess : In while after start")
       
   205             self.stop_event.wait(DEFAULT_TIMEOUT)
       
   206             if self.stop_event.is_set() and self.stream:
       
   207                 self.stream.close()
       
   208             elif not self.stop_event.is_set() and not source_stream_iter_thread.is_alive:
       
   209                 self.stop_event.set()
       
   210 
       
   211         self.logger.info("SourceProcess : join")
       
   212         source_stream_iter_thread.join(30)
       
   213 
       
   214 
       
   215 def process_tweet(tweet, source_id, session, access_token, twitter_query_user, logger):
       
   216     try:
       
   217         if not tweet.strip():
       
   218             return
       
   219         tweet_obj = anyjson.deserialize(tweet)
       
   220         if 'text' not in tweet_obj:
       
   221             tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET'])
       
   222             session.add(tweet_log)
       
   223             return
       
   224         screen_name = ""
       
   225         if 'user' in tweet_obj and 'screen_name' in tweet_obj['user']:
       
   226             screen_name = tweet_obj['user']['screen_name']
       
   227         logger.info(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text']))
       
   228         logger.debug(u"Process_tweet :" + repr(tweet))
       
   229         processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, access_token, None, twitter_query_user)
       
   230         processor.process()
       
   231     except ValueError as e:
       
   232         message = u"Value Error %s processing tweet %s" % (repr(e), tweet)
       
   233         output = StringIO.StringIO()
       
   234         try:
       
   235             traceback.print_exc(file=output)
       
   236             error_stack = output.getvalue()
       
   237         finally:
       
   238             output.close()
       
   239         tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET'], error=message, error_stack=error_stack)
       
   240         session.add(tweet_log)
       
   241         session.commit()        
       
   242     except Exception as e:
       
   243         message = u"Error %s processing tweet %s" % (repr(e), tweet)
       
   244         logger.exception(message)
       
   245         output = StringIO.StringIO()
       
   246         try:
       
   247             traceback.print_exc(file=output)
       
   248             error_stack = output.getvalue()
       
   249         finally:
       
   250             output.close()
       
   251         session.rollback()
       
   252         tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['ERROR'], error=message, error_stack=error_stack)
       
   253         session.add(tweet_log)
       
   254         session.commit()
       
   255 
       
   256     
       
   257         
       
   258 class TweetProcess(BaseProcess):
       
   259     
       
   260     def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
       
   261         super(TweetProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid)
       
   262         self.twitter_query_user = options.twitter_query_user
       
   263 
       
   264 
       
   265     def do_run(self):
       
   266         
       
   267         self.logger = set_logging_process(self.options, self.logger_queue)
       
   268         session = self.session_maker()
       
   269         try:
       
   270             while not self.stop_event.is_set() and self.parent_is_alive():
       
   271                 try:
       
   272                     source_id, tweet_txt = self.queue.get(True, 3)
       
   273                     self.logger.debug("Processing source id " + repr(source_id))
       
   274                 except Exception as e:
       
   275                     self.logger.debug('Process tweet exception in loop : ' + repr(e))
       
   276                     continue
       
   277                 process_tweet(tweet_txt, source_id, session, self.access_token, self.twitter_query_user, self.logger)
       
   278                 session.commit()
       
   279         finally:
       
   280             session.rollback()
       
   281             session.close()
       
   282 
       
   283 
       
   284 def get_sessionmaker(conn_str):
       
   285     engine, metadata, Session = models.setup_database(conn_str, echo=False, create_all=False, autocommit=False)
       
   286     Session = scoped_session(Session)
       
   287     return Session, engine, metadata
       
   288 
       
   289             
       
   290 def process_leftovers(session, access_token, twitter_query_user, logger):
       
   291     
       
   292     sources = session.query(TweetSource).outerjoin(TweetLog).filter(TweetLog.id == None)
       
   293     
       
   294     for src in sources:
       
   295         tweet_txt = src.original_json
       
   296         process_tweet(tweet_txt, src.id, session, access_token, twitter_query_user, logger)
       
   297         session.commit()
       
   298 
       
   299         
       
   300     
       
   301     # get tweet source that do not match any message
       
   302     # select * from tweet_tweet_source ts left join tweet_tweet_log tl on ts.id = tl.tweet_source_id where tl.id isnull;
       
   303 def process_log(logger_queues, stop_event):
       
   304     while not stop_event.is_set():
       
   305         for lqueue in logger_queues:
       
   306             try:
       
   307                 record = lqueue.get_nowait()
       
   308                 logging.getLogger(record.name).handle(record)
       
   309             except Queue.Empty:
       
   310                 continue
       
   311             except IOError:
       
   312                 continue
       
   313         time.sleep(0.1)
       
   314 
       
   315         
       
   316 def get_options():
       
   317 
       
   318     usage = "usage: %prog [options]"
       
   319 
       
   320     parser = OptionParser(usage=usage)
       
   321 
       
   322     parser.add_option("-f", "--file", dest="conn_str",
       
   323                       help="write tweet to DATABASE. This is a connection string", metavar="CONNECTION_STR", default="enmi2010_twitter.db")
       
   324     parser.add_option("-u", "--user", dest="username",
       
   325                       help="Twitter user", metavar="USER", default=None)
       
   326     parser.add_option("-w", "--password", dest="password",
       
   327                       help="Twitter password", metavar="PASSWORD", default=None)
       
   328     parser.add_option("-T", "--track", dest="track",
       
   329                       help="Twitter track", metavar="TRACK")
       
   330     parser.add_option("-n", "--new", dest="new", action="store_true",
       
   331                       help="new database", default=False)
       
   332     parser.add_option("-D", "--daemon", dest="daemon", action="store_true",
       
   333                       help="launch daemon", default=False)
       
   334     parser.add_option("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token",
       
   335                       help="Token file name")
       
   336     parser.add_option("-d", "--duration", dest="duration",
       
   337                       help="Duration of recording in seconds", metavar="DURATION", default= -1, type='int')
       
   338     parser.add_option("-N", "--nb-process", dest="process_nb",
       
   339                       help="number of process.\nIf 0, only the lefovers of the database are processed.\nIf 1, no postprocessing is done on the tweets.", metavar="PROCESS_NB", default=2, type='int')
       
   340     parser.add_option("--url", dest="url",
       
   341                       help="The twitter url to connect to.", metavar="URL", default=iri_tweet.stream.FilterStream.url)
       
   342     parser.add_option("--query-user", dest="twitter_query_user", action="store_true",
       
   343                       help="Query twitter for users", default=False, metavar="QUERY_USER")
       
   344     parser.add_option("--catchup", dest="catchup",
       
   345                       help="catchup count for tweets", default=None, metavar="CATCHUP", type='int')
       
   346     parser.add_option("--timeout", dest="timeout",
       
   347                       help="timeout for connecting in seconds", default=60, metavar="TIMEOUT", type='int')
       
   348     
       
   349 
       
   350 
       
   351 
       
   352     utils.set_logging_options(parser)
       
   353 
       
   354     return parser.parse_args()
       
   355 
       
   356 
       
   357 def do_run(options, session_maker):
       
   358 
       
   359     stop_args = {}
       
   360 
       
   361     access_token = None
       
   362     if not options.username or not options.password:
       
   363         access_token = utils.get_oauth_token(options.token_filename)
       
   364     
       
   365     session = session_maker()
       
   366     try:
       
   367         process_leftovers(session, access_token, options.twitter_query_user, utils.get_logger())
       
   368         session.commit()
       
   369     finally:
       
   370         session.rollback()
       
   371         session.close()
       
   372     
       
   373     if options.process_nb <= 0:
       
   374         utils.get_logger().debug("Leftovers processed. Exiting.")
       
   375         return None
       
   376 
       
   377     queue = mQueue()
       
   378     stop_event = Event()
       
   379     
       
   380     # workaround for bug on using urllib2 and multiprocessing
       
   381     req = urllib2.Request('http://localhost')
       
   382     conn = None
       
   383     try:
       
   384         conn = urllib2.urlopen(req)
       
   385     except:
       
   386         utils.get_logger().debug("could not open localhost")
       
   387         # donothing
       
   388     finally:
       
   389         if conn is not None:
       
   390             conn.close()
       
   391     
       
   392     process_engines = []
       
   393     logger_queues = []
       
   394     
       
   395     SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str)
       
   396     process_engines.append(engine_process)
       
   397     lqueue = mQueue(50)
       
   398     logger_queues.append(lqueue)
       
   399     pid = os.getpid()
       
   400     sprocess = SourceProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)    
       
   401     
       
   402     tweet_processes = []
       
   403     
       
   404     for i in range(options.process_nb - 1):
       
   405         SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str)
       
   406         process_engines.append(engine_process)
       
   407         lqueue = mQueue(50)
       
   408         logger_queues.append(lqueue)
       
   409         cprocess = TweetProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)
       
   410         tweet_processes.append(cprocess)
       
   411 
       
   412     log_thread = threading.Thread(target=process_log, name="loggingThread", args=(logger_queues, stop_event,))
       
   413     log_thread.daemon = True
       
   414 
       
   415     log_thread.start()
       
   416 
       
   417     sprocess.start()
       
   418     for cprocess in tweet_processes:
       
   419         cprocess.start()
       
   420 
       
   421     add_process_event("pid", {'main':os.getpid(), 'source':(sprocess.name, sprocess.pid), 'consumers':dict([(p.name, p.pid) for p in tweet_processes])}, session_maker)
       
   422 
       
   423     if options.duration >= 0:
       
   424         end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration)    
       
   425 
       
   426     def interupt_handler(signum, frame):
       
   427         utils.get_logger().debug("shutdown asked " + repr(signum) + "  " + repr(inspect.getframeinfo(frame, 9)))
       
   428         stop_args.update({'message': 'interupt', 'signum':signum, 'frameinfo':inspect.getframeinfo(frame, 9)})
       
   429         stop_event.set()
       
   430         
       
   431     signal.signal(signal.SIGINT , interupt_handler)
       
   432     signal.signal(signal.SIGHUP , interupt_handler)
       
   433     signal.signal(signal.SIGALRM, interupt_handler)
       
   434     signal.signal(signal.SIGTERM, interupt_handler)
       
   435     
       
   436 
       
   437     while not stop_event.is_set():
       
   438         if options.duration >= 0 and  datetime.datetime.utcnow() >= end_ts:
       
   439             stop_args.update({'message': 'duration', 'duration' : options.duration, 'end_ts' : end_ts})
       
   440             stop_event.set()
       
   441             break
       
   442         if sprocess.is_alive():
       
   443             utils.get_logger().debug("Source process alive")
       
   444             time.sleep(1)
       
   445         else:
       
   446             stop_args.update({'message': 'Source process killed'})
       
   447             stop_event.set()
       
   448             break
       
   449     utils.get_logger().debug("Joining Source Process")
       
   450     try:
       
   451         sprocess.join(10)
       
   452     except:
       
   453         utils.get_logger().debug("Pb joining Source Process - terminating")
       
   454         sprocess.terminate()
       
   455         
       
   456     for i, cprocess in enumerate(tweet_processes):
       
   457         utils.get_logger().debug("Joining consumer process Nb %d" % (i + 1))
       
   458         try:
       
   459             cprocess.join(3)
       
   460         except:
       
   461             utils.get_logger().debug("Pb joining consumer process Nb %d - terminating" % (i + 1))
       
   462             cprocess.terminate()
       
   463 
       
   464     
       
   465     utils.get_logger().debug("Close queues")
       
   466     try:
       
   467         queue.close()
       
   468         for lqueue in logger_queues:
       
   469             lqueue.close()
       
   470     except exception as e:
       
   471         utils.get_logger().error("error when closing queues %s", repr(e))
       
   472         # do nothing
       
   473         
       
   474     
       
   475     if options.process_nb > 1:
       
   476         utils.get_logger().debug("Processing leftovers")
       
   477         session = session_maker()
       
   478         try:
       
   479             process_leftovers(session, access_token, options.twitter_query_user, utils.get_logger())
       
   480             session.commit()
       
   481         finally:
       
   482             session.rollback()
       
   483             session.close()
       
   484 
       
   485     for pengine in process_engines:
       
   486         pengine.dispose()
       
   487 
       
   488     return stop_args
       
   489 
       
   490 
       
def main(options, args):
    """Entry point: prepare the database, record the run, launch do_run.

    Sets the module-level ``conn_str`` global that do_run() reads, and
    brackets the run between "start" and "shutdown" process events.
    """
    global conn_str

    conn_str = options.conn_str.strip()
    # Default to a sqlite url when no "scheme://..." prefix is present.
    if not re.match("^\w+://.+", conn_str):
        conn_str = 'sqlite:///' + options.conn_str

    # With --new on a sqlite database, move any existing file aside to the
    # first free "<basename>.<i><ext>" name instead of overwriting it.
    if conn_str.startswith("sqlite") and options.new:
        filepath = conn_str[conn_str.find(":///") + 4:]
        if os.path.exists(filepath):
            i = 1
            basename, extension = os.path.splitext(filepath)
            new_path = '%s.%d%s' % (basename, i, extension)
            while i < 1000000 and os.path.exists(new_path):
                i += 1
                new_path = '%s.%d%s' % (basename, i, extension)
            if i >= 1000000:
                raise Exception("Unable to find new filename for " + filepath)
            else:
                shutil.move(filepath, new_path)

    Session, engine, metadata = get_sessionmaker(conn_str)

    # Refuse to initialize over a non-empty database when --new was asked.
    if options.new:
        check_metadata = sqlalchemy.schema.MetaData(bind=engine, reflect=True)
        if len(check_metadata.sorted_tables) > 0:
            message = "Database %s not empty exiting" % conn_str
            utils.get_logger().error(message)
            sys.exit(message)

    metadata.create_all(engine)
    session = Session()
    try:
        models.add_model_version(session)
    finally:
        session.close()

    stop_args = {}
    try:
        add_process_event(type="start", args={'options':options.__dict__, 'args': args, 'command_line': sys.argv}, session_maker=Session)
        stop_args = do_run(options, Session)
    except Exception as e:
        # Capture the failure details so the shutdown event records them,
        # then re-raise.
        utils.get_logger().exception("Error in main thread")        
        outfile = StringIO.StringIO()
        try:
            traceback.print_exc(file=outfile)
            stop_args = {'error': repr(e), 'message': getattr(e, 'message', ''), 'stacktrace':outfile.getvalue()}
        finally:
            outfile.close()
        raise
    finally:    
        add_process_event(type="shutdown", args=stop_args, session_maker=Session)

    utils.get_logger().debug("Done. Exiting. " + repr(stop_args))
       
   546 
       
   547 
       
   548 
       
if __name__ == '__main__':

    (options, args) = get_options()

    loggers = set_logging(options)

    utils.get_logger().debug("OPTIONS : " + repr(options))

    if options.daemon:
        # Daemon-only imports: keep them optional for foreground runs.
        # NOTE(review): 'lockfile' is imported but not referenced here —
        # presumably needed by the daemon setup; confirm before removing.
        import daemon
        import lockfile
        
        # Collect the underlying log streams so the daemon fork keeps
        # them open (DaemonContext closes all other file descriptors).
        hdlr_preserve = []
        for logger in loggers:
            hdlr_preserve.extend([h.stream for h in logger.handlers])
            
        context = daemon.DaemonContext(working_directory=os.getcwd(), files_preserve=hdlr_preserve) 
        with context:
            main(options, args)
    else:
        main(options, args)
       
   570