script/stream/recorder_stream.py
changeset 1497 14a9bed2e3cd
parent 1074 94d3d8f5eb9d
--- a/script/stream/recorder_stream.py	Wed Jan 02 17:49:19 2019 +0100
+++ b/script/stream/recorder_stream.py	Thu Jan 10 18:36:36 2019 +0100
@@ -1,33 +1,35 @@
-from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
-from iri_tweet import models, utils
-from iri_tweet.models import TweetSource, TweetLog, ProcessEvent
-from iri_tweet.processor import get_processor
-from multiprocessing import Queue as mQueue, Process, Event
-from sqlalchemy.exc import OperationalError
-from sqlalchemy.orm import scoped_session
-import Queue
-import StringIO
-import anyjson
 import argparse
 import datetime
 import inspect
-import iri_tweet.stream
+import json
 import logging
 import os
+import queue
 import re
-import requests_oauthlib
 import shutil
 import signal
 import socket
-import sqlalchemy.schema
 import sys
-import thread
 import threading
 import time
 import traceback
-import urllib2
-socket._fileobject.default_bufsize = 0
+import urllib.request
+from http.server import BaseHTTPRequestHandler, HTTPServer
+from io import StringIO
+from multiprocessing import Event, Process
+from multiprocessing import Queue as mQueue
 
+import requests_oauthlib
+import sqlalchemy.schema
+import twitter
+from sqlalchemy.exc import OperationalError
+from sqlalchemy.orm import scoped_session
+
+import _thread
+import iri_tweet.stream
+from iri_tweet import models, utils
+from iri_tweet.models import ProcessEvent, TweetLog, TweetSource
+from iri_tweet.processor import get_processor
 
 
 # columns_tweet = [u'favorited', u'truncated', u'text', u'created_at', u'source', u'in_reply_to_status_id', u'in_reply_to_screen_name', u'in_reply_to_user_id', u'geo', u'id', u'user']
@@ -40,20 +42,17 @@
 
 class Requesthandler(BaseHTTPRequestHandler):
 
-    def __init__(self, request, client_address, server):
-        BaseHTTPRequestHandler.__init__(self, request, client_address, server)
-        
     def do_GET(self):
         self.send_response(200)
         self.end_headers()
-    
+
     def log_message(self, format, *args):        # @ReservedAssignment
         pass
 
 
 def set_logging(options):
     loggers = []
-    
+
     loggers.append(utils.set_logging(options, logging.getLogger('iri.tweet')))
     loggers.append(utils.set_logging(options, logging.getLogger('multiprocessing')))
     if options.debug >= 2:
@@ -68,17 +67,14 @@
     qlogger.propagate = 0
     return qlogger
 
-def get_auth(options, access_token):
-    consumer_key = options.consumer_key
-    consumer_secret = options.consumer_secret
-    auth = requests_oauthlib.OAuth1(client_key=consumer_key, client_secret=consumer_secret, resource_owner_key=access_token[0], resource_owner_secret=access_token[1], signature_type='auth_header')
-    return auth
+def get_auth(consumer_key, consumer_secret, token_key, token_secret):
+    return requests_oauthlib.OAuth1(client_key=consumer_key, client_secret=consumer_secret, resource_owner_key=token_key, resource_owner_secret=token_secret, signature_type='auth_header')
 
 
 def add_process_event(event_type, args, session_maker):
     session = session_maker()
     try:
-        evt = ProcessEvent(args=None if args is None else anyjson.serialize(args), type=event_type)
+        evt = ProcessEvent(args=None if args is None else json.dumps(args), type=event_type)
         session.add(evt)
         session.commit()
     finally:
@@ -87,15 +83,14 @@
 
 class BaseProcess(Process):
 
-    def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
+    def __init__(self, session_maker, queue, options, twitter_auth, stop_event, logger_queue, parent_pid):
         self.parent_pid = parent_pid
         self.session_maker = session_maker
         self.queue = queue
         self.options = options
         self.logger_queue = logger_queue
         self.stop_event = stop_event
-        self.consumer_token = (options.consumer_key, options.consumer_secret)
-        self.access_token = access_token
+        self.twitter_auth = twitter_auth
 
         super(BaseProcess, self).__init__()
 
@@ -112,10 +107,10 @@
         else:
             # *ring* Hi mom!
             return True
-    
+
 
     def __get_process_event_args(self):
-        return {'name':self.name, 'pid':self.pid, 'parent_pid':self.parent_pid, 'options':self.options.__dict__, 'access_token':self.access_token}
+        return {'name':self.name, 'pid':self.pid, 'parent_pid':self.parent_pid, 'options':self.options.__dict__}
 
     def run(self):
         try:
@@ -123,47 +118,45 @@
             self.do_run()
         finally:
             add_process_event("stop_worker", self.__get_process_event_args(), self.session_maker)
-        
+
     def do_run(self):
         raise NotImplementedError()
 
 
 
 class SourceProcess(BaseProcess):
-    
-    def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
+
+    def __init__(self, session_maker, queue, options, twitter_auth, stop_event, logger_queue, parent_pid):
         self.track = options.track
-        self.token_filename = options.token_filename
         self.timeout = options.timeout
         self.stream = None
-        super(SourceProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid)
-                    
+        super(SourceProcess, self).__init__(session_maker, queue, options, twitter_auth, stop_event, logger_queue, parent_pid)
+
     def __source_stream_iter(self):
-                
+
         self.logger.debug("SourceProcess : run ")
-        
-        self.logger.debug("SourceProcess : get_auth auth with option %s and token %s " %(self.options, self.access_token))
-        self.auth = get_auth(self.options, self.access_token) 
+
+        self.logger.debug("SourceProcess : get_auth auth with option %s and token %s " %(self.options, self.twitter_auth))
+        self.auth = get_auth(self.twitter_auth.consumer_key, self.twitter_auth.consumer_secret, self.twitter_auth.token, self.twitter_auth.token_secret)
         self.logger.debug("SourceProcess : auth set ")
-        
         track_list = self.track  # or raw_input('Keywords to track (comma seperated): ').strip()
         self.logger.debug("SourceProcess : track list " + track_list)
-        
+
         track_list = [k.strip() for k in track_list.split(',')]
 
-        self.logger.debug("SourceProcess : before connecting to stream %s, url : %s, auth : %s" % (repr(track_list), self.options.url, repr(self.auth)))                        
-        self.stream = iri_tweet.stream.FilterStream(self.auth, track=track_list, raw=True, url=self.options.url, timeout=self.timeout, chunk_size=512, logger=self.logger)
+        self.logger.debug("SourceProcess : before connecting to stream %s, url : %s, auth : %s" % (repr(track_list), self.options.url, repr(self.auth)))
+        self.stream = iri_tweet.stream.FilterStream(self.auth, track=track_list, raw=True, url=self.options.url, timeout=self.timeout, logger=self.logger)
         self.logger.debug("SourceProcess : after connecting to stream")
-        self.stream.muststop = lambda: self.stop_event.is_set()        
-        
+        self.stream.muststop = lambda: self.stop_event.is_set()
+
         stream_wrapper = iri_tweet.stream.SafeStreamWrapper(self.stream, logger=self.logger)
-        
+
         session = self.session_maker()
-        
+
         #import pydevd
         #pydevd.settrace(suspend=False)
 
-        
+
         try:
             for tweet in stream_wrapper:
                 if not self.parent_is_alive():
@@ -184,7 +177,7 @@
                         self.logger.debug("SourceProcess : Operational Error %s nb %d" % (repr(e), add_retries))
                         if add_retries == 10:
                             raise
-                     
+
                 source_id = source.id
                 self.logger.debug("SourceProcess : before queue + source id " + repr(source_id))
                 self.logger.info("SourceProcess : Tweet count: %d - current rate : %.2f - running : %s" % (self.stream.count, self.stream.rate, int(time.time() - self.stream.starttime)))
@@ -204,13 +197,13 @@
 
 
     def do_run(self):
-        
-        self.logger = set_logging_process(self.options, self.logger_queue)                
-        
+
+        self.logger = set_logging_process(self.options, self.logger_queue)
+
         source_stream_iter_thread = threading.Thread(target=self.__source_stream_iter , name="SourceStreamIterThread")
-        
+
         source_stream_iter_thread.start()
-        
+
         try:
             while not self.stop_event.is_set():
                 self.logger.debug("SourceProcess : In while after start")
@@ -230,11 +223,11 @@
         source_stream_iter_thread.join(30)
 
 
-def process_tweet(tweet, source_id, session, consumer_token, access_token, twitter_query_user, token_filename, logger):
+def process_tweet(tweet, source_id, session, twitter_auth, twitter_query_user, logger):
     try:
         if not tweet.strip():
             return
-        tweet_obj = anyjson.deserialize(tweet)
+        tweet_obj = json.loads(tweet)
         processor_klass = get_processor(tweet_obj)
         if not processor_klass:
             tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET'])
@@ -244,18 +237,16 @@
                                     json_txt=tweet,
                                     source_id=source_id,
                                     session=session,
-                                    consumer_token=consumer_token,
-                                    access_token=access_token,
-                                    token_filename=token_filename,
+                                    twitter_auth=twitter_auth,
                                     user_query_twitter=twitter_query_user,
                                     logger=logger)
-        logger.info(processor.log_info())                        
-        logger.debug(u"Process_tweet :" + repr(tweet))                
+        logger.info(processor.log_info())
+        logger.debug(u"Process_tweet :" + repr(tweet))
         processor.process()
-        
+
     except ValueError as e:
         message = u"Value Error %s processing tweet %s" % (repr(e), tweet)
-        output = StringIO.StringIO()
+        output = StringIO()
         try:
             traceback.print_exc(file=output)
             error_stack = output.getvalue()
@@ -263,11 +254,11 @@
             output.close()
         tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET'], error=message, error_stack=error_stack)
         session.add(tweet_log)
-        session.commit()        
+        session.commit()
     except Exception as e:
         message = u"Error %s processing tweet %s" % (repr(e), tweet)
         logger.exception(message)
-        output = StringIO.StringIO()
+        output = StringIO()
         try:
             traceback.print_exc(file=output)
             error_stack = output.getvalue()
@@ -278,17 +269,17 @@
         session.add(tweet_log)
         session.commit()
 
-    
-        
+
+
 class TweetProcess(BaseProcess):
-    
-    def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
-        super(TweetProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid)
+
+    def __init__(self, session_maker, queue, options, twitter_auth, stop_event, logger_queue, parent_pid):
+        super(TweetProcess, self).__init__(session_maker, queue, options, twitter_auth, stop_event, logger_queue, parent_pid)
         self.twitter_query_user = options.twitter_query_user
 
 
     def do_run(self):
-        
+
         self.logger = set_logging_process(self.options, self.logger_queue)
         session = self.session_maker()
         try:
@@ -299,7 +290,7 @@
                 except Exception as e:
                     self.logger.debug('Process tweet exception in loop : ' + repr(e))
                     continue
-                process_tweet(tweet_txt, source_id, session, self.consumer_token, self.access_token, self.twitter_query_user, self.options.token_filename, self.logger)
+                process_tweet(tweet_txt, source_id, session, self.twitter_auth, self.twitter_query_user, self.logger)
                 session.commit()
         except KeyboardInterrupt:
             self.stop_event.set()
@@ -313,36 +304,36 @@
     Session = scoped_session(Session)
     return Session, engine, metadata
 
-            
-def process_leftovers(session, consumer_token, access_token, twitter_query_user, token_filename, ask_process_leftovers, logger):
-    
+
+def process_leftovers(session, twitter_auth, twitter_query_user, ask_process_leftovers, logger):
+
     sources = session.query(TweetSource).outerjoin(TweetLog).filter(TweetLog.id == None)
     sources_count = sources.count()
-    
+
     if sources_count > 10 and ask_process_leftovers:
-        resp = raw_input("Do you want to process leftovers (Y/n) ? (%d tweet to process)" % sources_count)
+        resp = input("Do you want to process leftovers (Y/n) ? (%d tweet to process)" % sources_count)
         if resp and resp.strip().lower() == "n":
             return
     logger.info("Process leftovers, %d tweets to process" % (sources_count))
     for src in sources:
         tweet_txt = src.original_json
-        process_tweet(tweet_txt, src.id, session, consumer_token, access_token, twitter_query_user, token_filename, logger)
+        process_tweet(tweet_txt, src.id, session, twitter_auth, twitter_query_user, logger)
         session.commit()
-        
-    
+
+
 def process_log(logger_queues, stop_event):
     while not stop_event.is_set():
         for lqueue in logger_queues:
             try:
                 record = lqueue.get_nowait()
                 logging.getLogger(record.name).handle(record)
-            except Queue.Empty:
+            except queue.Empty:
                 continue
             except IOError:
                 continue
         time.sleep(0.1)
 
-        
+
 def get_options():
 
     usage = "usage: %(prog)s [options]"
@@ -385,59 +376,59 @@
 def do_run(options, session_maker):
 
     stop_args = {}
-    
-    consumer_token = (options.consumer_key, options.consumer_secret)
-    access_token = utils.get_oauth_token(consumer_key=consumer_token[0], consumer_secret=consumer_token[1], token_file_path=options.token_filename)
-    
-    
+
+
+    access_token_key, access_token_secret = utils.get_oauth_token(consumer_key=options.consumer_key, consumer_secret=options.consumer_secret, token_file_path=options.token_filename)
+    twitter_auth = twitter.OAuth(access_token_key, access_token_secret, options.consumer_key, options.consumer_secret)
+
     session = session_maker()
     try:
-        process_leftovers(session, consumer_token, access_token, options.twitter_query_user, options.token_filename, options.ask_process_leftovers, utils.get_logger())
+        process_leftovers(session, twitter_auth, options.twitter_query_user, options.ask_process_leftovers, utils.get_logger())
         session.commit()
     finally:
         session.rollback()
         session.close()
-    
+
     if options.process_nb <= 0:
         utils.get_logger().debug("Leftovers processed. Exiting.")
         return None
 
     queue = mQueue()
     stop_event = Event()
-    
+
     # workaround for bug on using urllib2 and multiprocessing
     httpd = HTTPServer(('127.0.0.1',0), Requesthandler)
-    thread.start_new_thread(httpd.handle_request, ())
-    
-    req = urllib2.Request('http://localhost:%d' % httpd.server_port)
+    _thread.start_new_thread(httpd.handle_request, ())
+
+    req = urllib.request.Request('http://localhost:%d' % httpd.server_port)
     conn = None
     try:
-        conn = urllib2.urlopen(req)
+        conn = urllib.request.urlopen(req)
     except:
         utils.get_logger().debug("could not open localhost")
         # donothing
     finally:
         if conn is not None:
             conn.close()
-    
+
     process_engines = []
     logger_queues = []
-    
+
     SessionProcess, engine_process, _ = get_sessionmaker(conn_str)
     process_engines.append(engine_process)
     lqueue = mQueue(50)
     logger_queues.append(lqueue)
     pid = os.getpid()
-    sprocess = SourceProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)    
-    
+    sprocess = SourceProcess(SessionProcess, queue, options, twitter_auth, stop_event, lqueue, pid)
+
     tweet_processes = []
-    
+
     for i in range(options.process_nb - 1):
         SessionProcess, engine_process, _ = get_sessionmaker(conn_str)
         process_engines.append(engine_process)
         lqueue = mQueue(50)
         logger_queues.append(lqueue)
-        cprocess = TweetProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)
+        cprocess = TweetProcess(SessionProcess, queue, options, twitter_auth, stop_event, lqueue, pid)
         tweet_processes.append(cprocess)
 
     log_thread = threading.Thread(target=process_log, name="loggingThread", args=(logger_queues, stop_event,))
@@ -452,18 +443,18 @@
     add_process_event("pid", {'main':os.getpid(), 'source':(sprocess.name, sprocess.pid), 'consumers':dict([(p.name, p.pid) for p in tweet_processes])}, session_maker)
 
     if options.duration >= 0:
-        end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration)    
+        end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration)
 
     def interupt_handler(signum, frame):
         utils.get_logger().debug("shutdown asked " + repr(signum) + "  " + repr(inspect.getframeinfo(frame, 9)))
         stop_args.update({'message': 'interupt', 'signum':signum, 'frameinfo':inspect.getframeinfo(frame, 9)})
         stop_event.set()
-        
+
     signal.signal(signal.SIGINT , interupt_handler)
     signal.signal(signal.SIGHUP , interupt_handler)
     signal.signal(signal.SIGALRM, interupt_handler)
     signal.signal(signal.SIGTERM, interupt_handler)
-    
+
 
     while not stop_event.is_set():
         if options.duration >= 0 and  datetime.datetime.utcnow() >= end_ts:
@@ -484,7 +475,7 @@
         utils.get_logger().debug("Pb joining Source Process - terminating")
     finally:
         sprocess.terminate()
-        
+
     for i, cprocess in enumerate(tweet_processes):
         utils.get_logger().debug("Joining consumer process Nb %d" % (i + 1))
         try:
@@ -493,7 +484,7 @@
             utils.get_logger().debug("Pb joining consumer process Nb %d - terminating" % (i + 1))
             cprocess.terminate()
 
-    
+
     utils.get_logger().debug("Close queues")
     try:
         queue.close()
@@ -502,13 +493,13 @@
     except Exception as e:
         utils.get_logger().error("error when closing queues %s", repr(e))
         # do nothing
-        
-    
+
+
     if options.process_nb > 1:
         utils.get_logger().debug("Processing leftovers")
         session = session_maker()
         try:
-            process_leftovers(session, consumer_token, access_token, options.twitter_query_user, options.token_filename, options.ask_process_leftovers, utils.get_logger())
+            process_leftovers(session, twitter_auth, options.twitter_query_user, options.ask_process_leftovers, utils.get_logger())
             session.commit()
         finally:
             session.rollback()
@@ -516,18 +507,18 @@
 
     for pengine in process_engines:
         pengine.dispose()
-    
+
     return stop_args
 
 
 def main(options):
-    
+
     global conn_str
-    
+
     conn_str = options.conn_str.strip()
-    if not re.match("^\w+://.+", conn_str):
+    if not re.match(r"^\w+://.+", conn_str):
         conn_str = 'sqlite:///' + options.conn_str
-        
+
     if conn_str.startswith("sqlite") and options.new:
         filepath = conn_str[conn_str.find(":///") + 4:]
         if os.path.exists(filepath):
@@ -543,7 +534,7 @@
                 shutil.move(filepath, new_path)
 
     Session, engine, metadata = get_sessionmaker(conn_str)
-    
+
     if options.new:
         check_metadata = sqlalchemy.schema.MetaData(bind=engine)
         check_metadata.reflect()
@@ -551,28 +542,28 @@
             message = "Database %s not empty exiting" % conn_str
             utils.get_logger().error(message)
             sys.exit(message)
-    
+
     metadata.create_all(engine)
     session = Session()
     try:
         models.add_model_version(session)
     finally:
         session.close()
-    
+
     stop_args = {}
     try:
         add_process_event(event_type="start", args={'options':options.__dict__, 'args': [], 'command_line': sys.argv}, session_maker=Session)
         stop_args = do_run(options, Session)
     except Exception as e:
-        utils.get_logger().exception("Error in main thread")        
-        outfile = StringIO.StringIO()
+        utils.get_logger().exception("Error in main thread")
+        outfile = StringIO()
         try:
             traceback.print_exc(file=outfile)
             stop_args = {'error': repr(e), 'message': repr(e), 'stacktrace':outfile.getvalue()}
         finally:
             outfile.close()
         raise
-    finally:    
+    finally:
         add_process_event(event_type="shutdown", args=stop_args, session_maker=Session)
 
     utils.get_logger().debug("Done. Exiting. " + repr(stop_args))
@@ -582,22 +573,21 @@
 if __name__ == '__main__':
 
     options = get_options()
-    
+
     loggers = set_logging(options)
-    
+
     utils.get_logger().debug("OPTIONS : " + repr(options))
-    
+
     if options.daemon:
         options.ask_process_leftovers = False
         import daemon
-        
+
         hdlr_preserve = []
         for logger in loggers:
             hdlr_preserve.extend([h.stream for h in logger.handlers])
-            
-        context = daemon.DaemonContext(working_directory=os.getcwd(), files_preserve=hdlr_preserve) 
+
+        context = daemon.DaemonContext(working_directory=os.getcwd(), files_preserve=hdlr_preserve)
         with context:
             main(options)
     else:
         main(options)
-