--- a/script/stream/recorder_stream.py Wed Jan 02 17:49:19 2019 +0100
+++ b/script/stream/recorder_stream.py Thu Jan 10 18:36:36 2019 +0100
@@ -1,33 +1,35 @@
-from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
-from iri_tweet import models, utils
-from iri_tweet.models import TweetSource, TweetLog, ProcessEvent
-from iri_tweet.processor import get_processor
-from multiprocessing import Queue as mQueue, Process, Event
-from sqlalchemy.exc import OperationalError
-from sqlalchemy.orm import scoped_session
-import Queue
-import StringIO
-import anyjson
import argparse
import datetime
import inspect
-import iri_tweet.stream
+import json
import logging
import os
+import queue
import re
-import requests_oauthlib
import shutil
import signal
import socket
-import sqlalchemy.schema
import sys
-import thread
import threading
import time
import traceback
-import urllib2
-socket._fileobject.default_bufsize = 0
+import urllib.request
+from http.server import BaseHTTPRequestHandler, HTTPServer
+from io import StringIO
+from multiprocessing import Event, Process
+from multiprocessing import Queue as mQueue
+import requests_oauthlib
+import sqlalchemy.schema
+import twitter
+from sqlalchemy.exc import OperationalError
+from sqlalchemy.orm import scoped_session
+
+import _thread
+import iri_tweet.stream
+from iri_tweet import models, utils
+from iri_tweet.models import ProcessEvent, TweetLog, TweetSource
+from iri_tweet.processor import get_processor
# columns_tweet = [u'favorited', u'truncated', u'text', u'created_at', u'source', u'in_reply_to_status_id', u'in_reply_to_screen_name', u'in_reply_to_user_id', u'geo', u'id', u'user']
@@ -40,20 +42,17 @@
class Requesthandler(BaseHTTPRequestHandler):
- def __init__(self, request, client_address, server):
- BaseHTTPRequestHandler.__init__(self, request, client_address, server)
-
def do_GET(self):
self.send_response(200)
self.end_headers()
-
+
def log_message(self, format, *args): # @ReservedAssignment
pass
def set_logging(options):
loggers = []
-
+
loggers.append(utils.set_logging(options, logging.getLogger('iri.tweet')))
loggers.append(utils.set_logging(options, logging.getLogger('multiprocessing')))
if options.debug >= 2:
@@ -68,17 +67,14 @@
qlogger.propagate = 0
return qlogger
-def get_auth(options, access_token):
- consumer_key = options.consumer_key
- consumer_secret = options.consumer_secret
- auth = requests_oauthlib.OAuth1(client_key=consumer_key, client_secret=consumer_secret, resource_owner_key=access_token[0], resource_owner_secret=access_token[1], signature_type='auth_header')
- return auth
+def get_auth(consumer_key, consumer_secret, token_key, token_secret):
+ return requests_oauthlib.OAuth1(client_key=consumer_key, client_secret=consumer_secret, resource_owner_key=token_key, resource_owner_secret=token_secret, signature_type='auth_header')
def add_process_event(event_type, args, session_maker):
session = session_maker()
try:
- evt = ProcessEvent(args=None if args is None else anyjson.serialize(args), type=event_type)
+ evt = ProcessEvent(args=None if args is None else json.dumps(args), type=event_type)
session.add(evt)
session.commit()
finally:
@@ -87,15 +83,14 @@
class BaseProcess(Process):
- def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
+ def __init__(self, session_maker, queue, options, twitter_auth, stop_event, logger_queue, parent_pid):
self.parent_pid = parent_pid
self.session_maker = session_maker
self.queue = queue
self.options = options
self.logger_queue = logger_queue
self.stop_event = stop_event
- self.consumer_token = (options.consumer_key, options.consumer_secret)
- self.access_token = access_token
+ self.twitter_auth = twitter_auth
super(BaseProcess, self).__init__()
@@ -112,10 +107,10 @@
else:
# *ring* Hi mom!
return True
-
+
def __get_process_event_args(self):
- return {'name':self.name, 'pid':self.pid, 'parent_pid':self.parent_pid, 'options':self.options.__dict__, 'access_token':self.access_token}
+ return {'name':self.name, 'pid':self.pid, 'parent_pid':self.parent_pid, 'options':self.options.__dict__}
def run(self):
try:
@@ -123,47 +118,45 @@
self.do_run()
finally:
add_process_event("stop_worker", self.__get_process_event_args(), self.session_maker)
-
+
def do_run(self):
raise NotImplementedError()
class SourceProcess(BaseProcess):
-
- def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
+
+ def __init__(self, session_maker, queue, options, twitter_auth, stop_event, logger_queue, parent_pid):
self.track = options.track
- self.token_filename = options.token_filename
self.timeout = options.timeout
self.stream = None
- super(SourceProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid)
-
+ super(SourceProcess, self).__init__(session_maker, queue, options, twitter_auth, stop_event, logger_queue, parent_pid)
+
def __source_stream_iter(self):
-
+
self.logger.debug("SourceProcess : run ")
-
- self.logger.debug("SourceProcess : get_auth auth with option %s and token %s " %(self.options, self.access_token))
- self.auth = get_auth(self.options, self.access_token)
+
+ self.logger.debug("SourceProcess : get_auth auth with option %s and token %s " %(self.options, self.twitter_auth))
+ self.auth = get_auth(self.twitter_auth.consumer_key, self.twitter_auth.consumer_secret, self.twitter_auth.token, self.twitter_auth.token_secret)
self.logger.debug("SourceProcess : auth set ")
-
track_list = self.track # or raw_input('Keywords to track (comma seperated): ').strip()
self.logger.debug("SourceProcess : track list " + track_list)
-
+
track_list = [k.strip() for k in track_list.split(',')]
- self.logger.debug("SourceProcess : before connecting to stream %s, url : %s, auth : %s" % (repr(track_list), self.options.url, repr(self.auth)))
- self.stream = iri_tweet.stream.FilterStream(self.auth, track=track_list, raw=True, url=self.options.url, timeout=self.timeout, chunk_size=512, logger=self.logger)
+ self.logger.debug("SourceProcess : before connecting to stream %s, url : %s, auth : %s" % (repr(track_list), self.options.url, repr(self.auth)))
+ self.stream = iri_tweet.stream.FilterStream(self.auth, track=track_list, raw=True, url=self.options.url, timeout=self.timeout, logger=self.logger)
self.logger.debug("SourceProcess : after connecting to stream")
- self.stream.muststop = lambda: self.stop_event.is_set()
-
+ self.stream.muststop = lambda: self.stop_event.is_set()
+
stream_wrapper = iri_tweet.stream.SafeStreamWrapper(self.stream, logger=self.logger)
-
+
session = self.session_maker()
-
+
#import pydevd
#pydevd.settrace(suspend=False)
-
+
try:
for tweet in stream_wrapper:
if not self.parent_is_alive():
@@ -184,7 +177,7 @@
self.logger.debug("SourceProcess : Operational Error %s nb %d" % (repr(e), add_retries))
if add_retries == 10:
raise
-
+
source_id = source.id
self.logger.debug("SourceProcess : before queue + source id " + repr(source_id))
self.logger.info("SourceProcess : Tweet count: %d - current rate : %.2f - running : %s" % (self.stream.count, self.stream.rate, int(time.time() - self.stream.starttime)))
@@ -204,13 +197,13 @@
def do_run(self):
-
- self.logger = set_logging_process(self.options, self.logger_queue)
-
+
+ self.logger = set_logging_process(self.options, self.logger_queue)
+
source_stream_iter_thread = threading.Thread(target=self.__source_stream_iter , name="SourceStreamIterThread")
-
+
source_stream_iter_thread.start()
-
+
try:
while not self.stop_event.is_set():
self.logger.debug("SourceProcess : In while after start")
@@ -230,11 +223,11 @@
source_stream_iter_thread.join(30)
-def process_tweet(tweet, source_id, session, consumer_token, access_token, twitter_query_user, token_filename, logger):
+def process_tweet(tweet, source_id, session, twitter_auth, twitter_query_user, logger):
try:
if not tweet.strip():
return
- tweet_obj = anyjson.deserialize(tweet)
+ tweet_obj = json.loads(tweet)
processor_klass = get_processor(tweet_obj)
if not processor_klass:
tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET'])
@@ -244,18 +237,16 @@
json_txt=tweet,
source_id=source_id,
session=session,
- consumer_token=consumer_token,
- access_token=access_token,
- token_filename=token_filename,
+ twitter_auth=twitter_auth,
user_query_twitter=twitter_query_user,
logger=logger)
- logger.info(processor.log_info())
- logger.debug(u"Process_tweet :" + repr(tweet))
+ logger.info(processor.log_info())
+ logger.debug(u"Process_tweet :" + repr(tweet))
processor.process()
-
+
except ValueError as e:
message = u"Value Error %s processing tweet %s" % (repr(e), tweet)
- output = StringIO.StringIO()
+ output = StringIO()
try:
traceback.print_exc(file=output)
error_stack = output.getvalue()
@@ -263,11 +254,11 @@
output.close()
tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET'], error=message, error_stack=error_stack)
session.add(tweet_log)
- session.commit()
+ session.commit()
except Exception as e:
message = u"Error %s processing tweet %s" % (repr(e), tweet)
logger.exception(message)
- output = StringIO.StringIO()
+ output = StringIO()
try:
traceback.print_exc(file=output)
error_stack = output.getvalue()
@@ -278,17 +269,17 @@
session.add(tweet_log)
session.commit()
-
-
+
+
class TweetProcess(BaseProcess):
-
- def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
- super(TweetProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid)
+
+ def __init__(self, session_maker, queue, options, twitter_auth, stop_event, logger_queue, parent_pid):
+ super(TweetProcess, self).__init__(session_maker, queue, options, twitter_auth, stop_event, logger_queue, parent_pid)
self.twitter_query_user = options.twitter_query_user
def do_run(self):
-
+
self.logger = set_logging_process(self.options, self.logger_queue)
session = self.session_maker()
try:
@@ -299,7 +290,7 @@
except Exception as e:
self.logger.debug('Process tweet exception in loop : ' + repr(e))
continue
- process_tweet(tweet_txt, source_id, session, self.consumer_token, self.access_token, self.twitter_query_user, self.options.token_filename, self.logger)
+ process_tweet(tweet_txt, source_id, session, self.twitter_auth, self.twitter_query_user, self.logger)
session.commit()
except KeyboardInterrupt:
self.stop_event.set()
@@ -313,36 +304,36 @@
Session = scoped_session(Session)
return Session, engine, metadata
-
-def process_leftovers(session, consumer_token, access_token, twitter_query_user, token_filename, ask_process_leftovers, logger):
-
+
+def process_leftovers(session, twitter_auth, twitter_query_user, ask_process_leftovers, logger):
+
sources = session.query(TweetSource).outerjoin(TweetLog).filter(TweetLog.id == None)
sources_count = sources.count()
-
+
if sources_count > 10 and ask_process_leftovers:
- resp = raw_input("Do you want to process leftovers (Y/n) ? (%d tweet to process)" % sources_count)
+ resp = input("Do you want to process leftovers (Y/n) ? (%d tweet to process)" % sources_count)
if resp and resp.strip().lower() == "n":
return
logger.info("Process leftovers, %d tweets to process" % (sources_count))
for src in sources:
tweet_txt = src.original_json
- process_tweet(tweet_txt, src.id, session, consumer_token, access_token, twitter_query_user, token_filename, logger)
+ process_tweet(tweet_txt, src.id, session, twitter_auth, twitter_query_user, logger)
session.commit()
-
-
+
+
def process_log(logger_queues, stop_event):
while not stop_event.is_set():
for lqueue in logger_queues:
try:
record = lqueue.get_nowait()
logging.getLogger(record.name).handle(record)
- except Queue.Empty:
+ except queue.Empty:
continue
except IOError:
continue
time.sleep(0.1)
-
+
def get_options():
usage = "usage: %(prog)s [options]"
@@ -385,59 +376,59 @@
def do_run(options, session_maker):
stop_args = {}
-
- consumer_token = (options.consumer_key, options.consumer_secret)
- access_token = utils.get_oauth_token(consumer_key=consumer_token[0], consumer_secret=consumer_token[1], token_file_path=options.token_filename)
-
-
+
+
+ access_token_key, access_token_secret = utils.get_oauth_token(consumer_key=options.consumer_key, consumer_secret=options.consumer_secret, token_file_path=options.token_filename)
+ twitter_auth = twitter.OAuth(access_token_key, access_token_secret, options.consumer_key, options.consumer_secret)
+
session = session_maker()
try:
- process_leftovers(session, consumer_token, access_token, options.twitter_query_user, options.token_filename, options.ask_process_leftovers, utils.get_logger())
+ process_leftovers(session, twitter_auth, options.twitter_query_user, options.ask_process_leftovers, utils.get_logger())
session.commit()
finally:
session.rollback()
session.close()
-
+
if options.process_nb <= 0:
utils.get_logger().debug("Leftovers processed. Exiting.")
return None
queue = mQueue()
stop_event = Event()
-
+
# workaround for bug on using urllib2 and multiprocessing
httpd = HTTPServer(('127.0.0.1',0), Requesthandler)
- thread.start_new_thread(httpd.handle_request, ())
-
- req = urllib2.Request('http://localhost:%d' % httpd.server_port)
+ _thread.start_new_thread(httpd.handle_request, ())
+
+ req = urllib.request.Request('http://localhost:%d' % httpd.server_port)
conn = None
try:
- conn = urllib2.urlopen(req)
+ conn = urllib.request.urlopen(req)
except:
utils.get_logger().debug("could not open localhost")
# donothing
finally:
if conn is not None:
conn.close()
-
+
process_engines = []
logger_queues = []
-
+
SessionProcess, engine_process, _ = get_sessionmaker(conn_str)
process_engines.append(engine_process)
lqueue = mQueue(50)
logger_queues.append(lqueue)
pid = os.getpid()
- sprocess = SourceProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)
-
+ sprocess = SourceProcess(SessionProcess, queue, options, twitter_auth, stop_event, lqueue, pid)
+
tweet_processes = []
-
+
for i in range(options.process_nb - 1):
SessionProcess, engine_process, _ = get_sessionmaker(conn_str)
process_engines.append(engine_process)
lqueue = mQueue(50)
logger_queues.append(lqueue)
- cprocess = TweetProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)
+ cprocess = TweetProcess(SessionProcess, queue, options, twitter_auth, stop_event, lqueue, pid)
tweet_processes.append(cprocess)
log_thread = threading.Thread(target=process_log, name="loggingThread", args=(logger_queues, stop_event,))
@@ -452,18 +443,18 @@
add_process_event("pid", {'main':os.getpid(), 'source':(sprocess.name, sprocess.pid), 'consumers':dict([(p.name, p.pid) for p in tweet_processes])}, session_maker)
if options.duration >= 0:
- end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration)
+ end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration)
def interupt_handler(signum, frame):
utils.get_logger().debug("shutdown asked " + repr(signum) + " " + repr(inspect.getframeinfo(frame, 9)))
stop_args.update({'message': 'interupt', 'signum':signum, 'frameinfo':inspect.getframeinfo(frame, 9)})
stop_event.set()
-
+
signal.signal(signal.SIGINT , interupt_handler)
signal.signal(signal.SIGHUP , interupt_handler)
signal.signal(signal.SIGALRM, interupt_handler)
signal.signal(signal.SIGTERM, interupt_handler)
-
+
while not stop_event.is_set():
if options.duration >= 0 and datetime.datetime.utcnow() >= end_ts:
@@ -484,7 +475,7 @@
utils.get_logger().debug("Pb joining Source Process - terminating")
finally:
sprocess.terminate()
-
+
for i, cprocess in enumerate(tweet_processes):
utils.get_logger().debug("Joining consumer process Nb %d" % (i + 1))
try:
@@ -493,7 +484,7 @@
utils.get_logger().debug("Pb joining consumer process Nb %d - terminating" % (i + 1))
cprocess.terminate()
-
+
utils.get_logger().debug("Close queues")
try:
queue.close()
@@ -502,13 +493,13 @@
except Exception as e:
utils.get_logger().error("error when closing queues %s", repr(e))
# do nothing
-
-
+
+
if options.process_nb > 1:
utils.get_logger().debug("Processing leftovers")
session = session_maker()
try:
- process_leftovers(session, consumer_token, access_token, options.twitter_query_user, options.token_filename, options.ask_process_leftovers, utils.get_logger())
+ process_leftovers(session, twitter_auth, options.twitter_query_user, options.ask_process_leftovers, utils.get_logger())
session.commit()
finally:
session.rollback()
@@ -516,18 +507,18 @@
for pengine in process_engines:
pengine.dispose()
-
+
return stop_args
def main(options):
-
+
global conn_str
-
+
conn_str = options.conn_str.strip()
- if not re.match("^\w+://.+", conn_str):
+ if not re.match(r"^\w+://.+", conn_str):
conn_str = 'sqlite:///' + options.conn_str
-
+
if conn_str.startswith("sqlite") and options.new:
filepath = conn_str[conn_str.find(":///") + 4:]
if os.path.exists(filepath):
@@ -543,7 +534,7 @@
shutil.move(filepath, new_path)
Session, engine, metadata = get_sessionmaker(conn_str)
-
+
if options.new:
check_metadata = sqlalchemy.schema.MetaData(bind=engine)
check_metadata.reflect()
@@ -551,28 +542,28 @@
message = "Database %s not empty exiting" % conn_str
utils.get_logger().error(message)
sys.exit(message)
-
+
metadata.create_all(engine)
session = Session()
try:
models.add_model_version(session)
finally:
session.close()
-
+
stop_args = {}
try:
add_process_event(event_type="start", args={'options':options.__dict__, 'args': [], 'command_line': sys.argv}, session_maker=Session)
stop_args = do_run(options, Session)
except Exception as e:
- utils.get_logger().exception("Error in main thread")
- outfile = StringIO.StringIO()
+ utils.get_logger().exception("Error in main thread")
+ outfile = StringIO()
try:
traceback.print_exc(file=outfile)
stop_args = {'error': repr(e), 'message': repr(e), 'stacktrace':outfile.getvalue()}
finally:
outfile.close()
raise
- finally:
+ finally:
add_process_event(event_type="shutdown", args=stop_args, session_maker=Session)
utils.get_logger().debug("Done. Exiting. " + repr(stop_args))
@@ -582,22 +573,21 @@
if __name__ == '__main__':
options = get_options()
-
+
loggers = set_logging(options)
-
+
utils.get_logger().debug("OPTIONS : " + repr(options))
-
+
if options.daemon:
options.ask_process_leftovers = False
import daemon
-
+
hdlr_preserve = []
for logger in loggers:
hdlr_preserve.extend([h.stream for h in logger.handlers])
-
- context = daemon.DaemonContext(working_directory=os.getcwd(), files_preserve=hdlr_preserve)
+
+ context = daemon.DaemonContext(working_directory=os.getcwd(), files_preserve=hdlr_preserve)
with context:
main(options)
else:
main(options)
-