script/stream/recorder_stream.py
changeset 893 10a19dd4e1c9
parent 528 7fb5a7b0d35c
parent 890 9c57883dbb9d
child 919 e126d3e1e186
equal deleted inserted replaced
877:41ce1c341abe 893:10a19dd4e1c9
       
     1 from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
       
     2 from iri_tweet import models, utils
       
     3 from iri_tweet.models import TweetSource, TweetLog, ProcessEvent
       
     4 from iri_tweet.processor import get_processor
       
     5 from multiprocessing import Queue as mQueue, Process, Event
       
     6 from sqlalchemy.exc import OperationalError
       
     7 from sqlalchemy.orm import scoped_session
       
     8 import Queue
       
     9 import StringIO
       
    10 import anyjson
       
    11 import argparse
       
    12 import datetime
       
    13 import inspect
       
    14 import iri_tweet.stream
       
    15 import logging
       
    16 import os
       
    17 import re
       
    18 import requests_oauthlib
       
    19 import shutil
       
    20 import signal
       
    21 import socket
       
    22 import sqlalchemy.schema
       
    23 import sys
       
    24 import thread
       
    25 import threading
       
    26 import time
       
    27 import traceback
       
    28 import urllib2
       
# Force unbuffered reads on socket file objects so streamed data is seen as
# soon as it arrives.  NOTE(review): this pokes a CPython 2 internal
# attribute (socket._fileobject) -- confirm it still applies on upgrade.
socket._fileobject.default_bufsize = 0


# Field names of a tweet payload / user payload as delivered by the twitter
# streaming API.  NOTE(review): neither list is referenced anywhere in this
# module; presumably kept for external consumers -- verify before removing.
columns_tweet = [u'user', u'favorited', u'contributors', u'truncated', u'text', u'created_at', u'retweeted', u'in_reply_to_status_id_str', u'coordinates', u'in_reply_to_user_id_str', u'entities', u'in_reply_to_status_id', u'place', u'in_reply_to_user_id', u'id', u'in_reply_to_screen_name', u'retweet_count', u'geo', u'id_str', u'source']
columns_user = [u'follow_request_sent', u'profile_use_background_image', u'id', u'verified', u'profile_sidebar_fill_color', u'profile_text_color', u'followers_count', u'protected', u'location', u'profile_background_color', u'id_str', u'utc_offset', u'statuses_count', u'description', u'friends_count', u'profile_link_color', u'profile_image_url', u'notifications', u'show_all_inline_media', u'geo_enabled', u'profile_background_image_url', u'name', u'lang', u'following', u'profile_background_tile', u'favourites_count', u'screen_name', u'url', u'created_at', u'contributors_enabled', u'time_zone', u'profile_sidebar_border_color', u'is_translator', u'listed_count']

# Seconds the supervisor loops wait on the stop event between liveness
# checks (see SourceProcess.do_run).
DEFAULT_TIMEOUT = 3
       
    40 
       
    41 class Requesthandler(BaseHTTPRequestHandler):
       
    42 
       
    43     def __init__(self, request, client_address, server):
       
    44         BaseHTTPRequestHandler.__init__(self, request, client_address, server)
       
    45         
       
    46     def do_GET(self):
       
    47         self.send_response(200)
       
    48         self.end_headers()
       
    49     
       
    50     def log_message(self, format, *args):        # @ReservedAssignment
       
    51         pass
       
    52 
       
    53 
       
    54 def set_logging(options):
       
    55     loggers = []
       
    56     
       
    57     loggers.append(utils.set_logging(options, logging.getLogger('iri.tweet')))
       
    58     loggers.append(utils.set_logging(options, logging.getLogger('multiprocessing')))
       
    59     if options.debug >= 2:
       
    60         loggers.append(utils.set_logging(options, logging.getLogger('sqlalchemy.engine')))
       
    61     # utils.set_logging(options, logging.getLogger('sqlalchemy.dialects'))
       
    62     # utils.set_logging(options, logging.getLogger('sqlalchemy.pool'))
       
    63     # utils.set_logging(options, logging.getLogger('sqlalchemy.orm'))
       
    64     return loggers
       
    65 
       
    66 def set_logging_process(options, queue):
       
    67     qlogger = utils.set_logging(options, logging.getLogger('iri.tweet.p'), queue)
       
    68     qlogger.propagate = 0
       
    69     return qlogger
       
    70 
       
    71 def get_auth(options, access_token):
       
    72     consumer_key = options.consumer_key
       
    73     consumer_secret = options.consumer_secret
       
    74     auth = requests_oauthlib.OAuth1(client_key=consumer_key, client_secret=consumer_secret, resource_owner_key=access_token[0], resource_owner_secret=access_token[1], signature_type='auth_header')
       
    75     return auth
       
    76 
       
    77 
       
    78 def add_process_event(event_type, args, session_maker):
       
    79     session = session_maker()
       
    80     try:
       
    81         evt = ProcessEvent(args=None if args is None else anyjson.serialize(args), type=event_type)
       
    82         session.add(evt)
       
    83         session.commit()
       
    84     finally:
       
    85         session.close()
       
    86 
       
    87 
       
    88 class BaseProcess(Process):
       
    89 
       
    90     def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
       
    91         self.parent_pid = parent_pid
       
    92         self.session_maker = session_maker
       
    93         self.queue = queue
       
    94         self.options = options
       
    95         self.logger_queue = logger_queue
       
    96         self.stop_event = stop_event
       
    97         self.consumer_token = (options.consumer_key, options.consumer_secret)
       
    98         self.access_token = access_token
       
    99 
       
   100         super(BaseProcess, self).__init__()
       
   101 
       
   102     #
       
   103     # from http://stackoverflow.com/questions/2542610/python-daemon-doesnt-kill-its-kids
       
   104     #
       
   105     def parent_is_alive(self):
       
   106         try:
       
   107             # try to call Parent
       
   108             os.kill(self.parent_pid, 0)
       
   109         except OSError:
       
   110             # *beeep* oh no! The phone's disconnected!
       
   111             return False
       
   112         else:
       
   113             # *ring* Hi mom!
       
   114             return True
       
   115     
       
   116 
       
   117     def __get_process_event_args(self):
       
   118         return {'name':self.name, 'pid':self.pid, 'parent_pid':self.parent_pid, 'options':self.options.__dict__, 'access_token':self.access_token}
       
   119 
       
   120     def run(self):
       
   121         try:
       
   122             add_process_event("start_worker", self.__get_process_event_args(), self.session_maker)
       
   123             self.do_run()
       
   124         finally:
       
   125             add_process_event("stop_worker", self.__get_process_event_args(), self.session_maker)
       
   126         
       
   127     def do_run(self):
       
   128         raise NotImplementedError()
       
   129 
       
   130 
       
   131 
       
   132 class SourceProcess(BaseProcess):
       
   133     
       
   134     def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
       
   135         self.track = options.track
       
   136         self.token_filename = options.token_filename
       
   137         self.timeout = options.timeout
       
   138         self.stream = None
       
   139         super(SourceProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid)
       
   140                     
       
   141     def __source_stream_iter(self):
       
   142                 
       
   143         self.logger.debug("SourceProcess : run ")
       
   144         
       
   145         self.logger.debug("SourceProcess : get_auth auth with option %s and token %s " %(self.options, self.access_token))
       
   146         self.auth = get_auth(self.options, self.access_token) 
       
   147         self.logger.debug("SourceProcess : auth set ")
       
   148         
       
   149         track_list = self.track  # or raw_input('Keywords to track (comma seperated): ').strip()
       
   150         self.logger.debug("SourceProcess : track list " + track_list)
       
   151         
       
   152         track_list = [k.strip() for k in track_list.split(',')]
       
   153 
       
   154         self.logger.debug("SourceProcess : before connecting to stream %s, url : %s, auth : %s" % (repr(track_list), self.options.url, repr(self.auth)))                        
       
   155         self.stream = iri_tweet.stream.FilterStream(self.auth, track=track_list, raw=True, url=self.options.url, timeout=self.timeout, chunk_size=512, logger=self.logger)
       
   156         self.logger.debug("SourceProcess : after connecting to stream")
       
   157         self.stream.muststop = lambda: self.stop_event.is_set()        
       
   158         
       
   159         stream_wrapper = iri_tweet.stream.SafeStreamWrapper(self.stream, logger=self.logger)
       
   160         
       
   161         session = self.session_maker()
       
   162         
       
   163         #import pydevd
       
   164         #pydevd.settrace(suspend=False)
       
   165 
       
   166         
       
   167         try:
       
   168             for tweet in stream_wrapper:
       
   169                 if not self.parent_is_alive():
       
   170                     self.stop_event.set()
       
   171                     sys.exit()
       
   172                 self.logger.debug("SourceProcess : tweet " + repr(tweet))
       
   173                 source = TweetSource(original_json=tweet)
       
   174                 self.logger.debug("SourceProcess : source created")
       
   175                 add_retries = 0
       
   176                 while add_retries < 10:
       
   177                     try:
       
   178                         add_retries += 1
       
   179                         session.add(source)
       
   180                         session.flush()
       
   181                         break
       
   182                     except OperationalError as e:
       
   183                         session.rollback()
       
   184                         self.logger.debug("SourceProcess : Operational Error %s nb %d" % (repr(e), add_retries))
       
   185                         if add_retries == 10:
       
   186                             raise
       
   187                      
       
   188                 source_id = source.id
       
   189                 self.logger.debug("SourceProcess : before queue + source id " + repr(source_id))
       
   190                 self.logger.info("SourceProcess : Tweet count: %d - current rate : %.2f - running : %s" % (self.stream.count, self.stream.rate, int(time.time() - self.stream.starttime)))
       
   191                 session.commit()
       
   192                 self.queue.put((source_id, tweet), False)
       
   193 
       
   194         except Exception as e:
       
   195             self.logger.error("SourceProcess : Error when processing tweet " + repr(e))
       
   196             raise
       
   197         finally:
       
   198             session.rollback()
       
   199             session.close()
       
   200             self.stream.close()
       
   201             self.stream = None
       
   202             if not self.stop_event.is_set():
       
   203                 self.stop_event.set()
       
   204 
       
   205 
       
   206     def do_run(self):
       
   207         
       
   208         self.logger = set_logging_process(self.options, self.logger_queue)                
       
   209         
       
   210         source_stream_iter_thread = threading.Thread(target=self.__source_stream_iter , name="SourceStreamIterThread")
       
   211         
       
   212         source_stream_iter_thread.start()
       
   213         
       
   214         try:
       
   215             while not self.stop_event.is_set():
       
   216                 self.logger.debug("SourceProcess : In while after start")
       
   217                 self.stop_event.wait(DEFAULT_TIMEOUT)
       
   218         except KeyboardInterrupt:
       
   219             self.stop_event.set()
       
   220             pass
       
   221 
       
   222         if self.stop_event.is_set() and self.stream:
       
   223             self.stream.close()
       
   224         elif not self.stop_event.is_set() and not source_stream_iter_thread.is_alive:
       
   225             self.stop_event.set()
       
   226 
       
   227         self.queue.cancel_join_thread()
       
   228         self.logger_queue.cancel_join_thread()
       
   229         self.logger.info("SourceProcess : join")
       
   230         source_stream_iter_thread.join(30)
       
   231 
       
   232 
       
   233 def process_tweet(tweet, source_id, session, consumer_token, access_token, twitter_query_user, token_filename, logger):
       
   234     try:
       
   235         if not tweet.strip():
       
   236             return
       
   237         tweet_obj = anyjson.deserialize(tweet)
       
   238         processor_klass = get_processor(tweet_obj)
       
   239         if not processor_klass:
       
   240             tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET'])
       
   241             session.add(tweet_log)
       
   242             return
       
   243         processor = processor_klass(json_dict=tweet_obj,
       
   244                                     json_txt=tweet,
       
   245                                     source_id=source_id,
       
   246                                     session=session,
       
   247                                     consumer_token=consumer_token,
       
   248                                     access_token=access_token,
       
   249                                     token_filename=token_filename,
       
   250                                     user_query_twitter=twitter_query_user,
       
   251                                     logger=logger)
       
   252         logger.info(processor.log_info())                        
       
   253         logger.debug(u"Process_tweet :" + repr(tweet))                
       
   254         processor.process()
       
   255         
       
   256     except ValueError as e:
       
   257         message = u"Value Error %s processing tweet %s" % (repr(e), tweet)
       
   258         output = StringIO.StringIO()
       
   259         try:
       
   260             traceback.print_exc(file=output)
       
   261             error_stack = output.getvalue()
       
   262         finally:
       
   263             output.close()
       
   264         tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET'], error=message, error_stack=error_stack)
       
   265         session.add(tweet_log)
       
   266         session.commit()        
       
   267     except Exception as e:
       
   268         message = u"Error %s processing tweet %s" % (repr(e), tweet)
       
   269         logger.exception(message)
       
   270         output = StringIO.StringIO()
       
   271         try:
       
   272             traceback.print_exc(file=output)
       
   273             error_stack = output.getvalue()
       
   274         finally:
       
   275             output.close()
       
   276         session.rollback()
       
   277         tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['ERROR'], error=message, error_stack=error_stack)
       
   278         session.add(tweet_log)
       
   279         session.commit()
       
   280 
       
   281     
       
   282         
       
   283 class TweetProcess(BaseProcess):
       
   284     
       
   285     def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
       
   286         super(TweetProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid)
       
   287         self.twitter_query_user = options.twitter_query_user
       
   288 
       
   289 
       
   290     def do_run(self):
       
   291         
       
   292         self.logger = set_logging_process(self.options, self.logger_queue)
       
   293         session = self.session_maker()
       
   294         try:
       
   295             while not self.stop_event.is_set() and self.parent_is_alive():
       
   296                 try:
       
   297                     source_id, tweet_txt = self.queue.get(True, 3)
       
   298                     self.logger.debug("Processing source id " + repr(source_id))
       
   299                 except Exception as e:
       
   300                     self.logger.debug('Process tweet exception in loop : ' + repr(e))
       
   301                     continue
       
   302                 process_tweet(tweet_txt, source_id, session, self.consumer_token, self.access_token, self.twitter_query_user, self.options.token_filename, self.logger)
       
   303                 session.commit()
       
   304         except KeyboardInterrupt:
       
   305             self.stop_event.set()
       
   306         finally:
       
   307             session.rollback()
       
   308             session.close()
       
   309 
       
   310 
       
   311 def get_sessionmaker(conn_str):
       
   312     engine, metadata, Session = models.setup_database(conn_str, echo=False, create_all=False, autocommit=False)
       
   313     Session = scoped_session(Session)
       
   314     return Session, engine, metadata
       
   315 
       
   316             
       
   317 def process_leftovers(session, consumer_token, access_token, twitter_query_user, token_filename, ask_process_leftovers, logger):
       
   318     
       
   319     sources = session.query(TweetSource).outerjoin(TweetLog).filter(TweetLog.id == None)
       
   320     sources_count = sources.count()
       
   321     
       
   322     if sources_count > 10 and ask_process_leftovers:
       
   323         resp = raw_input("Do you want to process leftovers (Y/n) ? (%d tweet to process)" % sources_count)
       
   324         if resp and resp.strip().lower() == "n":
       
   325             return
       
   326     logger.info("Process leftovers, %d tweets to process" % (sources_count))
       
   327     for src in sources:
       
   328         tweet_txt = src.original_json
       
   329         process_tweet(tweet_txt, src.id, session, consumer_token, access_token, twitter_query_user, token_filename, logger)
       
   330         session.commit()
       
   331         
       
   332     
       
   333 def process_log(logger_queues, stop_event):
       
   334     while not stop_event.is_set():
       
   335         for lqueue in logger_queues:
       
   336             try:
       
   337                 record = lqueue.get_nowait()
       
   338                 logging.getLogger(record.name).handle(record)
       
   339             except Queue.Empty:
       
   340                 continue
       
   341             except IOError:
       
   342                 continue
       
   343         time.sleep(0.1)
       
   344 
       
   345         
       
   346 def get_options():
       
   347 
       
   348     usage = "usage: %(prog)s [options]"
       
   349 
       
   350     parser = argparse.ArgumentParser(usage=usage)
       
   351 
       
   352     parser.add_argument("-f", "--file", dest="conn_str",
       
   353                         help="write tweet to DATABASE. This is a connection string", metavar="CONNECTION_STR", default="enmi2010_twitter.db")
       
   354     parser.add_argument("-T", "--track", dest="track",
       
   355                         help="Twitter track", metavar="TRACK")
       
   356     parser.add_argument("-k", "--key", dest="consumer_key",
       
   357                         help="Twitter consumer key", metavar="CONSUMER_KEY", required=True)
       
   358     parser.add_argument("-s", "--secret", dest="consumer_secret",
       
   359                         help="Twitter consumer secret", metavar="CONSUMER_SECRET", required=True)
       
   360     parser.add_argument("-n", "--new", dest="new", action="store_true",
       
   361                         help="new database", default=False)
       
   362     parser.add_argument("-D", "--daemon", dest="daemon", action="store_true",
       
   363                         help="launch daemon", default=False)
       
   364     parser.add_argument("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token",
       
   365                         help="Token file name")
       
   366     parser.add_argument("-d", "--duration", dest="duration",
       
   367                         help="Duration of recording in seconds", metavar="DURATION", default= -1, type=int)
       
   368     parser.add_argument("-N", "--nb-process", dest="process_nb",
       
   369                         help="number of process.\nIf 0, only the lefovers of the database are processed.\nIf 1, no postprocessing is done on the tweets.", metavar="PROCESS_NB", default=2, type=int)
       
   370     parser.add_argument("--url", dest="url",
       
   371                         help="The twitter url to connect to.", metavar="URL", default=iri_tweet.stream.FilterStream.url)
       
   372     parser.add_argument("--query-user", dest="twitter_query_user", action="store_true",
       
   373                         help="Query twitter for users", default=False)
       
   374     parser.add_argument("--timeout", dest="timeout",
       
   375                         help="timeout for connecting in seconds", default=60, metavar="TIMEOUT", type=int)
       
   376     parser.add_argument("--ask-process-leftovers", dest="ask_process_leftovers", action="store_false",
       
   377                         help="ask process leftover", default=True)
       
   378 
       
   379 
       
   380     utils.set_logging_options(parser)
       
   381 
       
   382     return parser.parse_args()
       
   383 
       
   384 
       
   385 def do_run(options, session_maker):
       
   386 
       
   387     stop_args = {}
       
   388     
       
   389     consumer_token = (options.consumer_key, options.consumer_secret)
       
   390     access_token = utils.get_oauth_token(consumer_key=consumer_token[0], consumer_secret=consumer_token[1], token_file_path=options.token_filename)
       
   391     
       
   392     
       
   393     session = session_maker()
       
   394     try:
       
   395         process_leftovers(session, consumer_token, access_token, options.twitter_query_user, options.token_filename, options.ask_process_leftovers, utils.get_logger())
       
   396         session.commit()
       
   397     finally:
       
   398         session.rollback()
       
   399         session.close()
       
   400     
       
   401     if options.process_nb <= 0:
       
   402         utils.get_logger().debug("Leftovers processed. Exiting.")
       
   403         return None
       
   404 
       
   405     queue = mQueue()
       
   406     stop_event = Event()
       
   407     
       
   408     # workaround for bug on using urllib2 and multiprocessing
       
   409     httpd = HTTPServer(('127.0.0.1',0), Requesthandler)
       
   410     thread.start_new_thread(httpd.handle_request, ())
       
   411     
       
   412     req = urllib2.Request('http://localhost:%d' % httpd.server_port)
       
   413     conn = None
       
   414     try:
       
   415         conn = urllib2.urlopen(req)
       
   416     except:
       
   417         utils.get_logger().debug("could not open localhost")
       
   418         # donothing
       
   419     finally:
       
   420         if conn is not None:
       
   421             conn.close()
       
   422     
       
   423     process_engines = []
       
   424     logger_queues = []
       
   425     
       
   426     SessionProcess, engine_process, _ = get_sessionmaker(conn_str)
       
   427     process_engines.append(engine_process)
       
   428     lqueue = mQueue(50)
       
   429     logger_queues.append(lqueue)
       
   430     pid = os.getpid()
       
   431     sprocess = SourceProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)    
       
   432     
       
   433     tweet_processes = []
       
   434     
       
   435     for i in range(options.process_nb - 1):
       
   436         SessionProcess, engine_process, _ = get_sessionmaker(conn_str)
       
   437         process_engines.append(engine_process)
       
   438         lqueue = mQueue(50)
       
   439         logger_queues.append(lqueue)
       
   440         cprocess = TweetProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)
       
   441         tweet_processes.append(cprocess)
       
   442 
       
   443     log_thread = threading.Thread(target=process_log, name="loggingThread", args=(logger_queues, stop_event,))
       
   444     log_thread.daemon = True
       
   445 
       
   446     log_thread.start()
       
   447 
       
   448     sprocess.start()
       
   449     for cprocess in tweet_processes:
       
   450         cprocess.start()
       
   451 
       
   452     add_process_event("pid", {'main':os.getpid(), 'source':(sprocess.name, sprocess.pid), 'consumers':dict([(p.name, p.pid) for p in tweet_processes])}, session_maker)
       
   453 
       
   454     if options.duration >= 0:
       
   455         end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration)    
       
   456 
       
   457     def interupt_handler(signum, frame):
       
   458         utils.get_logger().debug("shutdown asked " + repr(signum) + "  " + repr(inspect.getframeinfo(frame, 9)))
       
   459         stop_args.update({'message': 'interupt', 'signum':signum, 'frameinfo':inspect.getframeinfo(frame, 9)})
       
   460         stop_event.set()
       
   461         
       
   462     signal.signal(signal.SIGINT , interupt_handler)
       
   463     signal.signal(signal.SIGHUP , interupt_handler)
       
   464     signal.signal(signal.SIGALRM, interupt_handler)
       
   465     signal.signal(signal.SIGTERM, interupt_handler)
       
   466     
       
   467 
       
   468     while not stop_event.is_set():
       
   469         if options.duration >= 0 and  datetime.datetime.utcnow() >= end_ts:
       
   470             stop_args.update({'message': 'duration', 'duration' : options.duration, 'end_ts' : end_ts})
       
   471             stop_event.set()
       
   472             break
       
   473         if sprocess.is_alive():
       
   474             utils.get_logger().debug("Source process alive")
       
   475             time.sleep(1)
       
   476         else:
       
   477             stop_args.update({'message': 'Source process killed'})
       
   478             stop_event.set()
       
   479             break
       
   480     utils.get_logger().debug("Joining Source Process")
       
   481     try:
       
   482         sprocess.join(10)
       
   483     except:
       
   484         utils.get_logger().debug("Pb joining Source Process - terminating")
       
   485     finally:
       
   486         sprocess.terminate()
       
   487         
       
   488     for i, cprocess in enumerate(tweet_processes):
       
   489         utils.get_logger().debug("Joining consumer process Nb %d" % (i + 1))
       
   490         try:
       
   491             cprocess.join(3)
       
   492         except:
       
   493             utils.get_logger().debug("Pb joining consumer process Nb %d - terminating" % (i + 1))
       
   494             cprocess.terminate()
       
   495 
       
   496     
       
   497     utils.get_logger().debug("Close queues")
       
   498     try:
       
   499         queue.close()
       
   500         for lqueue in logger_queues:
       
   501             lqueue.close()
       
   502     except Exception as e:
       
   503         utils.get_logger().error("error when closing queues %s", repr(e))
       
   504         # do nothing
       
   505         
       
   506     
       
   507     if options.process_nb > 1:
       
   508         utils.get_logger().debug("Processing leftovers")
       
   509         session = session_maker()
       
   510         try:
       
   511             process_leftovers(session, consumer_token, access_token, options.twitter_query_user, options.token_filename, options.ask_process_leftovers, utils.get_logger())
       
   512             session.commit()
       
   513         finally:
       
   514             session.rollback()
       
   515             session.close()
       
   516 
       
   517     for pengine in process_engines:
       
   518         pengine.dispose()
       
   519     
       
   520     return stop_args
       
   521 
       
   522 
       
   523 def main(options):
       
   524     
       
   525     global conn_str
       
   526     
       
   527     conn_str = options.conn_str.strip()
       
   528     if not re.match("^\w+://.+", conn_str):
       
   529         conn_str = 'sqlite:///' + options.conn_str
       
   530         
       
   531     if conn_str.startswith("sqlite") and options.new:
       
   532         filepath = conn_str[conn_str.find(":///") + 4:]
       
   533         if os.path.exists(filepath):
       
   534             i = 1
       
   535             basename, extension = os.path.splitext(filepath)
       
   536             new_path = '%s.%d%s' % (basename, i, extension)
       
   537             while i < 1000000 and os.path.exists(new_path):
       
   538                 i += 1
       
   539                 new_path = '%s.%d%s' % (basename, i, extension)
       
   540             if i >= 1000000:
       
   541                 raise Exception("Unable to find new filename for " + filepath)
       
   542             else:
       
   543                 shutil.move(filepath, new_path)
       
   544 
       
   545     Session, engine, metadata = get_sessionmaker(conn_str)
       
   546     
       
   547     if options.new:
       
   548         check_metadata = sqlalchemy.schema.MetaData(bind=engine)
       
   549         check_metadata.reflect()
       
   550         if len(check_metadata.sorted_tables) > 0:
       
   551             message = "Database %s not empty exiting" % conn_str
       
   552             utils.get_logger().error(message)
       
   553             sys.exit(message)
       
   554     
       
   555     metadata.create_all(engine)
       
   556     session = Session()
       
   557     try:
       
   558         models.add_model_version(session)
       
   559     finally:
       
   560         session.close()
       
   561     
       
   562     stop_args = {}
       
   563     try:
       
   564         add_process_event(event_type="start", args={'options':options.__dict__, 'args': [], 'command_line': sys.argv}, session_maker=Session)
       
   565         stop_args = do_run(options, Session)
       
   566     except Exception as e:
       
   567         utils.get_logger().exception("Error in main thread")        
       
   568         outfile = StringIO.StringIO()
       
   569         try:
       
   570             traceback.print_exc(file=outfile)
       
   571             stop_args = {'error': repr(e), 'message': getattr(e, 'message', ''), 'stacktrace':outfile.getvalue()}
       
   572         finally:
       
   573             outfile.close()
       
   574         raise
       
   575     finally:    
       
   576         add_process_event(event_type="shutdown", args=stop_args, session_maker=Session)
       
   577 
       
   578     utils.get_logger().debug("Done. Exiting. " + repr(stop_args))
       
   579 
       
   580 
       
   581 
       
if __name__ == '__main__':

    options = get_options()

    # Configure console/file logging before anything else happens.
    loggers = set_logging(options)

    utils.get_logger().debug("OPTIONS : " + repr(options))

    if options.daemon:
        # A daemon has no stdin, so never prompt about leftovers.
        options.ask_process_leftovers = False
        import daemon

        # Keep the handlers' streams open across the daemonization fork,
        # otherwise DaemonContext would close them.
        hdlr_preserve = []
        for logger in loggers:
            hdlr_preserve.extend([h.stream for h in logger.handlers])

        context = daemon.DaemonContext(working_directory=os.getcwd(), files_preserve=hdlr_preserve)
        with context:
            main(options)
    else:
        main(options)
       
   603