script/stream/recorder_stream.py
changeset 888 6fc6637d8403
parent 886 1e110b03ae96
child 890 9c57883dbb9d
equal deleted inserted replaced
887:503f9a7b7d6c 888:6fc6637d8403
     1 from getpass import getpass
     1 from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
     2 from iri_tweet import models, utils
     2 from iri_tweet import models, utils
     3 from iri_tweet.models import TweetSource, TweetLog, ProcessEvent
     3 from iri_tweet.models import TweetSource, TweetLog, ProcessEvent
     4 from multiprocessing import (Queue as mQueue, JoinableQueue, Process, Event, 
     4 from iri_tweet.processor import get_processor
     5     get_logger)
     5 from multiprocessing import Queue as mQueue, Process, Event
     6 from optparse import OptionParser
       
     7 from sqlalchemy.exc import OperationalError
     6 from sqlalchemy.exc import OperationalError
     8 from sqlalchemy.orm import scoped_session
     7 from sqlalchemy.orm import scoped_session
     9 import Queue
     8 import Queue
    10 import StringIO
     9 import StringIO
    11 import anyjson
    10 import anyjson
       
    11 import argparse
    12 import datetime
    12 import datetime
    13 import inspect
    13 import inspect
    14 import iri_tweet.stream
    14 import iri_tweet.stream
    15 import logging
    15 import logging
    16 import os
    16 import os
    19 import shutil
    19 import shutil
    20 import signal
    20 import signal
    21 import socket
    21 import socket
    22 import sqlalchemy.schema
    22 import sqlalchemy.schema
    23 import sys
    23 import sys
       
    24 import thread
    24 import threading
    25 import threading
    25 import time
    26 import time
    26 import traceback
    27 import traceback
    27 import urllib2
    28 import urllib2
    28 socket._fileobject.default_bufsize = 0
    29 socket._fileobject.default_bufsize = 0
    33 columns_tweet = [u'user', u'favorited', u'contributors', u'truncated', u'text', u'created_at', u'retweeted', u'in_reply_to_status_id_str', u'coordinates', u'in_reply_to_user_id_str', u'entities', u'in_reply_to_status_id', u'place', u'in_reply_to_user_id', u'id', u'in_reply_to_screen_name', u'retweet_count', u'geo', u'id_str', u'source']
    34 columns_tweet = [u'user', u'favorited', u'contributors', u'truncated', u'text', u'created_at', u'retweeted', u'in_reply_to_status_id_str', u'coordinates', u'in_reply_to_user_id_str', u'entities', u'in_reply_to_status_id', u'place', u'in_reply_to_user_id', u'id', u'in_reply_to_screen_name', u'retweet_count', u'geo', u'id_str', u'source']
    34 # columns_user = [u'id', u'verified', u'profile_sidebar_fill_color', u'profile_text_color', u'followers_count', u'protected', u'location', u'profile_background_color', u'utc_offset', u'statuses_count', u'description', u'friends_count', u'profile_link_color', u'profile_image_url', u'notifications', u'geo_enabled', u'profile_background_image_url', u'screen_name', u'profile_background_tile', u'favourites_count', u'name', u'url', u'created_at', u'time_zone', u'profile_sidebar_border_color', u'following']
    35 # columns_user = [u'id', u'verified', u'profile_sidebar_fill_color', u'profile_text_color', u'followers_count', u'protected', u'location', u'profile_background_color', u'utc_offset', u'statuses_count', u'description', u'friends_count', u'profile_link_color', u'profile_image_url', u'notifications', u'geo_enabled', u'profile_background_image_url', u'screen_name', u'profile_background_tile', u'favourites_count', u'name', u'url', u'created_at', u'time_zone', u'profile_sidebar_border_color', u'following']
    35 columns_user = [u'follow_request_sent', u'profile_use_background_image', u'id', u'verified', u'profile_sidebar_fill_color', u'profile_text_color', u'followers_count', u'protected', u'location', u'profile_background_color', u'id_str', u'utc_offset', u'statuses_count', u'description', u'friends_count', u'profile_link_color', u'profile_image_url', u'notifications', u'show_all_inline_media', u'geo_enabled', u'profile_background_image_url', u'name', u'lang', u'following', u'profile_background_tile', u'favourites_count', u'screen_name', u'url', u'created_at', u'contributors_enabled', u'time_zone', u'profile_sidebar_border_color', u'is_translator', u'listed_count']
    36 columns_user = [u'follow_request_sent', u'profile_use_background_image', u'id', u'verified', u'profile_sidebar_fill_color', u'profile_text_color', u'followers_count', u'protected', u'location', u'profile_background_color', u'id_str', u'utc_offset', u'statuses_count', u'description', u'friends_count', u'profile_link_color', u'profile_image_url', u'notifications', u'show_all_inline_media', u'geo_enabled', u'profile_background_image_url', u'name', u'lang', u'following', u'profile_background_tile', u'favourites_count', u'screen_name', u'url', u'created_at', u'contributors_enabled', u'time_zone', u'profile_sidebar_border_color', u'is_translator', u'listed_count']
    36 # just put it in a sqlite3 table
    37 # just put it in a sqlite3 table
    37 
    38 
    38 DEFAULT_TIMEOUT = 5
    39 DEFAULT_TIMEOUT = 3
       
    40 
       
    41 class Requesthandler(BaseHTTPRequestHandler):
       
    42 
       
    43     def __init__(self, request, client_address, server):
       
    44         BaseHTTPRequestHandler.__init__(self, request, client_address, server)
       
    45         
       
    46     def do_GET(self):
       
    47         self.send_response(200)
       
    48         self.end_headers()
       
    49     
       
    50     def log_message(self, format, *args):        # @ReservedAssignment
       
    51         pass
       
    52 
    39 
    53 
    40 def set_logging(options):
    54 def set_logging(options):
    41     loggers = []
    55     loggers = []
    42     
    56     
    43     loggers.append(utils.set_logging(options, logging.getLogger('iri.tweet')))
    57     loggers.append(utils.set_logging(options, logging.getLogger('iri.tweet')))
    53     qlogger = utils.set_logging(options, logging.getLogger('iri.tweet.p'), queue)
    67     qlogger = utils.set_logging(options, logging.getLogger('iri.tweet.p'), queue)
    54     qlogger.propagate = 0
    68     qlogger.propagate = 0
    55     return qlogger
    69     return qlogger
    56 
    70 
    57 def get_auth(options, access_token):
    71 def get_auth(options, access_token):
    58     if options.username and options.password:
    72     consumer_key = options.consumer_key
    59         auth = requests.auth.BasicAuthHandler(options.username, options.password)        
    73     consumer_secret = options.consumer_secret
    60     else:
    74     auth = requests_oauthlib.OAuth1(client_key=consumer_key, client_secret=consumer_secret, resource_owner_key=access_token[0], resource_owner_secret=access_token[1], signature_type='auth_header')
    61         consumer_key = models.CONSUMER_KEY
       
    62         consumer_secret = models.CONSUMER_SECRET
       
    63         auth = requests_oauthlib.OAuth1(access_token[0], access_token[1], consumer_key, consumer_secret, signature_type='auth_header')
       
    64     return auth
    75     return auth
    65 
    76 
    66 
    77 
    67 def add_process_event(type, args, session_maker):
    78 def add_process_event(event_type, args, session_maker):
    68     session = session_maker()
    79     session = session_maker()
    69     try:
    80     try:
    70         evt = ProcessEvent(args=None if args is None else anyjson.serialize(args), type=type)
    81         evt = ProcessEvent(args=None if args is None else anyjson.serialize(args), type=event_type)
    71         session.add(evt)
    82         session.add(evt)
    72         session.commit()
    83         session.commit()
    73     finally:
    84     finally:
    74         session.close()
    85         session.close()
    75 
    86 
    81         self.session_maker = session_maker
    92         self.session_maker = session_maker
    82         self.queue = queue
    93         self.queue = queue
    83         self.options = options
    94         self.options = options
    84         self.logger_queue = logger_queue
    95         self.logger_queue = logger_queue
    85         self.stop_event = stop_event
    96         self.stop_event = stop_event
       
    97         self.consumer_token = (options.consumer_key, options.consumer_secret)
    86         self.access_token = access_token
    98         self.access_token = access_token
    87 
    99 
    88         super(BaseProcess, self).__init__()
   100         super(BaseProcess, self).__init__()
    89 
   101 
    90     #
   102     #
   120 class SourceProcess(BaseProcess):
   132 class SourceProcess(BaseProcess):
   121     
   133     
   122     def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
   134     def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
   123         self.track = options.track
   135         self.track = options.track
   124         self.token_filename = options.token_filename
   136         self.token_filename = options.token_filename
   125         self.catchup = options.catchup
       
   126         self.timeout = options.timeout
   137         self.timeout = options.timeout
   127         self.stream = None
   138         self.stream = None
   128         super(SourceProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid)
   139         super(SourceProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid)
   129                     
   140                     
   130     def __source_stream_iter(self):
   141     def __source_stream_iter(self):
   131         
   142                 
   132         self.logger = set_logging_process(self.options, self.logger_queue)
       
   133         self.logger.debug("SourceProcess : run ")
   143         self.logger.debug("SourceProcess : run ")
   134         
   144         
       
   145         self.logger.debug("SourceProcess : get_auth auth with option %s and token %s " %(self.options, self.access_token))
   135         self.auth = get_auth(self.options, self.access_token) 
   146         self.auth = get_auth(self.options, self.access_token) 
   136         self.logger.debug("SourceProcess : auth set ")
   147         self.logger.debug("SourceProcess : auth set ")
   137         
   148         
   138         track_list = self.track  # or raw_input('Keywords to track (comma seperated): ').strip()
   149         track_list = self.track  # or raw_input('Keywords to track (comma seperated): ').strip()
   139         self.logger.debug("SourceProcess : track list " + track_list)
   150         self.logger.debug("SourceProcess : track list " + track_list)
   140         
   151         
   141         track_list = [k.strip() for k in track_list.split(',')]
   152         track_list = [k.strip() for k in track_list.split(',')]
   142 
   153 
   143         self.logger.debug("SourceProcess : before connecting to stream " + repr(track_list))                        
   154         self.logger.debug("SourceProcess : before connecting to stream %s, url : %s, auth : %s" % (repr(track_list), self.options.url, repr(self.auth)))                        
   144         self.stream = iri_tweet.stream.FilterStream(self.auth, track=track_list, raw=True, url=self.options.url, catchup=self.catchup, timeout=self.timeout, chunk_size=1, logger=self.logger)
   155         self.stream = iri_tweet.stream.FilterStream(self.auth, track=track_list, raw=True, url=self.options.url, timeout=self.timeout, chunk_size=512, logger=self.logger)
   145         self.logger.debug("SourceProcess : after connecting to stream")
   156         self.logger.debug("SourceProcess : after connecting to stream")
   146         self.stream.muststop = lambda: self.stop_event.is_set()        
   157         self.stream.muststop = lambda: self.stop_event.is_set()        
   147         
   158         
   148         stream_wrapper = iri_tweet.stream.SafeStreamWrapper(self.stream, logger=self.logger)
   159         stream_wrapper = iri_tweet.stream.SafeStreamWrapper(self.stream, logger=self.logger)
   149         
   160         
   150         session = self.session_maker()
   161         session = self.session_maker()
       
   162         
       
   163         #import pydevd
       
   164         #pydevd.settrace(suspend=False)
       
   165 
   151         
   166         
   152         try:
   167         try:
   153             for tweet in stream_wrapper:
   168             for tweet in stream_wrapper:
   154                 if not self.parent_is_alive():
   169                 if not self.parent_is_alive():
   155                     self.stop_event.set()
   170                     self.stop_event.set()
   156                     stop_thread.join(5)
       
   157                     sys.exit()
   171                     sys.exit()
   158                 self.logger.debug("SourceProcess : tweet " + repr(tweet))
   172                 self.logger.debug("SourceProcess : tweet " + repr(tweet))
   159                 source = TweetSource(original_json=tweet)
   173                 source = TweetSource(original_json=tweet)
   160                 self.logger.debug("SourceProcess : source created")
   174                 self.logger.debug("SourceProcess : source created")
   161                 add_retries = 0
   175                 add_retries = 0
   191                 self.stop_event.set()
   205                 self.stop_event.set()
   192 
   206 
   193 
   207 
   194     def do_run(self):
   208     def do_run(self):
   195         
   209         
   196         # import pydevd
   210         self.logger = set_logging_process(self.options, self.logger_queue)                
   197         # pydevd.settrace(suspend=False)
       
   198         
   211         
   199         source_stream_iter_thread = threading.Thread(target=self.__source_stream_iter , name="SourceStreamIterThread")
   212         source_stream_iter_thread = threading.Thread(target=self.__source_stream_iter , name="SourceStreamIterThread")
   200         
   213         
   201         source_stream_iter_thread.start()
   214         source_stream_iter_thread.start()
   202         
   215         
   203         while not self.stop_event.is_set():
   216         try:
   204             self.logger.debug("SourceProcess : In while after start")
   217             while not self.stop_event.is_set():
   205             self.stop_event.wait(DEFAULT_TIMEOUT)
   218                 self.logger.debug("SourceProcess : In while after start")
   206             if self.stop_event.is_set() and self.stream:
   219                 self.stop_event.wait(DEFAULT_TIMEOUT)
   207                 self.stream.close()
   220         except KeyboardInterrupt:
   208             elif not self.stop_event.is_set() and not source_stream_iter_thread.is_alive:
   221             self.stop_event.set()
   209                 self.stop_event.set()
   222             pass
   210 
   223 
       
   224         if self.stop_event.is_set() and self.stream:
       
   225             self.stream.close()
       
   226         elif not self.stop_event.is_set() and not source_stream_iter_thread.is_alive:
       
   227             self.stop_event.set()
       
   228     
   211         self.logger.info("SourceProcess : join")
   229         self.logger.info("SourceProcess : join")
   212         source_stream_iter_thread.join(30)
   230         source_stream_iter_thread.join(30)
   213 
   231 
   214 
   232 
   215 def process_tweet(tweet, source_id, session, access_token, twitter_query_user, logger):
   233 def process_tweet(tweet, source_id, session, consumer_token, access_token, twitter_query_user, token_filename, logger):
   216     try:
   234     try:
   217         if not tweet.strip():
   235         if not tweet.strip():
   218             return
   236             return
   219         tweet_obj = anyjson.deserialize(tweet)
   237         tweet_obj = anyjson.deserialize(tweet)
   220         if 'text' not in tweet_obj:
   238         processor_klass = get_processor(tweet_obj)
       
   239         if not processor_klass:
   221             tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET'])
   240             tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET'])
   222             session.add(tweet_log)
   241             session.add(tweet_log)
   223             return
   242             return
   224         screen_name = ""
   243         processor = processor_klass(json_dict=tweet_obj,
   225         if 'user' in tweet_obj and 'screen_name' in tweet_obj['user']:
   244                                     json_txt=tweet,
   226             screen_name = tweet_obj['user']['screen_name']
   245                                     source_id=source_id,
   227         logger.info(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text']))
   246                                     session=session,
   228         logger.debug(u"Process_tweet :" + repr(tweet))
   247                                     consumer_token=consumer_token,
   229         processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, access_token, None, twitter_query_user)
   248                                     access_token=access_token,
       
   249                                     token_filename=token_filename,
       
   250                                     user_query_twitter=twitter_query_user,
       
   251                                     logger=logger)
       
   252         logger.info(processor.log_info())                        
       
   253         logger.debug(u"Process_tweet :" + repr(tweet))                
   230         processor.process()
   254         processor.process()
       
   255         
   231     except ValueError as e:
   256     except ValueError as e:
   232         message = u"Value Error %s processing tweet %s" % (repr(e), tweet)
   257         message = u"Value Error %s processing tweet %s" % (repr(e), tweet)
   233         output = StringIO.StringIO()
   258         output = StringIO.StringIO()
   234         try:
   259         try:
   235             traceback.print_exc(file=output)
   260             traceback.print_exc(file=output)
   272                     source_id, tweet_txt = self.queue.get(True, 3)
   297                     source_id, tweet_txt = self.queue.get(True, 3)
   273                     self.logger.debug("Processing source id " + repr(source_id))
   298                     self.logger.debug("Processing source id " + repr(source_id))
   274                 except Exception as e:
   299                 except Exception as e:
   275                     self.logger.debug('Process tweet exception in loop : ' + repr(e))
   300                     self.logger.debug('Process tweet exception in loop : ' + repr(e))
   276                     continue
   301                     continue
   277                 process_tweet(tweet_txt, source_id, session, self.access_token, self.twitter_query_user, self.logger)
   302                 process_tweet(tweet_txt, source_id, session, self.consumer_token, self.access_token, self.twitter_query_user, self.options.token_filename, self.logger)
   278                 session.commit()
   303                 session.commit()
       
   304         except KeyboardInterrupt:
       
   305             self.stop_event.set()
   279         finally:
   306         finally:
   280             session.rollback()
   307             session.rollback()
   281             session.close()
   308             session.close()
   282 
   309 
   283 
   310 
   285     engine, metadata, Session = models.setup_database(conn_str, echo=False, create_all=False, autocommit=False)
   312     engine, metadata, Session = models.setup_database(conn_str, echo=False, create_all=False, autocommit=False)
   286     Session = scoped_session(Session)
   313     Session = scoped_session(Session)
   287     return Session, engine, metadata
   314     return Session, engine, metadata
   288 
   315 
   289             
   316             
   290 def process_leftovers(session, access_token, twitter_query_user, logger):
   317 def process_leftovers(session, consumer_token, access_token, twitter_query_user, token_filename, ask_process_leftovers, logger):
   291     
   318     
   292     sources = session.query(TweetSource).outerjoin(TweetLog).filter(TweetLog.id == None)
   319     sources = session.query(TweetSource).outerjoin(TweetLog).filter(TweetLog.id == None)
   293     
   320     sources_count = sources.count()
       
   321     
       
   322     if sources_count > 10 and ask_process_leftovers:
       
   323         resp = raw_input("Do you want to process leftovers (Y/n) ? (%d tweet to process)" % sources_count)
       
   324         if resp and resp.strip().lower() == "n":
       
   325             return
       
   326     logger.info("Process leftovers, %d tweets to process" % (sources_count))
   294     for src in sources:
   327     for src in sources:
   295         tweet_txt = src.original_json
   328         tweet_txt = src.original_json
   296         process_tweet(tweet_txt, src.id, session, access_token, twitter_query_user, logger)
   329         process_tweet(tweet_txt, src.id, session, consumer_token, access_token, twitter_query_user, token_filename, logger)
   297         session.commit()
   330         session.commit()
   298 
       
   299         
   331         
   300     
   332     
   301     # get tweet source that do not match any message
   333     # get tweet source that do not match any message
   302     # select * from tweet_tweet_source ts left join tweet_tweet_log tl on ts.id = tl.tweet_source_id where tl.id isnull;
   334     # select * from tweet_tweet_source ts left join tweet_tweet_log tl on ts.id = tl.tweet_source_id where tl.id isnull;
   303 def process_log(logger_queues, stop_event):
   335 def process_log(logger_queues, stop_event):
   313         time.sleep(0.1)
   345         time.sleep(0.1)
   314 
   346 
   315         
   347         
   316 def get_options():
   348 def get_options():
   317 
   349 
   318     usage = "usage: %prog [options]"
   350     usage = "usage: %(prog)s [options]"
   319 
   351 
   320     parser = OptionParser(usage=usage)
   352     parser = argparse.ArgumentParser(usage=usage)
   321 
   353 
   322     parser.add_option("-f", "--file", dest="conn_str",
   354     parser.add_argument("-f", "--file", dest="conn_str",
   323                       help="write tweet to DATABASE. This is a connection string", metavar="CONNECTION_STR", default="enmi2010_twitter.db")
   355                         help="write tweet to DATABASE. This is a connection string", metavar="CONNECTION_STR", default="enmi2010_twitter.db")
   324     parser.add_option("-u", "--user", dest="username",
   356     parser.add_argument("-T", "--track", dest="track",
   325                       help="Twitter user", metavar="USER", default=None)
   357                         help="Twitter track", metavar="TRACK")
   326     parser.add_option("-w", "--password", dest="password",
   358     parser.add_argument("-k", "--key", dest="consumer_key",
   327                       help="Twitter password", metavar="PASSWORD", default=None)
   359                         help="Twitter consumer key", metavar="CONSUMER_KEY", required=True)
   328     parser.add_option("-T", "--track", dest="track",
   360     parser.add_argument("-s", "--secret", dest="consumer_secret",
   329                       help="Twitter track", metavar="TRACK")
   361                         help="Twitter consumer secret", metavar="CONSUMER_SECRET", required=True)
   330     parser.add_option("-n", "--new", dest="new", action="store_true",
   362     parser.add_argument("-n", "--new", dest="new", action="store_true",
   331                       help="new database", default=False)
   363                         help="new database", default=False)
   332     parser.add_option("-D", "--daemon", dest="daemon", action="store_true",
   364     parser.add_argument("-D", "--daemon", dest="daemon", action="store_true",
   333                       help="launch daemon", default=False)
   365                         help="launch daemon", default=False)
   334     parser.add_option("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token",
   366     parser.add_argument("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token",
   335                       help="Token file name")
   367                         help="Token file name")
   336     parser.add_option("-d", "--duration", dest="duration",
   368     parser.add_argument("-d", "--duration", dest="duration",
   337                       help="Duration of recording in seconds", metavar="DURATION", default= -1, type='int')
   369                         help="Duration of recording in seconds", metavar="DURATION", default= -1, type=int)
   338     parser.add_option("-N", "--nb-process", dest="process_nb",
   370     parser.add_argument("-N", "--nb-process", dest="process_nb",
   339                       help="number of process.\nIf 0, only the lefovers of the database are processed.\nIf 1, no postprocessing is done on the tweets.", metavar="PROCESS_NB", default=2, type='int')
   371                         help="number of process.\nIf 0, only the lefovers of the database are processed.\nIf 1, no postprocessing is done on the tweets.", metavar="PROCESS_NB", default=2, type=int)
   340     parser.add_option("--url", dest="url",
   372     parser.add_argument("--url", dest="url",
   341                       help="The twitter url to connect to.", metavar="URL", default=iri_tweet.stream.FilterStream.url)
   373                         help="The twitter url to connect to.", metavar="URL", default=iri_tweet.stream.FilterStream.url)
   342     parser.add_option("--query-user", dest="twitter_query_user", action="store_true",
   374     parser.add_argument("--query-user", dest="twitter_query_user", action="store_true",
   343                       help="Query twitter for users", default=False, metavar="QUERY_USER")
   375                         help="Query twitter for users", default=False)
   344     parser.add_option("--catchup", dest="catchup",
   376     parser.add_argument("--timeout", dest="timeout",
   345                       help="catchup count for tweets", default=None, metavar="CATCHUP", type='int')
   377                         help="timeout for connecting in seconds", default=60, metavar="TIMEOUT", type=int)
   346     parser.add_option("--timeout", dest="timeout",
   378     parser.add_argument("--ask-process-leftovers", dest="ask_process_leftovers", action="store_false",
   347                       help="timeout for connecting in seconds", default=60, metavar="TIMEOUT", type='int')
   379                         help="ask process leftover", default=True)
   348     
       
   349 
       
   350 
   380 
   351 
   381 
   352     utils.set_logging_options(parser)
   382     utils.set_logging_options(parser)
   353 
   383 
   354     return parser.parse_args()
   384     return parser.parse_args()
   355 
   385 
   356 
   386 
   357 def do_run(options, session_maker):
   387 def do_run(options, session_maker):
   358 
   388 
   359     stop_args = {}
   389     stop_args = {}
   360 
   390     
   361     access_token = None
   391     consumer_token = (options.consumer_key, options.consumer_secret)
   362     if not options.username or not options.password:
   392     access_token = utils.get_oauth_token(consumer_key=consumer_token[0], consumer_secret=consumer_token[1], token_file_path=options.token_filename)
   363         access_token = utils.get_oauth_token(options.token_filename)
   393     
   364     
   394     
   365     session = session_maker()
   395     session = session_maker()
   366     try:
   396     try:
   367         process_leftovers(session, access_token, options.twitter_query_user, utils.get_logger())
   397         process_leftovers(session, consumer_token, access_token, options.twitter_query_user, options.token_filename, options.ask_process_leftovers, utils.get_logger())
   368         session.commit()
   398         session.commit()
   369     finally:
   399     finally:
   370         session.rollback()
   400         session.rollback()
   371         session.close()
   401         session.close()
   372     
   402     
   376 
   406 
   377     queue = mQueue()
   407     queue = mQueue()
   378     stop_event = Event()
   408     stop_event = Event()
   379     
   409     
   380     # workaround for bug on using urllib2 and multiprocessing
   410     # workaround for bug on using urllib2 and multiprocessing
   381     req = urllib2.Request('http://localhost')
   411     httpd = HTTPServer(('127.0.0.1',0), Requesthandler)
       
   412     thread.start_new_thread(httpd.handle_request, ())
       
   413     
       
   414     req = urllib2.Request('http://localhost:%d' % httpd.server_port)
   382     conn = None
   415     conn = None
   383     try:
   416     try:
   384         conn = urllib2.urlopen(req)
   417         conn = urllib2.urlopen(req)
   385     except:
   418     except:
   386         utils.get_logger().debug("could not open localhost")
   419         utils.get_logger().debug("could not open localhost")
   390             conn.close()
   423             conn.close()
   391     
   424     
   392     process_engines = []
   425     process_engines = []
   393     logger_queues = []
   426     logger_queues = []
   394     
   427     
   395     SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str)
   428     SessionProcess, engine_process, _ = get_sessionmaker(conn_str)
   396     process_engines.append(engine_process)
   429     process_engines.append(engine_process)
   397     lqueue = mQueue(50)
   430     lqueue = mQueue(50)
   398     logger_queues.append(lqueue)
   431     logger_queues.append(lqueue)
   399     pid = os.getpid()
   432     pid = os.getpid()
   400     sprocess = SourceProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)    
   433     sprocess = SourceProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)    
   401     
   434     
   402     tweet_processes = []
   435     tweet_processes = []
   403     
   436     
   404     for i in range(options.process_nb - 1):
   437     for i in range(options.process_nb - 1):
   405         SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str)
   438         SessionProcess, engine_process, _ = get_sessionmaker(conn_str)
   406         process_engines.append(engine_process)
   439         process_engines.append(engine_process)
   407         lqueue = mQueue(50)
   440         lqueue = mQueue(50)
   408         logger_queues.append(lqueue)
   441         logger_queues.append(lqueue)
   409         cprocess = TweetProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)
   442         cprocess = TweetProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)
   410         tweet_processes.append(cprocess)
   443         tweet_processes.append(cprocess)
   460         except:
   493         except:
   461             utils.get_logger().debug("Pb joining consumer process Nb %d - terminating" % (i + 1))
   494             utils.get_logger().debug("Pb joining consumer process Nb %d - terminating" % (i + 1))
   462             cprocess.terminate()
   495             cprocess.terminate()
   463 
   496 
   464     
   497     
   465     utils.get_logger().debug("Close queues")
   498     utils.get_logger().debug("Close queues")        
   466     try:
       
   467         queue.close()
       
   468         for lqueue in logger_queues:
       
   469             lqueue.close()
       
   470     except exception as e:
       
   471         utils.get_logger().error("error when closing queues %s", repr(e))
       
   472         # do nothing
       
   473         
       
   474     
   499     
   475     if options.process_nb > 1:
   500     if options.process_nb > 1:
   476         utils.get_logger().debug("Processing leftovers")
   501         utils.get_logger().debug("Processing leftovers")
   477         session = session_maker()
   502         session = session_maker()
   478         try:
   503         try:
   479             process_leftovers(session, access_token, options.twitter_query_user, utils.get_logger())
   504             process_leftovers(session, consumer_token, access_token, options.twitter_query_user, options.token_filename, options.ask_process_leftovers, utils.get_logger())
   480             session.commit()
   505             session.commit()
   481         finally:
   506         finally:
   482             session.rollback()
   507             session.rollback()
   483             session.close()
   508             session.close()
   484 
   509 
   485     for pengine in process_engines:
   510     for pengine in process_engines:
   486         pengine.dispose()
   511         pengine.dispose()
       
   512     
       
   513     try:
       
   514         queue.close()
       
   515         for lqueue in logger_queues:
       
   516             lqueue.close()
       
   517     except Exception as e:
       
   518         utils.get_logger().error("error when closing queues %s", repr(e))
       
   519         # do nothing
   487 
   520 
   488     return stop_args
   521     return stop_args
   489 
   522 
   490 
   523 
   491 def main(options, args):
   524 def main(options):
   492     
   525     
   493     global conn_str
   526     global conn_str
   494     
   527     
   495     conn_str = options.conn_str.strip()
   528     conn_str = options.conn_str.strip()
   496     if not re.match("^\w+://.+", conn_str):
   529     if not re.match("^\w+://.+", conn_str):
   511                 shutil.move(filepath, new_path)
   544                 shutil.move(filepath, new_path)
   512 
   545 
   513     Session, engine, metadata = get_sessionmaker(conn_str)
   546     Session, engine, metadata = get_sessionmaker(conn_str)
   514     
   547     
   515     if options.new:
   548     if options.new:
   516         check_metadata = sqlalchemy.schema.MetaData(bind=engine, reflect=True)
   549         check_metadata = sqlalchemy.schema.MetaData(bind=engine)
       
   550         check_metadata.reflect()
   517         if len(check_metadata.sorted_tables) > 0:
   551         if len(check_metadata.sorted_tables) > 0:
   518             message = "Database %s not empty exiting" % conn_str
   552             message = "Database %s not empty exiting" % conn_str
   519             utils.get_logger().error(message)
   553             utils.get_logger().error(message)
   520             sys.exit(message)
   554             sys.exit(message)
   521     
   555     
   526     finally:
   560     finally:
   527         session.close()
   561         session.close()
   528     
   562     
   529     stop_args = {}
   563     stop_args = {}
   530     try:
   564     try:
   531         add_process_event(type="start", args={'options':options.__dict__, 'args': args, 'command_line': sys.argv}, session_maker=Session)
   565         add_process_event(event_type="start", args={'options':options.__dict__, 'args': [], 'command_line': sys.argv}, session_maker=Session)
   532         stop_args = do_run(options, Session)
   566         stop_args = do_run(options, Session)
   533     except Exception as e:
   567     except Exception as e:
   534         utils.get_logger().exception("Error in main thread")        
   568         utils.get_logger().exception("Error in main thread")        
   535         outfile = StringIO.StringIO()
   569         outfile = StringIO.StringIO()
   536         try:
   570         try:
   538             stop_args = {'error': repr(e), 'message': getattr(e, 'message', ''), 'stacktrace':outfile.getvalue()}
   572             stop_args = {'error': repr(e), 'message': getattr(e, 'message', ''), 'stacktrace':outfile.getvalue()}
   539         finally:
   573         finally:
   540             outfile.close()
   574             outfile.close()
   541         raise
   575         raise
   542     finally:    
   576     finally:    
   543         add_process_event(type="shutdown", args=stop_args, session_maker=Session)
   577         add_process_event(event_type="shutdown", args=stop_args, session_maker=Session)
   544 
   578 
   545     utils.get_logger().debug("Done. Exiting. " + repr(stop_args))
   579     utils.get_logger().debug("Done. Exiting. " + repr(stop_args))
   546 
   580 
   547 
   581 
   548 
   582 
   549 if __name__ == '__main__':
   583 if __name__ == '__main__':
   550 
   584 
   551     (options, args) = get_options()
   585     options = get_options()
   552     
   586     
   553     loggers = set_logging(options)
   587     loggers = set_logging(options)
   554     
   588     
   555     utils.get_logger().debug("OPTIONS : " + repr(options))
   589     utils.get_logger().debug("OPTIONS : " + repr(options))
   556     
   590     
   557     if options.daemon:
   591     if options.daemon:
       
   592         options.ask_process_leftovers = False
   558         import daemon
   593         import daemon
   559         import lockfile
       
   560         
   594         
   561         hdlr_preserve = []
   595         hdlr_preserve = []
   562         for logger in loggers:
   596         for logger in loggers:
   563             hdlr_preserve.extend([h.stream for h in logger.handlers])
   597             hdlr_preserve.extend([h.stream for h in logger.handlers])
   564             
   598             
   565         context = daemon.DaemonContext(working_directory=os.getcwd(), files_preserve=hdlr_preserve) 
   599         context = daemon.DaemonContext(working_directory=os.getcwd(), files_preserve=hdlr_preserve) 
   566         with context:
   600         with context:
   567             main(options, args)
   601             main(options)
   568     else:
   602     else:
   569         main(options, args)
   603         main(options)
   570 
   604