--- a/script/stream/recorder_tweetstream.py Sun Aug 28 11:48:30 2011 +0200
+++ b/script/stream/recorder_tweetstream.py Sun Aug 28 15:42:28 2011 +0200
@@ -1,6 +1,6 @@
from getpass import getpass
from iri_tweet import models, utils
-from iri_tweet.models import TweetSource, TweetLog
+from iri_tweet.models import TweetSource, TweetLog, ProcessEvent
from multiprocessing import (Queue as mQueue, JoinableQueue, Process, Event,
get_logger)
from optparse import OptionParser
@@ -10,6 +10,7 @@
import StringIO
import anyjson
import datetime
+import inspect
import logging
import os
import re
@@ -110,24 +111,68 @@
# Don't listen to auth error, since we can't reasonably reconnect
# when we get one.
+def add_process_event(event_type, args, session_maker):
+    # Persist a lifecycle event (start/stop/pid/shutdown) with its
+    # JSON-encoded arguments, using a short-lived dedicated session.
+    session = session_maker()
+    try:
+        evt = ProcessEvent(args=None if args is None else anyjson.serialize(args), type=event_type)
+        session.add(evt)
+        session.commit()
+    finally:
+        session.close()
+
+
+class BaseProcess(Process):
+
+ def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
+ self.parent_pid = parent_pid
+ self.session_maker = session_maker
+ self.queue = queue
+ self.options = options
+ self.logger_queue = logger_queue
+ self.stop_event = stop_event
+ self.access_token = access_token
+
+ super(BaseProcess, self).__init__()
+
+ #
+ # from http://stackoverflow.com/questions/2542610/python-daemon-doesnt-kill-its-kids
+ #
+ def parent_is_alive(self):
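+        # Signal 0 delivers nothing: os.kill(pid, 0) only performs the
+        # existence/permission check, raising OSError once the parent is gone.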
+ try:
+ # try to call Parent
+ os.kill(self.parent_pid, 0)
+ except OSError:
+ # *beeep* oh no! The phone's disconnected!
+ return False
+ else:
+ # *ring* Hi mom!
+ return True
+
+
+ def __get_process_event_args(self):
+ return {'name':self.name, 'pid':self.pid, 'parent_pid':self.parent_pid, 'options':self.options.__dict__, 'access_token':self.access_token}
+
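+    # Template method: run() brackets the subclass's do_run() with
+    # "start_worker"/"stop_worker" ProcessEvent records, so each worker's
+    # lifetime is logged even when do_run() raises.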
+ def run(self):
+ try:
+ add_process_event("start_worker", self.__get_process_event_args(), self.session_maker)
+ self.do_run()
+ finally:
+ add_process_event("stop_worker",self.__get_process_event_args(), self.session_maker)
+
+ def do_run(self):
+        raise NotImplementedError()
+
+
-class SourceProcess(Process):
+class SourceProcess(BaseProcess):
- def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue):
- self.session_maker = session_maker
- self.queue = queue
+ def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
self.track = options.track
self.reconnects = options.reconnects
self.token_filename = options.token_filename
- self.stop_event = stop_event
- self.options = options
- self.access_token = access_token
- self.logger_queue = logger_queue
- super(SourceProcess, self).__init__()
-
- def run(self):
+ super(SourceProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid)
+
+ def do_run(self):
#import pydevd
#pydevd.settrace(suspend=False)
@@ -148,9 +193,11 @@
try:
for tweet in stream:
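+                # Orphan guard: if the main process died without setting
+                # stop_event (e.g. it was SIGKILLed), exit instead of
+                # streaming forever.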
- self.logger.debug("tweet " + repr(tweet))
+ if not self.parent_is_alive():
+ sys.exit()
+ self.logger.debug("SourceProcess : tweet " + repr(tweet))
source = TweetSource(original_json=tweet)
- self.logger.debug("source created")
+ self.logger.debug("SourceProcess : source created")
add_retries = 0
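+                # Retry the insert: concurrent access can raise transient
+                # OperationalErrors (e.g. sqlite's "database is locked").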
while add_retries < 10:
try:
@@ -160,18 +207,18 @@
break
except OperationalError as e:
session.rollback()
- self.logger.debug("Operational Error %s nb %d" % (repr(e), add_retries))
+ self.logger.debug("SourceProcess : Operational Error %s nb %d" % (repr(e), add_retries))
if add_retries == 10:
raise e
source_id = source.id
- self.logger.debug("before queue + source id " + repr(source_id))
- self.logger.info("Tweet count: %d - current rate : %.2f - running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime)))
+ self.logger.debug("SourceProcess : before queue + source id " + repr(source_id))
+ self.logger.info("SourceProcess : Tweet count: %d - current rate : %.2f - running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime)))
session.commit()
self.queue.put((source_id, tweet), False)
except Exception as e:
- self.logger.error("Error when processing tweet " + repr(e))
+ self.logger.error("SourceProcess : Error when processing tweet " + repr(e))
finally:
session.rollback()
stream.close()
@@ -196,11 +243,13 @@
processor.process()
except Exception as e:
message = u"Error %s processing tweet %s" % (repr(e), tweet)
- logger.error(message)
+ logger.exception(message)
output = StringIO.StringIO()
- traceback.print_exception(Exception, e, None, None, output)
- error_stack = output.getvalue()
- output.close()
+ try:
+ traceback.print_exc(file=output)
+ error_stack = output.getvalue()
+ finally:
+ output.close()
session.rollback()
tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['ERROR'], error=message, error_stack=error_stack)
session.add(tweet_log)
@@ -208,26 +257,20 @@
-class TweetProcess(Process):
+class TweetProcess(BaseProcess):
- def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue):
- self.session_maker = session_maker
- self.queue = queue
- self.stop_event = stop_event
- self.options = options
- self.access_token = access_token
- self.logger_queue = logger_queue
- super(TweetProcess, self).__init__()
+ def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
+ super(TweetProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid)
- def run(self):
+ def do_run(self):
self.logger = set_logging_process(self.options, self.logger_queue)
session = self.session_maker()
try:
- while not self.stop_event.is_set():
+ while not self.stop_event.is_set() and self.parent_is_alive():
try:
- source_id, tweet_txt = queue.get(True, 3)
+ source_id, tweet_txt = self.queue.get(True, 3)
self.logger.debug("Processing source id " + repr(source_id))
except Exception as e:
self.logger.debug('Process tweet exception in loop : ' + repr(e))
@@ -273,7 +316,11 @@
def get_options():
- parser = OptionParser()
+
+ usage = "usage: %prog [options]"
+
+ parser = OptionParser(usage=usage)
+
parser.add_option("-f", "--file", dest="conn_str",
help="write tweet to DATABASE. This is a connection string", metavar="CONNECTION_STR", default="enmi2010_twitter.db")
parser.add_option("-u", "--user", dest="username",
@@ -293,58 +340,20 @@
parser.add_option("-N", "--nb-process", dest="process_nb",
help="number of process.\nIf 0, only the lefovers of the database are processed.\nIf 1, no postprocessing is done on the tweets.", metavar="PROCESS_NB", default=2, type='int')
-
-
utils.set_logging_options(parser)
return parser.parse_args()
-
-if __name__ == '__main__':
- (options, args) = get_options()
-
- set_logging(options)
-
- if options.debug:
- print "OPTIONS : "
- print repr(options)
-
-
- conn_str = options.conn_str.strip()
- if not re.match("^\w+://.+", conn_str):
- conn_str = 'sqlite:///' + options.conn_str
-
- if conn_str.startswith("sqlite") and options.new:
- filepath = conn_str[conn_str.find(":///")+4:]
- if os.path.exists(filepath):
- i = 1
- basename, extension = os.path.splitext(filepath)
- new_path = '%s.%d%s' % (basename, i, extension)
- while i < 1000000 and os.path.exists(new_path):
- i += 1
- new_path = '%s.%d%s' % (basename, i, extension)
- if i >= 1000000:
- raise Exception("Unable to find new filename for " + filepath)
- else:
- shutil.move(filepath, new_path)
+def do_run(options, session_maker):
- Session, engine, metadata = get_sessionmaker(conn_str)
-
- if options.new:
- check_metadata = sqlalchemy.schema.MetaData(bind=engine, reflect=True)
- if len(check_metadata.sorted_tables) > 0:
- message = "Database %s not empty exiting" % conn_str
- utils.get_logger().error(message)
- sys.exit(message)
-
- metadata.create_all(engine)
-
+ stop_args = {}
+
access_token = None
if not options.username or not options.password:
access_token = utils.get_oauth_token(options.token_filename)
- session = Session()
+ session = session_maker()
try:
process_leftovers(session, access_token, utils.get_logger())
session.commit()
@@ -354,7 +363,7 @@
if options.process_nb <= 0:
utils.get_logger().debug("Leftovers processed. Exiting.")
- sys.exit()
+ return None
queue = mQueue()
stop_event = Event()
@@ -365,7 +374,7 @@
try:
conn = urllib2.urlopen(req)
except:
- pass
+ utils.get_logger().debug("could not open localhost")
#donothing
finally:
if conn is not None:
@@ -378,7 +387,8 @@
process_engines.append(engine_process)
lqueue = mQueue(1)
logger_queues.append(lqueue)
- sprocess = SourceProcess(SessionProcess, queue, options, access_token, stop_event, lqueue)
+ pid = os.getpid()
+ sprocess = SourceProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)
tweet_processes = []
@@ -387,22 +397,29 @@
process_engines.append(engine_process)
lqueue = mQueue(1)
logger_queues.append(lqueue)
- cprocess = TweetProcess(SessionProcess, queue, options, access_token, stop_event, lqueue)
+ cprocess = TweetProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)
tweet_processes.append(cprocess)
def interupt_handler(signum, frame):
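+        # Capture which signal triggered the shutdown, plus 9 lines of source
+        # context around the interrupted frame, for the "shutdown" event.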
+ utils.get_logger().debug("shutdown asked " + repr(signum) + " " + repr(inspect.getframeinfo(frame, 9)))
+        stop_args.update({'message': 'interrupt', 'signum': signum, 'frameinfo': inspect.getframeinfo(frame, 9)})
stop_event.set()
- signal.signal(signal.SIGINT, interupt_handler)
+    signal.signal(signal.SIGINT, interupt_handler)
+    signal.signal(signal.SIGHUP, interupt_handler)
+    signal.signal(signal.SIGALRM, interupt_handler)
+    signal.signal(signal.SIGTERM, interupt_handler)
- log_thread = threading.Thread(target=process_log, name="loggingThread", args=(logger_queues,stop_event,))
+ log_thread = threading.Thread(target=process_log, name="loggingThread", args=(logger_queues, stop_event,))
log_thread.daemon = True
+ log_thread.start()
+
sprocess.start()
for cprocess in tweet_processes:
cprocess.start()
- log_thread.start()
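+    # Persist the pids of the main, source and consumer processes so
+    # ProcessEvent rows can be correlated with what was running.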
+ add_process_event("pid", {'main':os.getpid(), 'source':(sprocess.name,sprocess.pid), 'consumers':dict([(p.name,p.pid) for p in tweet_processes])}, session_maker)
if options.duration >= 0:
end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration)
@@ -410,11 +427,13 @@
while not stop_event.is_set():
if options.duration >= 0 and datetime.datetime.utcnow() >= end_ts:
+                stop_args.update({'message': 'duration', 'duration': options.duration, 'end_ts': end_ts.isoformat()})
stop_event.set()
break
if sprocess.is_alive():
time.sleep(1)
else:
+            stop_args.update({'message': 'source process died'})
stop_event.set()
break
utils.get_logger().debug("Joining Source Process")
@@ -445,7 +464,7 @@
if options.process_nb > 1:
utils.get_logger().debug("Processing leftovers")
- session = Session()
+ session = session_maker()
try:
process_leftovers(session, access_token, utils.get_logger())
session.commit()
@@ -455,6 +474,61 @@
for pengine in process_engines:
pengine.dispose()
+
+ return stop_args
+
+
+if __name__ == '__main__':
+
+ (options, args) = get_options()
+
+ set_logging(options)
+
+ utils.get_logger().debug("OPTIONS : " + repr(options))
+
+ conn_str = options.conn_str.strip()
+ if not re.match("^\w+://.+", conn_str):
+ conn_str = 'sqlite:///' + options.conn_str
+ if conn_str.startswith("sqlite") and options.new:
+ filepath = conn_str[conn_str.find(":///") + 4:]
+ if os.path.exists(filepath):
+ i = 1
+ basename, extension = os.path.splitext(filepath)
+ new_path = '%s.%d%s' % (basename, i, extension)
+ while i < 1000000 and os.path.exists(new_path):
+ i += 1
+ new_path = '%s.%d%s' % (basename, i, extension)
+ if i >= 1000000:
+ raise Exception("Unable to find new filename for " + filepath)
+ else:
+ shutil.move(filepath, new_path)
+
+ Session, engine, metadata = get_sessionmaker(conn_str)
+
+ if options.new:
+ check_metadata = sqlalchemy.schema.MetaData(bind=engine, reflect=True)
+ if len(check_metadata.sorted_tables) > 0:
+ message = "Database %s not empty exiting" % conn_str
+ utils.get_logger().error(message)
+ sys.exit(message)
+
+ metadata.create_all(engine)
+ stop_args = {}
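+    # stop_args accumulates the shutdown reason (signal, duration elapsed,
+    # source process death, or error) and is saved with the "shutdown" event.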
+ try:
+ add_process_event(type="start", args={'options':options.__dict__, 'args': args, 'command_line': sys.argv}, session_maker=Session)
+ stop_args = do_run(options, Session)
+ except Exception as e:
+ utils.get_logger().exception("Error in main thread")
+ outfile = StringIO.StringIO()
+ try:
+ traceback.print_exc(file=outfile)
+ stop_args = {'error': repr(e), 'message': getattr(e, 'message', ''), 'stacktrace':outfile.getvalue()}
+ finally:
+ outfile.close()
+ raise
+ finally:
+ add_process_event(type="shutdown", args=stop_args, session_maker=Session)
+
utils.get_logger().debug("Done. Exiting.")