--- a/script/lib/iri_tweet/models.py Thu Aug 25 02:22:19 2011 +0200
+++ b/script/lib/iri_tweet/models.py Thu Aug 25 18:23:53 2011 +0200
@@ -27,27 +27,34 @@
else:
return anyjson.serialize(obj)
-class TweetBase(object):
-
- def __init__(self, **kwargs):
- for key, value in kwargs.items():
- if hasattr(self, key):
- setattr(self, key, value)
+class TweetMeta(type(Base)):
+
+ def __init__(cls, name, bases, ns): #@NoSelf
+ def init(self, **kwargs):
+ for key, value in kwargs.items():
+ if hasattr(self, key):
+ setattr(self, key, value)
+ super(cls, self).__init__()
+ setattr(cls, '__init__', init)
+ super(TweetMeta, cls).__init__(name, bases, ns)
-class ProcessEvent(Base, TweetBase):
+class ProcessEvent(Base):
+ __metaclass__ = TweetMeta
__tablename__ = "tweet_process_event"
id = Column(Integer, primary_key=True, autoincrement=True)
- ts = Column(DateTime, default=datetime.datetime.utcnow(), index=True)
- type = Column(Enum("start","pid","shutdown","error", name="process_event_type_enum"), nullable=False)
+ ts = Column(DateTime, default=datetime.datetime.utcnow, index=True)
+ type = Column(Enum("start","pid","shutdown","error", "start_worker", "stop_worker", name="process_event_type_enum"), nullable=False)
args = Column(String)
-class EntityType(Base, TweetBase):
+class EntityType(Base):
+ __metaclass__ = TweetMeta
__tablename__ = "tweet_entity_type"
id = Column(Integer, primary_key=True, autoincrement=True)
label = Column(String)
-class Entity(Base, TweetBase):
+class Entity(Base):
+ __metaclass__ = TweetMeta
__tablename__ = "tweet_entity"
id = Column(Integer, primary_key=True)
tweet_id = Column(BigInteger, ForeignKey('tweet_tweet.id'))
@@ -60,24 +67,26 @@
__mapper_args__ = {'polymorphic_on': type, 'polymorphic_identity': 'entity_entity', 'with_polymorphic':'*'}
-class TweetSource(Base, TweetBase):
+class TweetSource(Base):
+ __metaclass__ = TweetMeta
__tablename__ = 'tweet_tweet_source'
id = Column(Integer, primary_key=True, autoincrement=True)
original_json = Column(String)
- received_at = Column(DateTime, default=datetime.datetime.utcnow(), index=True)
+ received_at = Column(DateTime, default=datetime.datetime.utcnow, index=True)
-class TweetLog(Base, TweetBase):
-
+class TweetLog(Base):
+
TWEET_STATUS = {
'OK' : 1,
'ERROR' : 2,
'NOT_TWEET': 3,
}
+ __metaclass__ = TweetMeta
__tablename__ = 'tweet_tweet_log'
id = Column(Integer, primary_key=True, autoincrement=True)
- ts = Column(DateTime, default=datetime.datetime.utcnow(), index=True)
+ ts = Column(DateTime, default=datetime.datetime.utcnow, index=True)
tweet_source_id = Column(Integer, ForeignKey('tweet_tweet_source.id'))
tweet_source = relationship("TweetSource", backref="logs")
status = Column(Integer)
@@ -85,7 +94,8 @@
error_stack = Column(String)
-class Tweet(Base, TweetBase):
+class Tweet(Base):
+ __metaclass__ = TweetMeta
__tablename__ = 'tweet_tweet'
id = Column(BigInteger, primary_key=True, autoincrement=False)
@@ -111,28 +121,31 @@
tweet_source_id = Column(Integer, ForeignKey('tweet_tweet_source.id'))
tweet_source = relationship("TweetSource", backref="tweet")
entity_list = relationship(Entity, backref='tweet')
- received_at = Column(DateTime, default=datetime.datetime.utcnow(), index=True)
+ received_at = Column(DateTime, default=datetime.datetime.utcnow, index=True)
-class UserMessage(Base, TweetBase):
+class UserMessage(Base):
+ __metaclass__ = TweetMeta
__tablename__ = "tweet_user_message"
id = Column(Integer, primary_key=True)
user_id = Column(Integer, ForeignKey('tweet_user.id'))
user = relationship("User", backref="messages")
- created_at = Column(DateTime, default=datetime.datetime.utcnow())
+ created_at = Column(DateTime, default=datetime.datetime.utcnow)
message_id = Column(Integer, ForeignKey('tweet_message.id'))
-class Message(Base, TweetBase):
+class Message(Base):
+ __metaclass__ = TweetMeta
__tablename__ = "tweet_message"
id = Column(Integer, primary_key=True)
- created_at = Column(DateTime, default=datetime.datetime.utcnow())
+ created_at = Column(DateTime, default=datetime.datetime.utcnow)
text = Column(String)
users = relationship(UserMessage, backref='message')
-class User(Base, TweetBase):
+class User(Base):
+ __metaclass__ = TweetMeta
__tablename__ = "tweet_user"
id = Column(BigInteger, primary_key=True, autoincrement=False)
@@ -156,10 +169,12 @@
profile_background_image_url = Column(String)
profile_background_tile = Column(Boolean)
profile_image_url = Column(String)
+ profile_image_url_https = Column(String)
profile_link_color = Column(String)
profile_sidebar_border_color = Column(String)
profile_sidebar_fill_color = Column(String)
profile_text_color = Column(String)
+ default_profile_image = Column(String)
profile_use_background_image = Column(Boolean)
protected = Column(Boolean)
screen_name = Column(String, index=True)
@@ -171,27 +186,31 @@
verified = Column(Boolean)
-class Hashtag(Base, TweetBase):
+class Hashtag(Base):
+ __metaclass__ = TweetMeta
__tablename__ = "tweet_hashtag"
id = Column(Integer, primary_key=True)
text = Column(String, unique=True, index=True)
-class Url(Base, TweetBase):
+class Url(Base):
+ __metaclass__ = TweetMeta
__tablename__ = "tweet_url"
id = Column(Integer, primary_key=True)
url = Column(String, unique=True)
expanded_url = Column(String)
-class MediaType(Base, TweetBase):
+class MediaType(Base):
+ __metaclass__ = TweetMeta
__tablename__ = "tweet_media_type"
id = Column(Integer, primary_key=True, autoincrement=True)
label = Column(String, unique=True, index=True)
-class Media(Base, TweetBase):
+class Media(Base):
+ __metaclass__ = TweetMeta
__tablename__ = "tweet_media"
id = Column(BigInteger, primary_key=True, autoincrement=False)
id_str = Column(String, unique=True)
--- a/script/stream/recorder_tweetstream.py Thu Aug 25 02:22:19 2011 +0200
+++ b/script/stream/recorder_tweetstream.py Thu Aug 25 18:23:53 2011 +0200
@@ -111,11 +111,27 @@
# Don't listen to auth error, since we can't reasonably reconnect
# when we get one.
+def add_process_event(type, args, session_maker):
+ session = session_maker()
+ try:
+ evt = ProcessEvent(args=None if args is None else anyjson.serialize(args), type=type)
+ session.add(evt)
+ session.commit()
+ finally:
+ session.close()
+
class BaseProcess(Process):
- def __init__(self, parent_pid):
+ def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
self.parent_pid = parent_pid
+ self.session_maker = session_maker
+ self.queue = queue
+ self.options = options
+ self.logger_queue = logger_queue
+ self.stop_event = stop_event
+ self.access_token = access_token
+
super(BaseProcess, self).__init__()
#
@@ -131,23 +147,32 @@
else:
# *ring* Hi mom!
return True
+
+
+ def __get_process_event_args(self):
+ return {'name':self.name, 'pid':self.pid, 'parent_pid':self.parent_pid, 'options':self.options.__dict__, 'access_token':self.access_token}
+
+ def run(self):
+ try:
+ add_process_event("start_worker", self.__get_process_event_args(), self.session_maker)
+ self.do_run()
+ finally:
+ add_process_event("stop_worker",self.__get_process_event_args(), self.session_maker)
+
+ def do_run(self):
+ raise NotImplementedError()
+
class SourceProcess(BaseProcess):
def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
- self.session_maker = session_maker
- self.queue = queue
self.track = options.track
self.reconnects = options.reconnects
self.token_filename = options.token_filename
- self.stop_event = stop_event
- self.options = options
- self.access_token = access_token
- self.logger_queue = logger_queue
- super(SourceProcess, self).__init__(parent_pid)
+ super(SourceProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid)
- def run(self):
+ def do_run(self):
#import pydevd
#pydevd.settrace(suspend=False)
@@ -218,11 +243,13 @@
processor.process()
except Exception as e:
message = u"Error %s processing tweet %s" % (repr(e), tweet)
- logger.error(message)
+ logger.exception(message)
output = StringIO.StringIO()
- traceback.print_exception(Exception, e, None, None, output)
- error_stack = output.getvalue()
- output.close()
+ try:
+ traceback.print_exc(file=output)
+ error_stack = output.getvalue()
+ finally:
+ output.close()
session.rollback()
tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['ERROR'], error=message, error_stack=error_stack)
session.add(tweet_log)
@@ -233,23 +260,17 @@
class TweetProcess(BaseProcess):
def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
- self.session_maker = session_maker
- self.queue = queue
- self.stop_event = stop_event
- self.options = options
- self.access_token = access_token
- self.logger_queue = logger_queue
- super(TweetProcess, self).__init__(parent_pid)
+ super(TweetProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid)
- def run(self):
+ def do_run(self):
self.logger = set_logging_process(self.options, self.logger_queue)
session = self.session_maker()
try:
while not self.stop_event.is_set() and self.parent_is_alive():
try:
- source_id, tweet_txt = queue.get(True, 3)
+ source_id, tweet_txt = self.queue.get(True, 3)
self.logger.debug("Processing source id " + repr(source_id))
except Exception as e:
self.logger.debug('Process tweet exception in loop : ' + repr(e))
@@ -322,20 +343,143 @@
utils.set_logging_options(parser)
return parser.parse_args()
+
+
+def do_run(options, session_maker):
+
+ stop_args = {}
+
+ access_token = None
+ if not options.username or not options.password:
+ access_token = utils.get_oauth_token(options.token_filename)
-def add_process_event(type, args, session_maker):
session = session_maker()
try:
- evt = ProcessEvent(args=None if args is None else anyjson.serialize(args), type=type)
- session.add(evt)
+ process_leftovers(session, access_token, utils.get_logger())
session.commit()
finally:
+ session.rollback()
session.close()
+
+ if options.process_nb <= 0:
+ utils.get_logger().debug("Leftovers processed. Exiting.")
+ return None
+
+ queue = mQueue()
+ stop_event = Event()
+
+ #workaround for bug on using urllib2 and multiprocessing
+ req = urllib2.Request('http://localhost')
+ conn = None
+ try:
+ conn = urllib2.urlopen(req)
+ except:
+ utils.get_logger().debug("could not open localhost")
+ #donothing
+ finally:
+ if conn is not None:
+ conn.close()
+
+ process_engines = []
+ logger_queues = []
+
+ SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str)
+ process_engines.append(engine_process)
+ lqueue = mQueue(1)
+ logger_queues.append(lqueue)
+ pid = os.getpid()
+ sprocess = SourceProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)
+
+ tweet_processes = []
+
+ for i in range(options.process_nb - 1):
+ SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str)
+ process_engines.append(engine_process)
+ lqueue = mQueue(1)
+ logger_queues.append(lqueue)
+ cprocess = TweetProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)
+ tweet_processes.append(cprocess)
+
+ def interupt_handler(signum, frame):
+ utils.get_logger().debug("shutdown asked " + repr(signum) + " " + repr(inspect.getframeinfo(frame, 9)))
+ stop_args.update({'message': 'interupt', 'signum':signum, 'frameinfo':inspect.getframeinfo(frame, 9)})
+ stop_event.set()
+
+ signal.signal(signal.SIGINT , interupt_handler)
+ signal.signal(signal.SIGHUP , interupt_handler)
+ signal.signal(signal.SIGALRM, interupt_handler)
+ signal.signal(signal.SIGTERM, interupt_handler)
+
+ log_thread = threading.Thread(target=process_log, name="loggingThread", args=(logger_queues, stop_event,))
+ log_thread.daemon = True
+
+ log_thread.start()
+
+ sprocess.start()
+ for cprocess in tweet_processes:
+ cprocess.start()
+
+ add_process_event("pid", {'main':os.getpid(), 'source':(sprocess.name,sprocess.pid), 'consumers':dict([(p.name,p.pid) for p in tweet_processes])}, session_maker)
+
+ if options.duration >= 0:
+ end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration)
+
+
+ while not stop_event.is_set():
+ if options.duration >= 0 and datetime.datetime.utcnow() >= end_ts:
+ stop_args.update({'message': 'duration', 'duration' : options.duration, 'end_ts' : end_ts})
+ stop_event.set()
+ break
+ if sprocess.is_alive():
+ time.sleep(1)
+ else:
+ stop_args.update({'message': 'Source process killed'})
+ stop_event.set()
+ break
+ utils.get_logger().debug("Joining Source Process")
+ try:
+ sprocess.join(10)
+ except:
+ utils.get_logger().debug("Pb joining Source Process - terminating")
+ sprocess.terminate()
+
+ for i, cprocess in enumerate(tweet_processes):
+ utils.get_logger().debug("Joining consumer process Nb %d" % (i + 1))
+ try:
+ cprocess.join(3)
+ except:
+ utils.get_logger().debug("Pb joining consumer process Nb %d - terminating" % (i + 1))
+ cprocess.terminate()
+
+
+ utils.get_logger().debug("Close queues")
+ try:
+ queue.close()
+ for lqueue in logger_queues:
+ lqueue.close()
+    except Exception as e:
+ utils.get_logger().error("error when closing queues %s", repr(e))
+ #do nothing
+
+
+ if options.process_nb > 1:
+ utils.get_logger().debug("Processing leftovers")
+ session = session_maker()
+ try:
+ process_leftovers(session, access_token, utils.get_logger())
+ session.commit()
+ finally:
+ session.rollback()
+ session.close()
+
+ for pengine in process_engines:
+ pengine.dispose()
+
+ return stop_args
if __name__ == '__main__':
- stop_args = {}
(options, args) = get_options()
set_logging(options)
@@ -370,135 +514,21 @@
sys.exit(message)
metadata.create_all(engine)
-
- add_process_event(type="start", args={'options':options.__dict__, 'args': args, 'command_line': sys.argv}, session_maker=Session)
-
- access_token = None
- if not options.username or not options.password:
- access_token = utils.get_oauth_token(options.token_filename)
-
- session = Session()
- try:
- process_leftovers(session, access_token, utils.get_logger())
- session.commit()
- finally:
- session.rollback()
- session.close()
-
- if options.process_nb <= 0:
- utils.get_logger().debug("Leftovers processed. Exiting.")
- add_process_event(type="shutdown", args=None, session_maker=Session)
- sys.exit()
-
- queue = mQueue()
- stop_event = Event()
-
- #workaround for bug on using urllib2 and multiprocessing
- req = urllib2.Request('http://localhost')
- conn = None
+ stop_args = {}
try:
- conn = urllib2.urlopen(req)
- except:
- pass
- #donothing
- finally:
- if conn is not None:
- conn.close()
-
- process_engines = []
- logger_queues = []
-
- SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str)
- process_engines.append(engine_process)
- lqueue = mQueue(1)
- logger_queues.append(lqueue)
- pid = os.getpid()
- sprocess = SourceProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)
-
- tweet_processes = []
-
- for i in range(options.process_nb - 1):
- SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str)
- process_engines.append(engine_process)
- lqueue = mQueue(1)
- logger_queues.append(lqueue)
- cprocess = TweetProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)
- tweet_processes.append(cprocess)
+ add_process_event(type="start", args={'options':options.__dict__, 'args': args, 'command_line': sys.argv}, session_maker=Session)
+ stop_args = do_run(options, Session)
+ except Exception as e:
+ utils.get_logger().exception("Error in main thread")
+ outfile = StringIO.StringIO()
+ try:
+ traceback.print_exc(file=outfile)
+ stop_args = {'error': repr(e), 'message': getattr(e, 'message', ''), 'stacktrace':outfile.getvalue()}
+ finally:
+ outfile.close()
+ raise
+ finally:
+ add_process_event(type="shutdown", args=stop_args, session_maker=Session)
- def interupt_handler(signum, frame):
- global stop_args
- utils.get_logger().debug("shutdown asked " + repr(signum) + " " + repr(inspect.getframeinfo(frame, 9)))
- stop_args = {'signum':signum, 'frameinfo':inspect.getframeinfo(frame, 9)}
- stop_event.set()
-
- signal.signal(signal.SIGINT , interupt_handler)
- signal.signal(signal.SIGHUP , interupt_handler)
- signal.signal(signal.SIGALRM, interupt_handler)
- signal.signal(signal.SIGTERM, interupt_handler)
-
- log_thread = threading.Thread(target=process_log, name="loggingThread", args=(logger_queues, stop_event,))
- log_thread.daemon = True
-
- log_thread.start()
-
- sprocess.start()
- for cprocess in tweet_processes:
- cprocess.start()
-
- add_process_event("pid", {"main":os.getpid(), 'source':sprocess.pid, 'consumers':[p.pid for p in tweet_processes]}, Session)
-
- if options.duration >= 0:
- end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration)
-
-
- while not stop_event.is_set():
- if options.duration >= 0 and datetime.datetime.utcnow() >= end_ts:
- stop_event.set()
- break
- if sprocess.is_alive():
- time.sleep(1)
- else:
- stop_event.set()
- break
- utils.get_logger().debug("Joining Source Process")
- try:
- sprocess.join(10)
- except:
- utils.get_logger().debug("Pb joining Source Process - terminating")
- sprocess.terminate()
-
- for i, cprocess in enumerate(tweet_processes):
- utils.get_logger().debug("Joining consumer process Nb %d" % (i + 1))
- try:
- cprocess.join(3)
- except:
- utils.get_logger().debug("Pb joining consumer process Nb %d - terminating" % (i + 1))
- cprocess.terminate()
-
-
- utils.get_logger().debug("Close queues")
- try:
- queue.close()
- for lqueue in logger_queues:
- lqueue.close()
- except exception as e:
- utils.get_logger().error("error when closing queues %s", repr(e))
- #do nothing
-
-
- if options.process_nb > 1:
- utils.get_logger().debug("Processing leftovers")
- session = Session()
- try:
- process_leftovers(session, access_token, utils.get_logger())
- session.commit()
- finally:
- session.rollback()
- session.close()
-
- for pengine in process_engines:
- pengine.dispose()
-
- add_process_event(type="shutdown", args=stop_args, session_maker=Session)
utils.get_logger().debug("Done. Exiting.")