script/stream/recorder_stream.py
changeset 1497 14a9bed2e3cd
parent 1074 94d3d8f5eb9d
equal deleted inserted replaced
1496:184372ec27e2 1497:14a9bed2e3cd
     1 from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
       
     2 from iri_tweet import models, utils
       
     3 from iri_tweet.models import TweetSource, TweetLog, ProcessEvent
       
     4 from iri_tweet.processor import get_processor
       
     5 from multiprocessing import Queue as mQueue, Process, Event
       
     6 from sqlalchemy.exc import OperationalError
       
     7 from sqlalchemy.orm import scoped_session
       
     8 import Queue
       
     9 import StringIO
       
    10 import anyjson
       
    11 import argparse
     1 import argparse
    12 import datetime
     2 import datetime
    13 import inspect
     3 import inspect
    14 import iri_tweet.stream
     4 import json
    15 import logging
     5 import logging
    16 import os
     6 import os
       
     7 import queue
    17 import re
     8 import re
    18 import requests_oauthlib
       
    19 import shutil
     9 import shutil
    20 import signal
    10 import signal
    21 import socket
    11 import socket
    22 import sqlalchemy.schema
       
    23 import sys
    12 import sys
    24 import thread
       
    25 import threading
    13 import threading
    26 import time
    14 import time
    27 import traceback
    15 import traceback
    28 import urllib2
    16 import urllib
    29 socket._fileobject.default_bufsize = 0
    17 from http.server import BaseHTTPRequestHandler, HTTPServer
    30 
    18 from io import StringIO
       
    19 from multiprocessing import Event, Process
       
    20 from multiprocessing import Queue as mQueue
       
    21 
       
    22 import requests_oauthlib
       
    23 import sqlalchemy.schema
       
    24 import twitter
       
    25 from sqlalchemy.exc import OperationalError
       
    26 from sqlalchemy.orm import scoped_session
       
    27 
       
    28 import _thread
       
    29 import iri_tweet.stream
       
    30 from iri_tweet import models, utils
       
    31 from iri_tweet.models import ProcessEvent, TweetLog, TweetSource
       
    32 from iri_tweet.processor import get_processor
    31 
    33 
    32 
    34 
    33 # columns_tweet = [u'favorited', u'truncated', u'text', u'created_at', u'source', u'in_reply_to_status_id', u'in_reply_to_screen_name', u'in_reply_to_user_id', u'geo', u'id', u'user']
    35 # columns_tweet = [u'favorited', u'truncated', u'text', u'created_at', u'source', u'in_reply_to_status_id', u'in_reply_to_screen_name', u'in_reply_to_user_id', u'geo', u'id', u'user']
    34 columns_tweet = [u'user', u'favorited', u'contributors', u'truncated', u'text', u'created_at', u'retweeted', u'in_reply_to_status_id_str', u'coordinates', u'in_reply_to_user_id_str', u'entities', u'in_reply_to_status_id', u'place', u'in_reply_to_user_id', u'id', u'in_reply_to_screen_name', u'retweet_count', u'geo', u'id_str', u'source']
    36 columns_tweet = [u'user', u'favorited', u'contributors', u'truncated', u'text', u'created_at', u'retweeted', u'in_reply_to_status_id_str', u'coordinates', u'in_reply_to_user_id_str', u'entities', u'in_reply_to_status_id', u'place', u'in_reply_to_user_id', u'id', u'in_reply_to_screen_name', u'retweet_count', u'geo', u'id_str', u'source']
    35 # columns_user = [u'id', u'verified', u'profile_sidebar_fill_color', u'profile_text_color', u'followers_count', u'protected', u'location', u'profile_background_color', u'utc_offset', u'statuses_count', u'description', u'friends_count', u'profile_link_color', u'profile_image_url', u'notifications', u'geo_enabled', u'profile_background_image_url', u'screen_name', u'profile_background_tile', u'favourites_count', u'name', u'url', u'created_at', u'time_zone', u'profile_sidebar_border_color', u'following']
    37 # columns_user = [u'id', u'verified', u'profile_sidebar_fill_color', u'profile_text_color', u'followers_count', u'protected', u'location', u'profile_background_color', u'utc_offset', u'statuses_count', u'description', u'friends_count', u'profile_link_color', u'profile_image_url', u'notifications', u'geo_enabled', u'profile_background_image_url', u'screen_name', u'profile_background_tile', u'favourites_count', u'name', u'url', u'created_at', u'time_zone', u'profile_sidebar_border_color', u'following']
    38 
    40 
    39 DEFAULT_TIMEOUT = 3
    41 DEFAULT_TIMEOUT = 3
    40 
    42 
    41 class Requesthandler(BaseHTTPRequestHandler):
    43 class Requesthandler(BaseHTTPRequestHandler):
    42 
    44 
    43     def __init__(self, request, client_address, server):
       
    44         BaseHTTPRequestHandler.__init__(self, request, client_address, server)
       
    45         
       
    46     def do_GET(self):
    45     def do_GET(self):
    47         self.send_response(200)
    46         self.send_response(200)
    48         self.end_headers()
    47         self.end_headers()
    49     
    48 
    50     def log_message(self, format, *args):        # @ReservedAssignment
    49     def log_message(self, format, *args):        # @ReservedAssignment
    51         pass
    50         pass
    52 
    51 
    53 
    52 
    54 def set_logging(options):
    53 def set_logging(options):
    55     loggers = []
    54     loggers = []
    56     
    55 
    57     loggers.append(utils.set_logging(options, logging.getLogger('iri.tweet')))
    56     loggers.append(utils.set_logging(options, logging.getLogger('iri.tweet')))
    58     loggers.append(utils.set_logging(options, logging.getLogger('multiprocessing')))
    57     loggers.append(utils.set_logging(options, logging.getLogger('multiprocessing')))
    59     if options.debug >= 2:
    58     if options.debug >= 2:
    60         loggers.append(utils.set_logging(options, logging.getLogger('sqlalchemy.engine')))
    59         loggers.append(utils.set_logging(options, logging.getLogger('sqlalchemy.engine')))
    61     # utils.set_logging(options, logging.getLogger('sqlalchemy.dialects'))
    60     # utils.set_logging(options, logging.getLogger('sqlalchemy.dialects'))
    66 def set_logging_process(options, queue):
    65 def set_logging_process(options, queue):
    67     qlogger = utils.set_logging(options, logging.getLogger('iri.tweet.p'), queue)
    66     qlogger = utils.set_logging(options, logging.getLogger('iri.tweet.p'), queue)
    68     qlogger.propagate = 0
    67     qlogger.propagate = 0
    69     return qlogger
    68     return qlogger
    70 
    69 
    71 def get_auth(options, access_token):
    70 def get_auth(consumer_key, consumer_secret, token_key, token_secret):
    72     consumer_key = options.consumer_key
    71     return requests_oauthlib.OAuth1(client_key=consumer_key, client_secret=consumer_secret, resource_owner_key=token_key, resource_owner_secret=token_secret, signature_type='auth_header')
    73     consumer_secret = options.consumer_secret
       
    74     auth = requests_oauthlib.OAuth1(client_key=consumer_key, client_secret=consumer_secret, resource_owner_key=access_token[0], resource_owner_secret=access_token[1], signature_type='auth_header')
       
    75     return auth
       
    76 
    72 
    77 
    73 
    78 def add_process_event(event_type, args, session_maker):
    74 def add_process_event(event_type, args, session_maker):
    79     session = session_maker()
    75     session = session_maker()
    80     try:
    76     try:
    81         evt = ProcessEvent(args=None if args is None else anyjson.serialize(args), type=event_type)
    77         evt = ProcessEvent(args=None if args is None else json.dumps(args), type=event_type)
    82         session.add(evt)
    78         session.add(evt)
    83         session.commit()
    79         session.commit()
    84     finally:
    80     finally:
    85         session.close()
    81         session.close()
    86 
    82 
    87 
    83 
    88 class BaseProcess(Process):
    84 class BaseProcess(Process):
    89 
    85 
    90     def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
    86     def __init__(self, session_maker, queue, options, twitter_auth, stop_event, logger_queue, parent_pid):
    91         self.parent_pid = parent_pid
    87         self.parent_pid = parent_pid
    92         self.session_maker = session_maker
    88         self.session_maker = session_maker
    93         self.queue = queue
    89         self.queue = queue
    94         self.options = options
    90         self.options = options
    95         self.logger_queue = logger_queue
    91         self.logger_queue = logger_queue
    96         self.stop_event = stop_event
    92         self.stop_event = stop_event
    97         self.consumer_token = (options.consumer_key, options.consumer_secret)
    93         self.twitter_auth = twitter_auth
    98         self.access_token = access_token
       
    99 
    94 
   100         super(BaseProcess, self).__init__()
    95         super(BaseProcess, self).__init__()
   101 
    96 
   102     #
    97     #
   103     # from http://stackoverflow.com/questions/2542610/python-daemon-doesnt-kill-its-kids
    98     # from http://stackoverflow.com/questions/2542610/python-daemon-doesnt-kill-its-kids
   110             # *beeep* oh no! The phone's disconnected!
   105             # *beeep* oh no! The phone's disconnected!
   111             return False
   106             return False
   112         else:
   107         else:
   113             # *ring* Hi mom!
   108             # *ring* Hi mom!
   114             return True
   109             return True
   115     
   110 
   116 
   111 
   117     def __get_process_event_args(self):
   112     def __get_process_event_args(self):
   118         return {'name':self.name, 'pid':self.pid, 'parent_pid':self.parent_pid, 'options':self.options.__dict__, 'access_token':self.access_token}
   113         return {'name':self.name, 'pid':self.pid, 'parent_pid':self.parent_pid, 'options':self.options.__dict__}
   119 
   114 
   120     def run(self):
   115     def run(self):
   121         try:
   116         try:
   122             add_process_event("start_worker", self.__get_process_event_args(), self.session_maker)
   117             add_process_event("start_worker", self.__get_process_event_args(), self.session_maker)
   123             self.do_run()
   118             self.do_run()
   124         finally:
   119         finally:
   125             add_process_event("stop_worker", self.__get_process_event_args(), self.session_maker)
   120             add_process_event("stop_worker", self.__get_process_event_args(), self.session_maker)
   126         
   121 
   127     def do_run(self):
   122     def do_run(self):
   128         raise NotImplementedError()
   123         raise NotImplementedError()
   129 
   124 
   130 
   125 
   131 
   126 
   132 class SourceProcess(BaseProcess):
   127 class SourceProcess(BaseProcess):
   133     
   128 
   134     def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
   129     def __init__(self, session_maker, queue, options, twitter_auth, stop_event, logger_queue, parent_pid):
   135         self.track = options.track
   130         self.track = options.track
   136         self.token_filename = options.token_filename
       
   137         self.timeout = options.timeout
   131         self.timeout = options.timeout
   138         self.stream = None
   132         self.stream = None
   139         super(SourceProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid)
   133         super(SourceProcess, self).__init__(session_maker, queue, options, twitter_auth, stop_event, logger_queue, parent_pid)
   140                     
   134 
   141     def __source_stream_iter(self):
   135     def __source_stream_iter(self):
   142                 
   136 
   143         self.logger.debug("SourceProcess : run ")
   137         self.logger.debug("SourceProcess : run ")
   144         
   138 
   145         self.logger.debug("SourceProcess : get_auth auth with option %s and token %s " %(self.options, self.access_token))
   139         self.logger.debug("SourceProcess : get_auth auth with option %s and token %s " %(self.options, self.twitter_auth))
   146         self.auth = get_auth(self.options, self.access_token) 
   140         self.auth = get_auth(self.twitter_auth.consumer_key, self.twitter_auth.consumer_secret, self.twitter_auth.token, self.twitter_auth.token_secret)
   147         self.logger.debug("SourceProcess : auth set ")
   141         self.logger.debug("SourceProcess : auth set ")
   148         
       
   149         track_list = self.track  # or raw_input('Keywords to track (comma seperated): ').strip()
   142         track_list = self.track  # or raw_input('Keywords to track (comma seperated): ').strip()
   150         self.logger.debug("SourceProcess : track list " + track_list)
   143         self.logger.debug("SourceProcess : track list " + track_list)
   151         
   144 
   152         track_list = [k.strip() for k in track_list.split(',')]
   145         track_list = [k.strip() for k in track_list.split(',')]
   153 
   146 
   154         self.logger.debug("SourceProcess : before connecting to stream %s, url : %s, auth : %s" % (repr(track_list), self.options.url, repr(self.auth)))                        
   147         self.logger.debug("SourceProcess : before connecting to stream %s, url : %s, auth : %s" % (repr(track_list), self.options.url, repr(self.auth)))
   155         self.stream = iri_tweet.stream.FilterStream(self.auth, track=track_list, raw=True, url=self.options.url, timeout=self.timeout, chunk_size=512, logger=self.logger)
   148         self.stream = iri_tweet.stream.FilterStream(self.auth, track=track_list, raw=True, url=self.options.url, timeout=self.timeout, logger=self.logger)
   156         self.logger.debug("SourceProcess : after connecting to stream")
   149         self.logger.debug("SourceProcess : after connecting to stream")
   157         self.stream.muststop = lambda: self.stop_event.is_set()        
   150         self.stream.muststop = lambda: self.stop_event.is_set()
   158         
   151 
   159         stream_wrapper = iri_tweet.stream.SafeStreamWrapper(self.stream, logger=self.logger)
   152         stream_wrapper = iri_tweet.stream.SafeStreamWrapper(self.stream, logger=self.logger)
   160         
   153 
   161         session = self.session_maker()
   154         session = self.session_maker()
   162         
   155 
   163         #import pydevd
   156         #import pydevd
   164         #pydevd.settrace(suspend=False)
   157         #pydevd.settrace(suspend=False)
   165 
   158 
   166         
   159 
   167         try:
   160         try:
   168             for tweet in stream_wrapper:
   161             for tweet in stream_wrapper:
   169                 if not self.parent_is_alive():
   162                 if not self.parent_is_alive():
   170                     self.stop_event.set()
   163                     self.stop_event.set()
   171                     sys.exit()
   164                     sys.exit()
   182                     except OperationalError as e:
   175                     except OperationalError as e:
   183                         session.rollback()
   176                         session.rollback()
   184                         self.logger.debug("SourceProcess : Operational Error %s nb %d" % (repr(e), add_retries))
   177                         self.logger.debug("SourceProcess : Operational Error %s nb %d" % (repr(e), add_retries))
   185                         if add_retries == 10:
   178                         if add_retries == 10:
   186                             raise
   179                             raise
   187                      
   180 
   188                 source_id = source.id
   181                 source_id = source.id
   189                 self.logger.debug("SourceProcess : before queue + source id " + repr(source_id))
   182                 self.logger.debug("SourceProcess : before queue + source id " + repr(source_id))
   190                 self.logger.info("SourceProcess : Tweet count: %d - current rate : %.2f - running : %s" % (self.stream.count, self.stream.rate, int(time.time() - self.stream.starttime)))
   183                 self.logger.info("SourceProcess : Tweet count: %d - current rate : %.2f - running : %s" % (self.stream.count, self.stream.rate, int(time.time() - self.stream.starttime)))
   191                 session.commit()
   184                 session.commit()
   192                 self.queue.put((source_id, tweet), False)
   185                 self.queue.put((source_id, tweet), False)
   202             if not self.stop_event.is_set():
   195             if not self.stop_event.is_set():
   203                 self.stop_event.set()
   196                 self.stop_event.set()
   204 
   197 
   205 
   198 
   206     def do_run(self):
   199     def do_run(self):
   207         
   200 
   208         self.logger = set_logging_process(self.options, self.logger_queue)                
   201         self.logger = set_logging_process(self.options, self.logger_queue)
   209         
   202 
   210         source_stream_iter_thread = threading.Thread(target=self.__source_stream_iter , name="SourceStreamIterThread")
   203         source_stream_iter_thread = threading.Thread(target=self.__source_stream_iter , name="SourceStreamIterThread")
   211         
   204 
   212         source_stream_iter_thread.start()
   205         source_stream_iter_thread.start()
   213         
   206 
   214         try:
   207         try:
   215             while not self.stop_event.is_set():
   208             while not self.stop_event.is_set():
   216                 self.logger.debug("SourceProcess : In while after start")
   209                 self.logger.debug("SourceProcess : In while after start")
   217                 self.stop_event.wait(DEFAULT_TIMEOUT)
   210                 self.stop_event.wait(DEFAULT_TIMEOUT)
   218         except KeyboardInterrupt:
   211         except KeyboardInterrupt:
   228         self.logger_queue.cancel_join_thread()
   221         self.logger_queue.cancel_join_thread()
   229         self.logger.info("SourceProcess : join")
   222         self.logger.info("SourceProcess : join")
   230         source_stream_iter_thread.join(30)
   223         source_stream_iter_thread.join(30)
   231 
   224 
   232 
   225 
   233 def process_tweet(tweet, source_id, session, consumer_token, access_token, twitter_query_user, token_filename, logger):
   226 def process_tweet(tweet, source_id, session, twitter_auth, twitter_query_user, logger):
   234     try:
   227     try:
   235         if not tweet.strip():
   228         if not tweet.strip():
   236             return
   229             return
   237         tweet_obj = anyjson.deserialize(tweet)
   230         tweet_obj = json.loads(tweet)
   238         processor_klass = get_processor(tweet_obj)
   231         processor_klass = get_processor(tweet_obj)
   239         if not processor_klass:
   232         if not processor_klass:
   240             tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET'])
   233             tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET'])
   241             session.add(tweet_log)
   234             session.add(tweet_log)
   242             return
   235             return
   243         processor = processor_klass(json_dict=tweet_obj,
   236         processor = processor_klass(json_dict=tweet_obj,
   244                                     json_txt=tweet,
   237                                     json_txt=tweet,
   245                                     source_id=source_id,
   238                                     source_id=source_id,
   246                                     session=session,
   239                                     session=session,
   247                                     consumer_token=consumer_token,
   240                                     twitter_auth=twitter_auth,
   248                                     access_token=access_token,
       
   249                                     token_filename=token_filename,
       
   250                                     user_query_twitter=twitter_query_user,
   241                                     user_query_twitter=twitter_query_user,
   251                                     logger=logger)
   242                                     logger=logger)
   252         logger.info(processor.log_info())                        
   243         logger.info(processor.log_info())
   253         logger.debug(u"Process_tweet :" + repr(tweet))                
   244         logger.debug(u"Process_tweet :" + repr(tweet))
   254         processor.process()
   245         processor.process()
   255         
   246 
   256     except ValueError as e:
   247     except ValueError as e:
   257         message = u"Value Error %s processing tweet %s" % (repr(e), tweet)
   248         message = u"Value Error %s processing tweet %s" % (repr(e), tweet)
   258         output = StringIO.StringIO()
   249         output = StringIO()
   259         try:
   250         try:
   260             traceback.print_exc(file=output)
   251             traceback.print_exc(file=output)
   261             error_stack = output.getvalue()
   252             error_stack = output.getvalue()
   262         finally:
   253         finally:
   263             output.close()
   254             output.close()
   264         tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET'], error=message, error_stack=error_stack)
   255         tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET'], error=message, error_stack=error_stack)
   265         session.add(tweet_log)
   256         session.add(tweet_log)
   266         session.commit()        
   257         session.commit()
   267     except Exception as e:
   258     except Exception as e:
   268         message = u"Error %s processing tweet %s" % (repr(e), tweet)
   259         message = u"Error %s processing tweet %s" % (repr(e), tweet)
   269         logger.exception(message)
   260         logger.exception(message)
   270         output = StringIO.StringIO()
   261         output = StringIO()
   271         try:
   262         try:
   272             traceback.print_exc(file=output)
   263             traceback.print_exc(file=output)
   273             error_stack = output.getvalue()
   264             error_stack = output.getvalue()
   274         finally:
   265         finally:
   275             output.close()
   266             output.close()
   276         session.rollback()
   267         session.rollback()
   277         tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['ERROR'], error=message, error_stack=error_stack)
   268         tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['ERROR'], error=message, error_stack=error_stack)
   278         session.add(tweet_log)
   269         session.add(tweet_log)
   279         session.commit()
   270         session.commit()
   280 
   271 
   281     
   272 
   282         
   273 
   283 class TweetProcess(BaseProcess):
   274 class TweetProcess(BaseProcess):
   284     
   275 
   285     def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
   276     def __init__(self, session_maker, queue, options, twitter_auth, stop_event, logger_queue, parent_pid):
   286         super(TweetProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid)
   277         super(TweetProcess, self).__init__(session_maker, queue, options, twitter_auth, stop_event, logger_queue, parent_pid)
   287         self.twitter_query_user = options.twitter_query_user
   278         self.twitter_query_user = options.twitter_query_user
   288 
   279 
   289 
   280 
   290     def do_run(self):
   281     def do_run(self):
   291         
   282 
   292         self.logger = set_logging_process(self.options, self.logger_queue)
   283         self.logger = set_logging_process(self.options, self.logger_queue)
   293         session = self.session_maker()
   284         session = self.session_maker()
   294         try:
   285         try:
   295             while not self.stop_event.is_set() and self.parent_is_alive():
   286             while not self.stop_event.is_set() and self.parent_is_alive():
   296                 try:
   287                 try:
   297                     source_id, tweet_txt = self.queue.get(True, 3)
   288                     source_id, tweet_txt = self.queue.get(True, 3)
   298                     self.logger.debug("Processing source id " + repr(source_id))
   289                     self.logger.debug("Processing source id " + repr(source_id))
   299                 except Exception as e:
   290                 except Exception as e:
   300                     self.logger.debug('Process tweet exception in loop : ' + repr(e))
   291                     self.logger.debug('Process tweet exception in loop : ' + repr(e))
   301                     continue
   292                     continue
   302                 process_tweet(tweet_txt, source_id, session, self.consumer_token, self.access_token, self.twitter_query_user, self.options.token_filename, self.logger)
   293                 process_tweet(tweet_txt, source_id, session, self.twitter_auth, self.twitter_query_user, self.logger)
   303                 session.commit()
   294                 session.commit()
   304         except KeyboardInterrupt:
   295         except KeyboardInterrupt:
   305             self.stop_event.set()
   296             self.stop_event.set()
   306         finally:
   297         finally:
   307             session.rollback()
   298             session.rollback()
   311 def get_sessionmaker(conn_str):
   302 def get_sessionmaker(conn_str):
   312     engine, metadata, Session = models.setup_database(conn_str, echo=False, create_all=False, autocommit=False)
   303     engine, metadata, Session = models.setup_database(conn_str, echo=False, create_all=False, autocommit=False)
   313     Session = scoped_session(Session)
   304     Session = scoped_session(Session)
   314     return Session, engine, metadata
   305     return Session, engine, metadata
   315 
   306 
   316             
   307 
   317 def process_leftovers(session, consumer_token, access_token, twitter_query_user, token_filename, ask_process_leftovers, logger):
   308 def process_leftovers(session, twitter_auth, twitter_query_user, ask_process_leftovers, logger):
   318     
   309 
   319     sources = session.query(TweetSource).outerjoin(TweetLog).filter(TweetLog.id == None)
   310     sources = session.query(TweetSource).outerjoin(TweetLog).filter(TweetLog.id == None)
   320     sources_count = sources.count()
   311     sources_count = sources.count()
   321     
   312 
   322     if sources_count > 10 and ask_process_leftovers:
   313     if sources_count > 10 and ask_process_leftovers:
   323         resp = raw_input("Do you want to process leftovers (Y/n) ? (%d tweet to process)" % sources_count)
   314         resp = input("Do you want to process leftovers (Y/n) ? (%d tweet to process)" % sources_count)
   324         if resp and resp.strip().lower() == "n":
   315         if resp and resp.strip().lower() == "n":
   325             return
   316             return
   326     logger.info("Process leftovers, %d tweets to process" % (sources_count))
   317     logger.info("Process leftovers, %d tweets to process" % (sources_count))
   327     for src in sources:
   318     for src in sources:
   328         tweet_txt = src.original_json
   319         tweet_txt = src.original_json
   329         process_tweet(tweet_txt, src.id, session, consumer_token, access_token, twitter_query_user, token_filename, logger)
   320         process_tweet(tweet_txt, src.id, session, twitter_auth, twitter_query_user, logger)
   330         session.commit()
   321         session.commit()
   331         
   322 
   332     
   323 
   333 def process_log(logger_queues, stop_event):
   324 def process_log(logger_queues, stop_event):
   334     while not stop_event.is_set():
   325     while not stop_event.is_set():
   335         for lqueue in logger_queues:
   326         for lqueue in logger_queues:
   336             try:
   327             try:
   337                 record = lqueue.get_nowait()
   328                 record = lqueue.get_nowait()
   338                 logging.getLogger(record.name).handle(record)
   329                 logging.getLogger(record.name).handle(record)
   339             except Queue.Empty:
   330             except queue.Empty:
   340                 continue
   331                 continue
   341             except IOError:
   332             except IOError:
   342                 continue
   333                 continue
   343         time.sleep(0.1)
   334         time.sleep(0.1)
   344 
   335 
   345         
   336 
   346 def get_options():
   337 def get_options():
   347 
   338 
   348     usage = "usage: %(prog)s [options]"
   339     usage = "usage: %(prog)s [options]"
   349 
   340 
   350     parser = argparse.ArgumentParser(usage=usage)
   341     parser = argparse.ArgumentParser(usage=usage)
   383 
   374 
   384 
   375 
   385 def do_run(options, session_maker):
   376 def do_run(options, session_maker):
   386 
   377 
   387     stop_args = {}
   378     stop_args = {}
   388     
   379 
   389     consumer_token = (options.consumer_key, options.consumer_secret)
   380 
   390     access_token = utils.get_oauth_token(consumer_key=consumer_token[0], consumer_secret=consumer_token[1], token_file_path=options.token_filename)
   381     access_token_key, access_token_secret = utils.get_oauth_token(consumer_key=options.consumer_key, consumer_secret=options.consumer_secret, token_file_path=options.token_filename)
   391     
   382     twitter_auth = twitter.OAuth(access_token_key, access_token_secret, options.consumer_key, options.consumer_secret)
   392     
   383 
   393     session = session_maker()
   384     session = session_maker()
   394     try:
   385     try:
   395         process_leftovers(session, consumer_token, access_token, options.twitter_query_user, options.token_filename, options.ask_process_leftovers, utils.get_logger())
   386         process_leftovers(session, twitter_auth, options.twitter_query_user, options.ask_process_leftovers, utils.get_logger())
   396         session.commit()
   387         session.commit()
   397     finally:
   388     finally:
   398         session.rollback()
   389         session.rollback()
   399         session.close()
   390         session.close()
   400     
   391 
   401     if options.process_nb <= 0:
   392     if options.process_nb <= 0:
   402         utils.get_logger().debug("Leftovers processed. Exiting.")
   393         utils.get_logger().debug("Leftovers processed. Exiting.")
   403         return None
   394         return None
   404 
   395 
   405     queue = mQueue()
   396     queue = mQueue()
   406     stop_event = Event()
   397     stop_event = Event()
   407     
   398 
   408     # workaround for bug on using urllib2 and multiprocessing
   399     # workaround for bug on using urllib2 and multiprocessing
   409     httpd = HTTPServer(('127.0.0.1',0), Requesthandler)
   400     httpd = HTTPServer(('127.0.0.1',0), Requesthandler)
   410     thread.start_new_thread(httpd.handle_request, ())
   401     _thread.start_new_thread(httpd.handle_request, ())
   411     
   402 
   412     req = urllib2.Request('http://localhost:%d' % httpd.server_port)
   403     req = urllib.request.Request('http://localhost:%d' % httpd.server_port)
   413     conn = None
   404     conn = None
   414     try:
   405     try:
   415         conn = urllib2.urlopen(req)
   406         conn = urllib.request.urlopen(req)
   416     except:
   407     except:
   417         utils.get_logger().debug("could not open localhost")
   408         utils.get_logger().debug("could not open localhost")
   418         # donothing
   409         # donothing
   419     finally:
   410     finally:
   420         if conn is not None:
   411         if conn is not None:
   421             conn.close()
   412             conn.close()
   422     
   413 
   423     process_engines = []
   414     process_engines = []
   424     logger_queues = []
   415     logger_queues = []
   425     
   416 
   426     SessionProcess, engine_process, _ = get_sessionmaker(conn_str)
   417     SessionProcess, engine_process, _ = get_sessionmaker(conn_str)
   427     process_engines.append(engine_process)
   418     process_engines.append(engine_process)
   428     lqueue = mQueue(50)
   419     lqueue = mQueue(50)
   429     logger_queues.append(lqueue)
   420     logger_queues.append(lqueue)
   430     pid = os.getpid()
   421     pid = os.getpid()
   431     sprocess = SourceProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)    
   422     sprocess = SourceProcess(SessionProcess, queue, options, twitter_auth, stop_event, lqueue, pid)
   432     
   423 
   433     tweet_processes = []
   424     tweet_processes = []
   434     
   425 
   435     for i in range(options.process_nb - 1):
   426     for i in range(options.process_nb - 1):
   436         SessionProcess, engine_process, _ = get_sessionmaker(conn_str)
   427         SessionProcess, engine_process, _ = get_sessionmaker(conn_str)
   437         process_engines.append(engine_process)
   428         process_engines.append(engine_process)
   438         lqueue = mQueue(50)
   429         lqueue = mQueue(50)
   439         logger_queues.append(lqueue)
   430         logger_queues.append(lqueue)
   440         cprocess = TweetProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)
   431         cprocess = TweetProcess(SessionProcess, queue, options, twitter_auth, stop_event, lqueue, pid)
   441         tweet_processes.append(cprocess)
   432         tweet_processes.append(cprocess)
   442 
   433 
   443     log_thread = threading.Thread(target=process_log, name="loggingThread", args=(logger_queues, stop_event,))
   434     log_thread = threading.Thread(target=process_log, name="loggingThread", args=(logger_queues, stop_event,))
   444     log_thread.daemon = True
   435     log_thread.daemon = True
   445 
   436 
   450         cprocess.start()
   441         cprocess.start()
   451 
   442 
   452     add_process_event("pid", {'main':os.getpid(), 'source':(sprocess.name, sprocess.pid), 'consumers':dict([(p.name, p.pid) for p in tweet_processes])}, session_maker)
   443     add_process_event("pid", {'main':os.getpid(), 'source':(sprocess.name, sprocess.pid), 'consumers':dict([(p.name, p.pid) for p in tweet_processes])}, session_maker)
   453 
   444 
   454     if options.duration >= 0:
   445     if options.duration >= 0:
   455         end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration)    
   446         end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration)
   456 
   447 
   457     def interupt_handler(signum, frame):
   448     def interupt_handler(signum, frame):
   458         utils.get_logger().debug("shutdown asked " + repr(signum) + "  " + repr(inspect.getframeinfo(frame, 9)))
   449         utils.get_logger().debug("shutdown asked " + repr(signum) + "  " + repr(inspect.getframeinfo(frame, 9)))
   459         stop_args.update({'message': 'interupt', 'signum':signum, 'frameinfo':inspect.getframeinfo(frame, 9)})
   450         stop_args.update({'message': 'interupt', 'signum':signum, 'frameinfo':inspect.getframeinfo(frame, 9)})
   460         stop_event.set()
   451         stop_event.set()
   461         
   452 
   462     signal.signal(signal.SIGINT , interupt_handler)
   453     signal.signal(signal.SIGINT , interupt_handler)
   463     signal.signal(signal.SIGHUP , interupt_handler)
   454     signal.signal(signal.SIGHUP , interupt_handler)
   464     signal.signal(signal.SIGALRM, interupt_handler)
   455     signal.signal(signal.SIGALRM, interupt_handler)
   465     signal.signal(signal.SIGTERM, interupt_handler)
   456     signal.signal(signal.SIGTERM, interupt_handler)
   466     
   457 
   467 
   458 
   468     while not stop_event.is_set():
   459     while not stop_event.is_set():
   469         if options.duration >= 0 and  datetime.datetime.utcnow() >= end_ts:
   460         if options.duration >= 0 and  datetime.datetime.utcnow() >= end_ts:
   470             stop_args.update({'message': 'duration', 'duration' : options.duration, 'end_ts' : end_ts})
   461             stop_args.update({'message': 'duration', 'duration' : options.duration, 'end_ts' : end_ts})
   471             stop_event.set()
   462             stop_event.set()
   482         sprocess.join(10)
   473         sprocess.join(10)
   483     except:
   474     except:
   484         utils.get_logger().debug("Pb joining Source Process - terminating")
   475         utils.get_logger().debug("Pb joining Source Process - terminating")
   485     finally:
   476     finally:
   486         sprocess.terminate()
   477         sprocess.terminate()
   487         
   478 
   488     for i, cprocess in enumerate(tweet_processes):
   479     for i, cprocess in enumerate(tweet_processes):
   489         utils.get_logger().debug("Joining consumer process Nb %d" % (i + 1))
   480         utils.get_logger().debug("Joining consumer process Nb %d" % (i + 1))
   490         try:
   481         try:
   491             cprocess.join(3)
   482             cprocess.join(3)
   492         except:
   483         except:
   493             utils.get_logger().debug("Pb joining consumer process Nb %d - terminating" % (i + 1))
   484             utils.get_logger().debug("Pb joining consumer process Nb %d - terminating" % (i + 1))
   494             cprocess.terminate()
   485             cprocess.terminate()
   495 
   486 
   496     
   487 
   497     utils.get_logger().debug("Close queues")
   488     utils.get_logger().debug("Close queues")
   498     try:
   489     try:
   499         queue.close()
   490         queue.close()
   500         for lqueue in logger_queues:
   491         for lqueue in logger_queues:
   501             lqueue.close()
   492             lqueue.close()
   502     except Exception as e:
   493     except Exception as e:
   503         utils.get_logger().error("error when closing queues %s", repr(e))
   494         utils.get_logger().error("error when closing queues %s", repr(e))
   504         # do nothing
   495         # do nothing
   505         
   496 
   506     
   497 
   507     if options.process_nb > 1:
   498     if options.process_nb > 1:
   508         utils.get_logger().debug("Processing leftovers")
   499         utils.get_logger().debug("Processing leftovers")
   509         session = session_maker()
   500         session = session_maker()
   510         try:
   501         try:
   511             process_leftovers(session, consumer_token, access_token, options.twitter_query_user, options.token_filename, options.ask_process_leftovers, utils.get_logger())
   502             process_leftovers(session, twitter_auth, options.twitter_query_user, options.ask_process_leftovers, utils.get_logger())
   512             session.commit()
   503             session.commit()
   513         finally:
   504         finally:
   514             session.rollback()
   505             session.rollback()
   515             session.close()
   506             session.close()
   516 
   507 
   517     for pengine in process_engines:
   508     for pengine in process_engines:
   518         pengine.dispose()
   509         pengine.dispose()
   519     
   510 
   520     return stop_args
   511     return stop_args
   521 
   512 
   522 
   513 
   523 def main(options):
   514 def main(options):
   524     
   515 
   525     global conn_str
   516     global conn_str
   526     
   517 
   527     conn_str = options.conn_str.strip()
   518     conn_str = options.conn_str.strip()
   528     if not re.match("^\w+://.+", conn_str):
   519     if not re.match(r"^\w+://.+", conn_str):
   529         conn_str = 'sqlite:///' + options.conn_str
   520         conn_str = 'sqlite:///' + options.conn_str
   530         
   521 
   531     if conn_str.startswith("sqlite") and options.new:
   522     if conn_str.startswith("sqlite") and options.new:
   532         filepath = conn_str[conn_str.find(":///") + 4:]
   523         filepath = conn_str[conn_str.find(":///") + 4:]
   533         if os.path.exists(filepath):
   524         if os.path.exists(filepath):
   534             i = 1
   525             i = 1
   535             basename, extension = os.path.splitext(filepath)
   526             basename, extension = os.path.splitext(filepath)
   541                 raise Exception("Unable to find new filename for " + filepath)
   532                 raise Exception("Unable to find new filename for " + filepath)
   542             else:
   533             else:
   543                 shutil.move(filepath, new_path)
   534                 shutil.move(filepath, new_path)
   544 
   535 
   545     Session, engine, metadata = get_sessionmaker(conn_str)
   536     Session, engine, metadata = get_sessionmaker(conn_str)
   546     
   537 
   547     if options.new:
   538     if options.new:
   548         check_metadata = sqlalchemy.schema.MetaData(bind=engine)
   539         check_metadata = sqlalchemy.schema.MetaData(bind=engine)
   549         check_metadata.reflect()
   540         check_metadata.reflect()
   550         if len(check_metadata.sorted_tables) > 0:
   541         if len(check_metadata.sorted_tables) > 0:
   551             message = "Database %s not empty exiting" % conn_str
   542             message = "Database %s not empty exiting" % conn_str
   552             utils.get_logger().error(message)
   543             utils.get_logger().error(message)
   553             sys.exit(message)
   544             sys.exit(message)
   554     
   545 
   555     metadata.create_all(engine)
   546     metadata.create_all(engine)
   556     session = Session()
   547     session = Session()
   557     try:
   548     try:
   558         models.add_model_version(session)
   549         models.add_model_version(session)
   559     finally:
   550     finally:
   560         session.close()
   551         session.close()
   561     
   552 
   562     stop_args = {}
   553     stop_args = {}
   563     try:
   554     try:
   564         add_process_event(event_type="start", args={'options':options.__dict__, 'args': [], 'command_line': sys.argv}, session_maker=Session)
   555         add_process_event(event_type="start", args={'options':options.__dict__, 'args': [], 'command_line': sys.argv}, session_maker=Session)
   565         stop_args = do_run(options, Session)
   556         stop_args = do_run(options, Session)
   566     except Exception as e:
   557     except Exception as e:
   567         utils.get_logger().exception("Error in main thread")        
   558         utils.get_logger().exception("Error in main thread")
   568         outfile = StringIO.StringIO()
   559         outfile = StringIO()
   569         try:
   560         try:
   570             traceback.print_exc(file=outfile)
   561             traceback.print_exc(file=outfile)
   571             stop_args = {'error': repr(e), 'message': repr(e), 'stacktrace':outfile.getvalue()}
   562             stop_args = {'error': repr(e), 'message': repr(e), 'stacktrace':outfile.getvalue()}
   572         finally:
   563         finally:
   573             outfile.close()
   564             outfile.close()
   574         raise
   565         raise
   575     finally:    
   566     finally:
   576         add_process_event(event_type="shutdown", args=stop_args, session_maker=Session)
   567         add_process_event(event_type="shutdown", args=stop_args, session_maker=Session)
   577 
   568 
   578     utils.get_logger().debug("Done. Exiting. " + repr(stop_args))
   569     utils.get_logger().debug("Done. Exiting. " + repr(stop_args))
   579 
   570 
   580 
   571 
   581 
   572 
   582 if __name__ == '__main__':
   573 if __name__ == '__main__':
   583 
   574 
   584     options = get_options()
   575     options = get_options()
   585     
   576 
   586     loggers = set_logging(options)
   577     loggers = set_logging(options)
   587     
   578 
   588     utils.get_logger().debug("OPTIONS : " + repr(options))
   579     utils.get_logger().debug("OPTIONS : " + repr(options))
   589     
   580 
   590     if options.daemon:
   581     if options.daemon:
   591         options.ask_process_leftovers = False
   582         options.ask_process_leftovers = False
   592         import daemon
   583         import daemon
   593         
   584 
   594         hdlr_preserve = []
   585         hdlr_preserve = []
   595         for logger in loggers:
   586         for logger in loggers:
   596             hdlr_preserve.extend([h.stream for h in logger.handlers])
   587             hdlr_preserve.extend([h.stream for h in logger.handlers])
   597             
   588 
   598         context = daemon.DaemonContext(working_directory=os.getcwd(), files_preserve=hdlr_preserve) 
   589         context = daemon.DaemonContext(working_directory=os.getcwd(), files_preserve=hdlr_preserve)
   599         with context:
   590         with context:
   600             main(options)
   591             main(options)
   601     else:
   592     else:
   602         main(options)
   593         main(options)
   603