# HG changeset patch
# User Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
# Date 1314289433 -7200
# Node ID 6671e9a4c9c5a36d769f0b09c1e6bcc229c473d0
# Parent  33cf0231e253b7a9d79c499ac141515016748703
correct model and improve event tracking

diff -r 33cf0231e253 -r 6671e9a4c9c5 script/lib/iri_tweet/models.py
--- a/script/lib/iri_tweet/models.py	Thu Aug 25 02:22:19 2011 +0200
+++ b/script/lib/iri_tweet/models.py	Thu Aug 25 18:23:53 2011 +0200
@@ -27,27 +27,34 @@
     else:
         return anyjson.serialize(obj)
 
-class TweetBase(object):
-
-    def __init__(self, **kwargs):
-        for key, value in kwargs.items():
-            if hasattr(self, key):
-                setattr(self, key, value)
+class TweetMeta(type(Base)):
+
+    def __init__(cls, name, bases, ns): #@NoSelf
+        def init(self, **kwargs):
+            for key, value in kwargs.items():
+                if hasattr(self, key):
+                    setattr(self, key, value)
+            super(cls, self).__init__()
+        setattr(cls, '__init__', init)
+        super(TweetMeta, cls).__init__(name, bases, ns)
 
-class ProcessEvent(Base, TweetBase):
+class ProcessEvent(Base):
+    __metaclass__ = TweetMeta
     __tablename__ = "tweet_process_event"
     id = Column(Integer, primary_key=True, autoincrement=True)
-    ts = Column(DateTime, default=datetime.datetime.utcnow(), index=True)
-    type = Column(Enum("start","pid","shutdown","error", name="process_event_type_enum"), nullable=False)
+    ts = Column(DateTime, default=datetime.datetime.utcnow, index=True)
+    type = Column(Enum("start","pid","shutdown","error", "start_worker", "stop_worker", name="process_event_type_enum"), nullable=False)
     args = Column(String)
 
-class EntityType(Base, TweetBase):
+class EntityType(Base):
+    __metaclass__ = TweetMeta
     __tablename__ = "tweet_entity_type"
     id = Column(Integer, primary_key=True, autoincrement=True)
     label = Column(String)
 
-class Entity(Base, TweetBase):
+class Entity(Base):
+    __metaclass__ = TweetMeta
     __tablename__ = "tweet_entity"
     id = Column(Integer, primary_key=True)
     tweet_id = Column(BigInteger, ForeignKey('tweet_tweet.id'))
@@ -60,24 +67,26 @@
     __mapper_args__ = {'polymorphic_on': type, 'polymorphic_identity': 'entity_entity', 'with_polymorphic':'*'}
 
-class TweetSource(Base, TweetBase):
+class TweetSource(Base):
+    __metaclass__ = TweetMeta
     __tablename__ = 'tweet_tweet_source'
     id = Column(Integer, primary_key=True, autoincrement=True)
     original_json = Column(String)
-    received_at = Column(DateTime, default=datetime.datetime.utcnow(), index=True)
+    received_at = Column(DateTime, default=datetime.datetime.utcnow, index=True)
 
-class TweetLog(Base, TweetBase):
-
+class TweetLog(Base):
+
     TWEET_STATUS = {
         'OK' : 1,
         'ERROR' : 2,
         'NOT_TWEET': 3,
     }
+    __metaclass__ = TweetMeta
     __tablename__ = 'tweet_tweet_log'
     id = Column(Integer, primary_key=True, autoincrement=True)
-    ts = Column(DateTime, default=datetime.datetime.utcnow(), index=True)
+    ts = Column(DateTime, default=datetime.datetime.utcnow, index=True)
     tweet_source_id = Column(Integer, ForeignKey('tweet_tweet_source.id'))
     tweet_source = relationship("TweetSource", backref="logs")
     status = Column(Integer)
@@ -85,7 +94,8 @@
     error_stack = Column(String)
 
 
-class Tweet(Base, TweetBase):
+class Tweet(Base):
+    __metaclass__ = TweetMeta
     __tablename__ = 'tweet_tweet'
 
     id = Column(BigInteger, primary_key=True, autoincrement=False)
@@ -111,28 +121,31 @@
     tweet_source_id = Column(Integer, ForeignKey('tweet_tweet_source.id'))
     tweet_source = relationship("TweetSource", backref="tweet")
     entity_list = relationship(Entity, backref='tweet')
-    received_at = Column(DateTime, default=datetime.datetime.utcnow(), index=True)
+    received_at = Column(DateTime, default=datetime.datetime.utcnow, index=True)
 
-class UserMessage(Base, TweetBase):
+class UserMessage(Base):
+    __metaclass__ = TweetMeta
     __tablename__ = "tweet_user_message"
     id = Column(Integer, primary_key=True)
     user_id = Column(Integer, ForeignKey('tweet_user.id'))
     user = relationship("User", backref="messages")
-    created_at = Column(DateTime, default=datetime.datetime.utcnow())
+    created_at = Column(DateTime, default=datetime.datetime.utcnow)
     message_id = Column(Integer, ForeignKey('tweet_message.id'))
 
-class Message(Base, TweetBase):
+class Message(Base):
+    __metaclass__ = TweetMeta
     __tablename__ = "tweet_message"
     id = Column(Integer, primary_key=True)
-    created_at = Column(DateTime, default=datetime.datetime.utcnow())
+    created_at = Column(DateTime, default=datetime.datetime.utcnow)
     text = Column(String)
     users = relationship(UserMessage, backref='message')
 
-class User(Base, TweetBase):
+class User(Base):
+    __metaclass__ = TweetMeta
     __tablename__ = "tweet_user"
 
     id = Column(BigInteger, primary_key=True, autoincrement=False)
@@ -156,10 +169,12 @@
     profile_background_image_url = Column(String)
     profile_background_tile = Column(Boolean)
     profile_image_url = Column(String)
+    profile_image_url_https = Column(String)
     profile_link_color = Column(String)
     profile_sidebar_border_color = Column(String)
     profile_sidebar_fill_color = Column(String)
     profile_text_color = Column(String)
+    default_profile_image = Column(String)
     profile_use_background_image = Column(Boolean)
     protected = Column(Boolean)
     screen_name = Column(String, index=True)
@@ -171,27 +186,31 @@
     verified = Column(Boolean)
 
 
-class Hashtag(Base, TweetBase):
+class Hashtag(Base):
+    __metaclass__ = TweetMeta
     __tablename__ = "tweet_hashtag"
     id = Column(Integer, primary_key=True)
     text = Column(String, unique=True, index=True)
 
-class Url(Base, TweetBase):
+class Url(Base):
+    __metaclass__ = TweetMeta
     __tablename__ = "tweet_url"
     id = Column(Integer, primary_key=True)
     url = Column(String, unique=True)
     expanded_url = Column(String)
 
-class MediaType(Base, TweetBase):
+class MediaType(Base):
+    __metaclass__ = TweetMeta
     __tablename__ = "tweet_media_type"
     id = Column(Integer, primary_key=True, autoincrement=True)
     label = Column(String, unique=True, index=True)
 
-class Media(Base, TweetBase):
+class Media(Base):
+    __metaclass__ = TweetMeta
     __tablename__ = "tweet_media"
     id = Column(BigInteger, primary_key=True, autoincrement=False)
     id_str = Column(String, unique=True)
diff -r 33cf0231e253 -r 6671e9a4c9c5 script/stream/recorder_tweetstream.py
--- a/script/stream/recorder_tweetstream.py	Thu Aug 25 02:22:19 2011 +0200
+++ b/script/stream/recorder_tweetstream.py	Thu Aug 25 18:23:53 2011 +0200
@@ -111,11 +111,27 @@
 # Don't listen to auth error, since we can't reasonably reconnect
 # when we get one.
 
+def add_process_event(type, args, session_maker):
+    session = session_maker()
+    try:
+        evt = ProcessEvent(args=None if args is None else anyjson.serialize(args), type=type)
+        session.add(evt)
+        session.commit()
+    finally:
+        session.close()
+
 class BaseProcess(Process):
 
-    def __init__(self, parent_pid):
+    def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
         self.parent_pid = parent_pid
+        self.session_maker = session_maker
+        self.queue = queue
+        self.options = options
+        self.logger_queue = logger_queue
+        self.stop_event = stop_event
+        self.access_token = access_token
+        super(BaseProcess, self).__init__()
 
     #
@@ -131,23 +147,32 @@
         else:
             # *ring* Hi mom!
             return True
+
+
+    def __get_process_event_args(self):
+        return {'name':self.name, 'pid':self.pid, 'parent_pid':self.parent_pid, 'options':self.options.__dict__, 'access_token':self.access_token}
+
+    def run(self):
+        try:
+            add_process_event("start_worker", self.__get_process_event_args(), self.session_maker)
+            self.do_run()
+        finally:
+            add_process_event("stop_worker",self.__get_process_event_args(), self.session_maker)
+
+    def do_run(self):
+        raise NotImplementedError()
+
 
 class SourceProcess(BaseProcess):
 
     def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
-        self.session_maker = session_maker
-        self.queue = queue
         self.track = options.track
         self.reconnects = options.reconnects
         self.token_filename = options.token_filename
-        self.stop_event = stop_event
-        self.options = options
-        self.access_token = access_token
-        self.logger_queue = logger_queue
-        super(SourceProcess, self).__init__(parent_pid)
+        super(SourceProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid)
 
-    def run(self):
+    def do_run(self):
 
         #import pydevd
         #pydevd.settrace(suspend=False)
@@ -218,11 +243,13 @@
                 processor.process()
             except Exception as e:
                 message = u"Error %s processing tweet %s" % (repr(e), tweet)
-                logger.error(message)
+                logger.exception(message)
                 output = StringIO.StringIO()
-                traceback.print_exception(Exception, e, None, None, output)
-                error_stack = output.getvalue()
-                output.close()
+                try:
+                    traceback.print_exc(file=output)
+                    error_stack = output.getvalue()
+                finally:
+                    output.close()
                 session.rollback()
                 tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['ERROR'], error=message, error_stack=error_stack)
                 session.add(tweet_log)
@@ -233,23 +260,17 @@
 
 class TweetProcess(BaseProcess):
 
     def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
-        self.session_maker = session_maker
-        self.queue = queue
-        self.stop_event = stop_event
-        self.options = options
-        self.access_token = access_token
-        self.logger_queue = logger_queue
-        super(TweetProcess, self).__init__(parent_pid)
+        super(TweetProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid)
 
-    def run(self):
+    def do_run(self):
 
         self.logger = set_logging_process(self.options, self.logger_queue)
         session = self.session_maker()
         try:
             while not self.stop_event.is_set() and self.parent_is_alive():
                 try:
-                    source_id, tweet_txt = queue.get(True, 3)
+                    source_id, tweet_txt = self.queue.get(True, 3)
                     self.logger.debug("Processing source id " + repr(source_id))
                 except Exception as e:
                     self.logger.debug('Process tweet exception in loop : ' + repr(e))
@@ -322,20 +343,143 @@
     utils.set_logging_options(parser)
 
     return parser.parse_args()
+
+
+def do_run(options, session_maker):
+
+    stop_args = {}
+
+    access_token = None
+    if not options.username or not options.password:
+        access_token = utils.get_oauth_token(options.token_filename)
 
-def add_process_event(type, args, session_maker):
     session = session_maker()
     try:
-        evt = ProcessEvent(args=None if args is None else anyjson.serialize(args), type=type)
-        session.add(evt)
+        process_leftovers(session, access_token, utils.get_logger())
         session.commit()
     finally:
+        session.rollback()
         session.close()
+
+    if options.process_nb <= 0:
+        utils.get_logger().debug("Leftovers processed. Exiting.")
+        return None
+
+    queue = mQueue()
+    stop_event = Event()
+
+    #workaround for bug on using urllib2 and multiprocessing
+    req = urllib2.Request('http://localhost')
+    conn = None
+    try:
+        conn = urllib2.urlopen(req)
+    except:
+        utils.get_logger().debug("could not open localhost")
+        #donothing
+    finally:
+        if conn is not None:
+            conn.close()
+
+    process_engines = []
+    logger_queues = []
+
+    SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str)
+    process_engines.append(engine_process)
+    lqueue = mQueue(1)
+    logger_queues.append(lqueue)
+    pid = os.getpid()
+    sprocess = SourceProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)
+
+    tweet_processes = []
+
+    for i in range(options.process_nb - 1):
+        SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str)
+        process_engines.append(engine_process)
+        lqueue = mQueue(1)
+        logger_queues.append(lqueue)
+        cprocess = TweetProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)
+        tweet_processes.append(cprocess)
+
+    def interupt_handler(signum, frame):
+        utils.get_logger().debug("shutdown asked " + repr(signum) + " " + repr(inspect.getframeinfo(frame, 9)))
+        stop_args.update({'message': 'interupt', 'signum':signum, 'frameinfo':inspect.getframeinfo(frame, 9)})
+        stop_event.set()
+
+    signal.signal(signal.SIGINT , interupt_handler)
+    signal.signal(signal.SIGHUP , interupt_handler)
+    signal.signal(signal.SIGALRM, interupt_handler)
+    signal.signal(signal.SIGTERM, interupt_handler)
+
+    log_thread = threading.Thread(target=process_log, name="loggingThread", args=(logger_queues, stop_event,))
+    log_thread.daemon = True
+
+    log_thread.start()
+
+    sprocess.start()
+    for cprocess in tweet_processes:
+        cprocess.start()
+
+    add_process_event("pid", {'main':os.getpid(), 'source':(sprocess.name,sprocess.pid), 'consumers':dict([(p.name,p.pid) for p in tweet_processes])}, session_maker)
+
+    if options.duration >= 0:
+        end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration)
+
+
+    while not stop_event.is_set():
+        if options.duration >= 0 and datetime.datetime.utcnow() >= end_ts:
+            stop_args.update({'message': 'duration', 'duration' : options.duration, 'end_ts' : end_ts})
+            stop_event.set()
+            break
+        if sprocess.is_alive():
+            time.sleep(1)
+        else:
+            stop_args.update({'message': 'Source process killed'})
+            stop_event.set()
+            break
+    utils.get_logger().debug("Joining Source Process")
+    try:
+        sprocess.join(10)
+    except:
+        utils.get_logger().debug("Pb joining Source Process - terminating")
+        sprocess.terminate()
+
+    for i, cprocess in enumerate(tweet_processes):
+        utils.get_logger().debug("Joining consumer process Nb %d" % (i + 1))
+        try:
+            cprocess.join(3)
+        except:
+            utils.get_logger().debug("Pb joining consumer process Nb %d - terminating" % (i + 1))
+            cprocess.terminate()
+
+
+    utils.get_logger().debug("Close queues")
+    try:
+        queue.close()
+        for lqueue in logger_queues:
+            lqueue.close()
+    except Exception as e:
+        utils.get_logger().error("error when closing queues %s", repr(e))
+        #do nothing
+
+
+    if options.process_nb > 1:
+        utils.get_logger().debug("Processing leftovers")
+        session = session_maker()
+        try:
+            process_leftovers(session, access_token, utils.get_logger())
+            session.commit()
+        finally:
+            session.rollback()
+            session.close()
+
+    for pengine in process_engines:
+        pengine.dispose()
+
+    return stop_args
 
 if __name__ == '__main__':
 
-    stop_args = {}
 
     (options, args) = get_options()
 
     set_logging(options)
@@ -370,135 +514,21 @@
         sys.exit(message)
 
     metadata.create_all(engine)
-
-    add_process_event(type="start", args={'options':options.__dict__, 'args': args, 'command_line': sys.argv}, session_maker=Session)
-
-    access_token = None
-    if not options.username or not options.password:
-        access_token = utils.get_oauth_token(options.token_filename)
-
-    session = Session()
-    try:
-        process_leftovers(session, access_token, utils.get_logger())
-        session.commit()
-    finally:
-        session.rollback()
-        session.close()
-
-    if options.process_nb <= 0:
-        utils.get_logger().debug("Leftovers processed. Exiting.")
-        add_process_event(type="shutdown", args=None, session_maker=Session)
-        sys.exit()
-
-    queue = mQueue()
-    stop_event = Event()
-
-    #workaround for bug on using urllib2 and multiprocessing
-    req = urllib2.Request('http://localhost')
-    conn = None
+    stop_args = {}
     try:
-        conn = urllib2.urlopen(req)
-    except:
-        pass
-        #donothing
-    finally:
-        if conn is not None:
-            conn.close()
-
-    process_engines = []
-    logger_queues = []
-
-    SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str)
-    process_engines.append(engine_process)
-    lqueue = mQueue(1)
-    logger_queues.append(lqueue)
-    pid = os.getpid()
-    sprocess = SourceProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)
-
-    tweet_processes = []
-
-    for i in range(options.process_nb - 1):
-        SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str)
-        process_engines.append(engine_process)
-        lqueue = mQueue(1)
-        logger_queues.append(lqueue)
-        cprocess = TweetProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)
-        tweet_processes.append(cprocess)
+        add_process_event(type="start", args={'options':options.__dict__, 'args': args, 'command_line': sys.argv}, session_maker=Session)
+        stop_args = do_run(options, Session)
+    except Exception as e:
+        utils.get_logger().exception("Error in main thread")
+        outfile = StringIO.StringIO()
+        try:
+            traceback.print_exc(file=outfile)
+            stop_args = {'error': repr(e), 'message': getattr(e, 'message', ''), 'stacktrace':outfile.getvalue()}
+        finally:
+            outfile.close()
+        raise
+    finally:
+        add_process_event(type="shutdown", args=stop_args, session_maker=Session)
 
-    def interupt_handler(signum, frame):
-        global stop_args
-        utils.get_logger().debug("shutdown asked " + repr(signum) + " " + repr(inspect.getframeinfo(frame, 9)))
-        stop_args = {'signum':signum, 'frameinfo':inspect.getframeinfo(frame, 9)}
-        stop_event.set()
-
-    signal.signal(signal.SIGINT , interupt_handler)
-    signal.signal(signal.SIGHUP , interupt_handler)
-    signal.signal(signal.SIGALRM, interupt_handler)
-    signal.signal(signal.SIGTERM, interupt_handler)
-
-    log_thread = threading.Thread(target=process_log, name="loggingThread", args=(logger_queues, stop_event,))
-    log_thread.daemon = True
-
-    log_thread.start()
-
-    sprocess.start()
-    for cprocess in tweet_processes:
-        cprocess.start()
-
-    add_process_event("pid", {"main":os.getpid(), 'source':sprocess.pid, 'consumers':[p.pid for p in tweet_processes]}, Session)
-
-    if options.duration >= 0:
-        end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration)
-
-
-    while not stop_event.is_set():
-        if options.duration >= 0 and datetime.datetime.utcnow() >= end_ts:
-            stop_event.set()
-            break
-        if sprocess.is_alive():
-            time.sleep(1)
-        else:
-            stop_event.set()
-            break
-    utils.get_logger().debug("Joining Source Process")
-    try:
-        sprocess.join(10)
-    except:
-        utils.get_logger().debug("Pb joining Source Process - terminating")
-        sprocess.terminate()
-
-    for i, cprocess in enumerate(tweet_processes):
-        utils.get_logger().debug("Joining consumer process Nb %d" % (i + 1))
-        try:
-            cprocess.join(3)
-        except:
-            utils.get_logger().debug("Pb joining consumer process Nb %d - terminating" % (i + 1))
-            cprocess.terminate()
-
-
-    utils.get_logger().debug("Close queues")
-    try:
-        queue.close()
-        for lqueue in logger_queues:
-            lqueue.close()
-    except exception as e:
-        utils.get_logger().error("error when closing queues %s", repr(e))
-        #do nothing
-
-
-    if options.process_nb > 1:
-        utils.get_logger().debug("Processing leftovers")
-        session = Session()
-        try:
-            process_leftovers(session, access_token, utils.get_logger())
-            session.commit()
-        finally:
-            session.rollback()
-            session.close()
-
-    for pengine in process_engines:
-        pengine.dispose()
-
-    add_process_event(type="shutdown", args=stop_args, session_maker=Session)
 
     utils.get_logger().debug("Done. Exiting.")