diff -r 184372ec27e2 -r 14a9bed2e3cd script/stream/recorder_stream.py --- a/script/stream/recorder_stream.py Wed Jan 02 17:49:19 2019 +0100 +++ b/script/stream/recorder_stream.py Thu Jan 10 18:36:36 2019 +0100 @@ -1,33 +1,35 @@ -from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer -from iri_tweet import models, utils -from iri_tweet.models import TweetSource, TweetLog, ProcessEvent -from iri_tweet.processor import get_processor -from multiprocessing import Queue as mQueue, Process, Event -from sqlalchemy.exc import OperationalError -from sqlalchemy.orm import scoped_session -import Queue -import StringIO -import anyjson import argparse import datetime import inspect -import iri_tweet.stream +import json import logging import os +import queue import re -import requests_oauthlib import shutil import signal import socket -import sqlalchemy.schema import sys -import thread import threading import time import traceback -import urllib2 -socket._fileobject.default_bufsize = 0 +import urllib.request +from http.server import BaseHTTPRequestHandler, HTTPServer +from io import StringIO +from multiprocessing import Event, Process +from multiprocessing import Queue as mQueue +import requests_oauthlib +import sqlalchemy.schema +import twitter +from sqlalchemy.exc import OperationalError +from sqlalchemy.orm import scoped_session + +import _thread +import iri_tweet.stream +from iri_tweet import models, utils +from iri_tweet.models import ProcessEvent, TweetLog, TweetSource +from iri_tweet.processor import get_processor # columns_tweet = [u'favorited', u'truncated', u'text', u'created_at', u'source', u'in_reply_to_status_id', u'in_reply_to_screen_name', u'in_reply_to_user_id', u'geo', u'id', u'user'] @@ -40,20 +42,17 @@ class Requesthandler(BaseHTTPRequestHandler): - def __init__(self, request, client_address, server): - BaseHTTPRequestHandler.__init__(self, request, client_address, server) - def do_GET(self): self.send_response(200) self.end_headers() - + def log_message(self, 
format, *args): # @ReservedAssignment pass def set_logging(options): loggers = [] - + loggers.append(utils.set_logging(options, logging.getLogger('iri.tweet'))) loggers.append(utils.set_logging(options, logging.getLogger('multiprocessing'))) if options.debug >= 2: @@ -68,17 +67,14 @@ qlogger.propagate = 0 return qlogger -def get_auth(options, access_token): - consumer_key = options.consumer_key - consumer_secret = options.consumer_secret - auth = requests_oauthlib.OAuth1(client_key=consumer_key, client_secret=consumer_secret, resource_owner_key=access_token[0], resource_owner_secret=access_token[1], signature_type='auth_header') - return auth +def get_auth(consumer_key, consumer_secret, token_key, token_secret): + return requests_oauthlib.OAuth1(client_key=consumer_key, client_secret=consumer_secret, resource_owner_key=token_key, resource_owner_secret=token_secret, signature_type='auth_header') def add_process_event(event_type, args, session_maker): session = session_maker() try: - evt = ProcessEvent(args=None if args is None else anyjson.serialize(args), type=event_type) + evt = ProcessEvent(args=None if args is None else json.dumps(args), type=event_type) session.add(evt) session.commit() finally: @@ -87,15 +83,14 @@ class BaseProcess(Process): - def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid): + def __init__(self, session_maker, queue, options, twitter_auth, stop_event, logger_queue, parent_pid): self.parent_pid = parent_pid self.session_maker = session_maker self.queue = queue self.options = options self.logger_queue = logger_queue self.stop_event = stop_event - self.consumer_token = (options.consumer_key, options.consumer_secret) - self.access_token = access_token + self.twitter_auth = twitter_auth super(BaseProcess, self).__init__() @@ -112,10 +107,10 @@ else: # *ring* Hi mom! 
return True - + def __get_process_event_args(self): - return {'name':self.name, 'pid':self.pid, 'parent_pid':self.parent_pid, 'options':self.options.__dict__, 'access_token':self.access_token} + return {'name':self.name, 'pid':self.pid, 'parent_pid':self.parent_pid, 'options':self.options.__dict__} def run(self): try: @@ -123,47 +118,45 @@ self.do_run() finally: add_process_event("stop_worker", self.__get_process_event_args(), self.session_maker) - + def do_run(self): raise NotImplementedError() class SourceProcess(BaseProcess): - - def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid): + + def __init__(self, session_maker, queue, options, twitter_auth, stop_event, logger_queue, parent_pid): self.track = options.track - self.token_filename = options.token_filename self.timeout = options.timeout self.stream = None - super(SourceProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid) - + super(SourceProcess, self).__init__(session_maker, queue, options, twitter_auth, stop_event, logger_queue, parent_pid) + def __source_stream_iter(self): - + self.logger.debug("SourceProcess : run ") - - self.logger.debug("SourceProcess : get_auth auth with option %s and token %s " %(self.options, self.access_token)) - self.auth = get_auth(self.options, self.access_token) + + self.logger.debug("SourceProcess : get_auth auth with option %s and token %s " %(self.options, self.twitter_auth)) + self.auth = get_auth(self.twitter_auth.consumer_key, self.twitter_auth.consumer_secret, self.twitter_auth.token, self.twitter_auth.token_secret) self.logger.debug("SourceProcess : auth set ") - track_list = self.track # or raw_input('Keywords to track (comma seperated): ').strip() self.logger.debug("SourceProcess : track list " + track_list) - + track_list = [k.strip() for k in track_list.split(',')] - self.logger.debug("SourceProcess : before connecting to stream %s, url : %s, auth : %s" % 
(repr(track_list), self.options.url, repr(self.auth))) - self.stream = iri_tweet.stream.FilterStream(self.auth, track=track_list, raw=True, url=self.options.url, timeout=self.timeout, chunk_size=512, logger=self.logger) + self.logger.debug("SourceProcess : before connecting to stream %s, url : %s, auth : %s" % (repr(track_list), self.options.url, repr(self.auth))) + self.stream = iri_tweet.stream.FilterStream(self.auth, track=track_list, raw=True, url=self.options.url, timeout=self.timeout, logger=self.logger) self.logger.debug("SourceProcess : after connecting to stream") - self.stream.muststop = lambda: self.stop_event.is_set() - + self.stream.muststop = lambda: self.stop_event.is_set() + stream_wrapper = iri_tweet.stream.SafeStreamWrapper(self.stream, logger=self.logger) - + session = self.session_maker() - + #import pydevd #pydevd.settrace(suspend=False) - + try: for tweet in stream_wrapper: if not self.parent_is_alive(): @@ -184,7 +177,7 @@ self.logger.debug("SourceProcess : Operational Error %s nb %d" % (repr(e), add_retries)) if add_retries == 10: raise - + source_id = source.id self.logger.debug("SourceProcess : before queue + source id " + repr(source_id)) self.logger.info("SourceProcess : Tweet count: %d - current rate : %.2f - running : %s" % (self.stream.count, self.stream.rate, int(time.time() - self.stream.starttime))) @@ -204,13 +197,13 @@ def do_run(self): - - self.logger = set_logging_process(self.options, self.logger_queue) - + + self.logger = set_logging_process(self.options, self.logger_queue) + source_stream_iter_thread = threading.Thread(target=self.__source_stream_iter , name="SourceStreamIterThread") - + source_stream_iter_thread.start() - + try: while not self.stop_event.is_set(): self.logger.debug("SourceProcess : In while after start") @@ -230,11 +223,11 @@ source_stream_iter_thread.join(30) -def process_tweet(tweet, source_id, session, consumer_token, access_token, twitter_query_user, token_filename, logger): +def process_tweet(tweet, 
source_id, session, twitter_auth, twitter_query_user, logger): try: if not tweet.strip(): return - tweet_obj = anyjson.deserialize(tweet) + tweet_obj = json.loads(tweet) processor_klass = get_processor(tweet_obj) if not processor_klass: tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET']) @@ -244,18 +237,16 @@ json_txt=tweet, source_id=source_id, session=session, - consumer_token=consumer_token, - access_token=access_token, - token_filename=token_filename, + twitter_auth=twitter_auth, user_query_twitter=twitter_query_user, logger=logger) - logger.info(processor.log_info()) - logger.debug(u"Process_tweet :" + repr(tweet)) + logger.info(processor.log_info()) + logger.debug(u"Process_tweet :" + repr(tweet)) processor.process() - + except ValueError as e: message = u"Value Error %s processing tweet %s" % (repr(e), tweet) - output = StringIO.StringIO() + output = StringIO() try: traceback.print_exc(file=output) error_stack = output.getvalue() @@ -263,11 +254,11 @@ output.close() tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET'], error=message, error_stack=error_stack) session.add(tweet_log) - session.commit() + session.commit() except Exception as e: message = u"Error %s processing tweet %s" % (repr(e), tweet) logger.exception(message) - output = StringIO.StringIO() + output = StringIO() try: traceback.print_exc(file=output) error_stack = output.getvalue() @@ -278,17 +269,17 @@ session.add(tweet_log) session.commit() - - + + class TweetProcess(BaseProcess): - - def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid): - super(TweetProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid) + + def __init__(self, session_maker, queue, options, twitter_auth, stop_event, logger_queue, parent_pid): + super(TweetProcess, self).__init__(session_maker, queue, options, twitter_auth, stop_event, logger_queue, 
parent_pid) self.twitter_query_user = options.twitter_query_user def do_run(self): - + self.logger = set_logging_process(self.options, self.logger_queue) session = self.session_maker() try: @@ -299,7 +290,7 @@ except Exception as e: self.logger.debug('Process tweet exception in loop : ' + repr(e)) continue - process_tweet(tweet_txt, source_id, session, self.consumer_token, self.access_token, self.twitter_query_user, self.options.token_filename, self.logger) + process_tweet(tweet_txt, source_id, session, self.twitter_auth, self.twitter_query_user, self.logger) session.commit() except KeyboardInterrupt: self.stop_event.set() @@ -313,36 +304,36 @@ Session = scoped_session(Session) return Session, engine, metadata - -def process_leftovers(session, consumer_token, access_token, twitter_query_user, token_filename, ask_process_leftovers, logger): - + +def process_leftovers(session, twitter_auth, twitter_query_user, ask_process_leftovers, logger): + sources = session.query(TweetSource).outerjoin(TweetLog).filter(TweetLog.id == None) sources_count = sources.count() - + if sources_count > 10 and ask_process_leftovers: - resp = raw_input("Do you want to process leftovers (Y/n) ? (%d tweet to process)" % sources_count) + resp = input("Do you want to process leftovers (Y/n) ? 
(%d tweet to process)" % sources_count) if resp and resp.strip().lower() == "n": return logger.info("Process leftovers, %d tweets to process" % (sources_count)) for src in sources: tweet_txt = src.original_json - process_tweet(tweet_txt, src.id, session, consumer_token, access_token, twitter_query_user, token_filename, logger) + process_tweet(tweet_txt, src.id, session, twitter_auth, twitter_query_user, logger) session.commit() - - + + def process_log(logger_queues, stop_event): while not stop_event.is_set(): for lqueue in logger_queues: try: record = lqueue.get_nowait() logging.getLogger(record.name).handle(record) - except Queue.Empty: + except queue.Empty: continue except IOError: continue time.sleep(0.1) - + def get_options(): usage = "usage: %(prog)s [options]" @@ -385,59 +376,59 @@ def do_run(options, session_maker): stop_args = {} - - consumer_token = (options.consumer_key, options.consumer_secret) - access_token = utils.get_oauth_token(consumer_key=consumer_token[0], consumer_secret=consumer_token[1], token_file_path=options.token_filename) - - + + + access_token_key, access_token_secret = utils.get_oauth_token(consumer_key=options.consumer_key, consumer_secret=options.consumer_secret, token_file_path=options.token_filename) + twitter_auth = twitter.OAuth(access_token_key, access_token_secret, options.consumer_key, options.consumer_secret) + session = session_maker() try: - process_leftovers(session, consumer_token, access_token, options.twitter_query_user, options.token_filename, options.ask_process_leftovers, utils.get_logger()) + process_leftovers(session, twitter_auth, options.twitter_query_user, options.ask_process_leftovers, utils.get_logger()) session.commit() finally: session.rollback() session.close() - + if options.process_nb <= 0: utils.get_logger().debug("Leftovers processed. 
Exiting.") return None queue = mQueue() stop_event = Event() - + # workaround for bug on using urllib2 and multiprocessing httpd = HTTPServer(('127.0.0.1',0), Requesthandler) - thread.start_new_thread(httpd.handle_request, ()) - - req = urllib2.Request('http://localhost:%d' % httpd.server_port) + _thread.start_new_thread(httpd.handle_request, ()) + + req = urllib.request.Request('http://localhost:%d' % httpd.server_port) conn = None try: - conn = urllib2.urlopen(req) + conn = urllib.request.urlopen(req) except: utils.get_logger().debug("could not open localhost") # donothing finally: if conn is not None: conn.close() - + process_engines = [] logger_queues = [] - + SessionProcess, engine_process, _ = get_sessionmaker(conn_str) process_engines.append(engine_process) lqueue = mQueue(50) logger_queues.append(lqueue) pid = os.getpid() - sprocess = SourceProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid) - + sprocess = SourceProcess(SessionProcess, queue, options, twitter_auth, stop_event, lqueue, pid) + tweet_processes = [] - + for i in range(options.process_nb - 1): SessionProcess, engine_process, _ = get_sessionmaker(conn_str) process_engines.append(engine_process) lqueue = mQueue(50) logger_queues.append(lqueue) - cprocess = TweetProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid) + cprocess = TweetProcess(SessionProcess, queue, options, twitter_auth, stop_event, lqueue, pid) tweet_processes.append(cprocess) log_thread = threading.Thread(target=process_log, name="loggingThread", args=(logger_queues, stop_event,)) @@ -452,18 +443,18 @@ add_process_event("pid", {'main':os.getpid(), 'source':(sprocess.name, sprocess.pid), 'consumers':dict([(p.name, p.pid) for p in tweet_processes])}, session_maker) if options.duration >= 0: - end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration) + end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration) def 
interupt_handler(signum, frame): utils.get_logger().debug("shutdown asked " + repr(signum) + " " + repr(inspect.getframeinfo(frame, 9))) stop_args.update({'message': 'interupt', 'signum':signum, 'frameinfo':inspect.getframeinfo(frame, 9)}) stop_event.set() - + signal.signal(signal.SIGINT , interupt_handler) signal.signal(signal.SIGHUP , interupt_handler) signal.signal(signal.SIGALRM, interupt_handler) signal.signal(signal.SIGTERM, interupt_handler) - + while not stop_event.is_set(): if options.duration >= 0 and datetime.datetime.utcnow() >= end_ts: @@ -484,7 +475,7 @@ utils.get_logger().debug("Pb joining Source Process - terminating") finally: sprocess.terminate() - + for i, cprocess in enumerate(tweet_processes): utils.get_logger().debug("Joining consumer process Nb %d" % (i + 1)) try: @@ -493,7 +484,7 @@ utils.get_logger().debug("Pb joining consumer process Nb %d - terminating" % (i + 1)) cprocess.terminate() - + utils.get_logger().debug("Close queues") try: queue.close() @@ -502,13 +493,13 @@ except Exception as e: utils.get_logger().error("error when closing queues %s", repr(e)) # do nothing - - + + if options.process_nb > 1: utils.get_logger().debug("Processing leftovers") session = session_maker() try: - process_leftovers(session, consumer_token, access_token, options.twitter_query_user, options.token_filename, options.ask_process_leftovers, utils.get_logger()) + process_leftovers(session, twitter_auth, options.twitter_query_user, options.ask_process_leftovers, utils.get_logger()) session.commit() finally: session.rollback() @@ -516,18 +507,18 @@ for pengine in process_engines: pengine.dispose() - + return stop_args def main(options): - + global conn_str - + conn_str = options.conn_str.strip() - if not re.match("^\w+://.+", conn_str): + if not re.match(r"^\w+://.+", conn_str): conn_str = 'sqlite:///' + options.conn_str - + if conn_str.startswith("sqlite") and options.new: filepath = conn_str[conn_str.find(":///") + 4:] if os.path.exists(filepath): @@ -543,7 
+534,7 @@ shutil.move(filepath, new_path) Session, engine, metadata = get_sessionmaker(conn_str) - + if options.new: check_metadata = sqlalchemy.schema.MetaData(bind=engine) check_metadata.reflect() @@ -551,28 +542,28 @@ message = "Database %s not empty exiting" % conn_str utils.get_logger().error(message) sys.exit(message) - + metadata.create_all(engine) session = Session() try: models.add_model_version(session) finally: session.close() - + stop_args = {} try: add_process_event(event_type="start", args={'options':options.__dict__, 'args': [], 'command_line': sys.argv}, session_maker=Session) stop_args = do_run(options, Session) except Exception as e: - utils.get_logger().exception("Error in main thread") - outfile = StringIO.StringIO() + utils.get_logger().exception("Error in main thread") + outfile = StringIO() try: traceback.print_exc(file=outfile) stop_args = {'error': repr(e), 'message': repr(e), 'stacktrace':outfile.getvalue()} finally: outfile.close() raise - finally: + finally: add_process_event(event_type="shutdown", args=stop_args, session_maker=Session) utils.get_logger().debug("Done. Exiting. " + repr(stop_args)) @@ -582,22 +573,21 @@ if __name__ == '__main__': options = get_options() - + loggers = set_logging(options) - + utils.get_logger().debug("OPTIONS : " + repr(options)) - + if options.daemon: options.ask_process_leftovers = False import daemon - + hdlr_preserve = [] for logger in loggers: hdlr_preserve.extend([h.stream for h in logger.handlers]) - - context = daemon.DaemonContext(working_directory=os.getcwd(), files_preserve=hdlr_preserve) + + context = daemon.DaemonContext(working_directory=os.getcwd(), files_preserve=hdlr_preserve) with context: main(options) else: main(options) -