--- a/script/lib/iri_tweet/models.py Wed Aug 24 18:04:26 2011 +0200
+++ b/script/lib/iri_tweet/models.py Thu Aug 25 02:20:08 2011 +0200
@@ -1,5 +1,5 @@
-from sqlalchemy import (Boolean, Column, BigInteger, Integer, String, ForeignKey,
- DateTime, create_engine)
+from sqlalchemy import (Boolean, Column, Enum, BigInteger, Integer, String,
+ ForeignKey, DateTime, create_engine)
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship
import anyjson
@@ -27,12 +27,27 @@
else:
return anyjson.serialize(obj)
-class EntityType(Base):
+class TweetBase(object):
+
+ def __init__(self, **kwargs):
+ for key, value in kwargs.items():
+ if hasattr(self, key):
+ setattr(self, key, value)
+
+
+class ProcessEvent(Base, TweetBase):
+ __tablename__ = "tweet_process_event"
+ id = Column(Integer, primary_key=True, autoincrement=True)
+ ts = Column(DateTime, default=datetime.datetime.utcnow, index=True)
+ type = Column(Enum("start", "pid", "shutdown", "error", name="process_event_type_enum"), nullable=False)
+ args = Column(String)
+
+class EntityType(Base, TweetBase):
__tablename__ = "tweet_entity_type"
id = Column(Integer, primary_key=True, autoincrement=True)
label = Column(String)
-class Entity(Base):
+class Entity(Base, TweetBase):
__tablename__ = "tweet_entity"
id = Column(Integer, primary_key=True)
tweet_id = Column(BigInteger, ForeignKey('tweet_tweet.id'))
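
The TweetBase mixin introduced above replaces the identical keyword-filtering __init__ that each model previously carried: only keyword arguments matching an existing attribute are copied onto the instance and everything else is dropped, so objects can be built directly from Twitter JSON dicts that carry extra keys. A minimal sketch of that filtering on a plain, hypothetical demo class (not part of the module):

    class Demo(TweetBase):
        text = None                                # only pre-existing attributes are accepted

    d = Demo(text="enmi", followers_count=42)      # followers_count is unknown and silently ignored
    assert d.text == "enmi"
    assert not hasattr(d, "followers_count")
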
@@ -44,24 +59,15 @@
source = Column(String)
__mapper_args__ = {'polymorphic_on': type, 'polymorphic_identity': 'entity_entity', 'with_polymorphic':'*'}
- def __init__(self, **kwargs):
- for key, value in kwargs.items():
- if hasattr(self, key):
- setattr(self, key, value)
-class TweetSource(Base):
+class TweetSource(Base, TweetBase):
__tablename__ = 'tweet_tweet_source'
id = Column(Integer, primary_key=True, autoincrement=True)
original_json = Column(String)
- received_at = Column(DateTime, default=datetime.datetime.now(), index=True)
-
- def __init__(self, **kwargs):
- for key, value in kwargs.items():
- if hasattr(self, key):
- setattr(self, key, value)
+ received_at = Column(DateTime, default=datetime.datetime.utcnow, index=True)
-class TweetLog(Base):
+class TweetLog(Base, TweetBase):
TWEET_STATUS = {
'OK' : 1,
@@ -71,6 +77,7 @@
__tablename__ = 'tweet_tweet_log'
id = Column(Integer, primary_key=True, autoincrement=True)
+ ts = Column(DateTime, default=datetime.datetime.utcnow, index=True)
tweet_source_id = Column(Integer, ForeignKey('tweet_tweet_source.id'))
tweet_source = relationship("TweetSource", backref="logs")
status = Column(Integer)
@@ -78,7 +85,7 @@
error_stack = Column(String)
-class Tweet(Base):
+class Tweet(Base, TweetBase):
__tablename__ = 'tweet_tweet'
id = Column(BigInteger, primary_key=True, autoincrement=False)
@@ -104,32 +111,28 @@
tweet_source_id = Column(Integer, ForeignKey('tweet_tweet_source.id'))
tweet_source = relationship("TweetSource", backref="tweet")
entity_list = relationship(Entity, backref='tweet')
- received_at = Column(DateTime, default=datetime.datetime.now(), index=True)
+ received_at = Column(DateTime, default=datetime.datetime.utcnow, index=True)
- def __init__(self, **kwargs):
- for key, value in kwargs.items():
- if hasattr(self, key):
- setattr(self, key, value)
-class UserMessage(Base):
+class UserMessage(Base, TweetBase):
__tablename__ = "tweet_user_message"
id = Column(Integer, primary_key=True)
user_id = Column(Integer, ForeignKey('tweet_user.id'))
user = relationship("User", backref="messages")
- created_at = Column(DateTime, default=datetime.datetime.now())
+ created_at = Column(DateTime, default=datetime.datetime.utcnow)
message_id = Column(Integer, ForeignKey('tweet_message.id'))
-class Message(Base):
+class Message(Base, TweetBase):
__tablename__ = "tweet_message"
id = Column(Integer, primary_key=True)
- created_at = Column(DateTime, default=datetime.datetime.now())
+ created_at = Column(DateTime, default=datetime.datetime.utcnow)
text = Column(String)
users = relationship(UserMessage, backref='message')
-class User(Base):
+class User(Base, TweetBase):
__tablename__ = "tweet_user"
id = Column(BigInteger, primary_key=True, autoincrement=False)
@@ -166,43 +169,29 @@
url = Column(String)
utc_offset = Column(Integer)
verified = Column(Boolean)
-
- def __init__(self, **kwargs):
- for key, value in kwargs.items():
- if hasattr(self, key):
- setattr(self, key, value)
-class Hashtag(Base):
+class Hashtag(Base, TweetBase):
__tablename__ = "tweet_hashtag"
id = Column(Integer, primary_key=True)
text = Column(String, unique=True, index=True)
- def __init__(self, **kwargs):
- for key, value in kwargs.items():
- if hasattr(self, key):
- setattr(self, key, value)
+
-class Url(Base):
+class Url(Base, TweetBase):
__tablename__ = "tweet_url"
id = Column(Integer, primary_key=True)
url = Column(String, unique=True)
expanded_url = Column(String)
- def __init__(self, **kwargs):
- for key, value in kwargs.items():
- if hasattr(self, key):
- setattr(self, key, value)
+
-class MediaType(Base):
+class MediaType(Base, TweetBase):
__tablename__ = "tweet_media_type"
id = Column(Integer, primary_key=True, autoincrement=True)
label = Column(String, unique=True, index=True)
- def __init__(self, **kwargs):
- for key, value in kwargs.items():
- if hasattr(self, key):
- setattr(self, key, value)
+
-class Media(Base):
+class Media(Base, TweetBase):
__tablename__ = "tweet_media"
id = Column(BigInteger, primary_key=True, autoincrement=False)
id_str = Column(String, unique=True)
@@ -214,10 +203,6 @@
sizes = Column(String)
type_id = Column(Integer, ForeignKey("tweet_media_type.id"))
type = relationship(MediaType, primaryjoin=type_id == MediaType.id)
- def __init__(self, **kwargs):
- for key, value in kwargs.items():
- if hasattr(self, key):
- setattr(self, key, value)
@@ -227,11 +212,6 @@
id = Column(Integer, ForeignKey('tweet_entity.id'), primary_key=True)
hashtag_id = Column(Integer, ForeignKey("tweet_hashtag.id"))
hashtag = relationship(Hashtag, primaryjoin=hashtag_id == Hashtag.id)
- def __init__(self, **kwargs):
- super(EntityHashtag, self).__init__(**kwargs)
- for key, value in kwargs.items():
- if hasattr(self, key):
- setattr(self, key, value)
class EntityUrl(Entity):
@@ -240,11 +220,6 @@
id = Column(Integer, ForeignKey('tweet_entity.id'), primary_key=True)
url_id = Column(Integer, ForeignKey("tweet_url.id"))
url = relationship(Url, primaryjoin=url_id == Url.id)
- def __init__(self, **kwargs):
- super(EntityUrl, self).__init__(**kwargs)
- for key, value in kwargs.items():
- if hasattr(self, key):
- setattr(self, key, value)
class EntityUser(Entity):
__tablename__ = "tweet_entity_user"
@@ -253,11 +228,6 @@
user_id = Column(BigInteger, ForeignKey('tweet_user.id'))
user = relationship(User, primaryjoin=(user_id == User.id))
- def __init__(self, **kwargs):
- super(EntityUser, self).__init__(**kwargs)
- for key, value in kwargs.items():
- if hasattr(self, key):
- setattr(self, key, value)
class EntityMedia(Entity):
__tablename__ = "tweet_entity_media"
@@ -266,12 +236,6 @@
media_id = Column(BigInteger, ForeignKey('tweet_media.id'))
media = relationship(Media, primaryjoin=(media_id == Media.id))
- def __init__(self, **kwargs):
- super(EntityMedia, self).__init__(**kwargs)
- for key, value in kwargs.items():
- if hasattr(self, key):
- setattr(self, key, value)
-
def setup_database(*args, **kwargs):
@@ -288,118 +252,3 @@
return (engine, metadata)
-rest_tweet_tweet = {
- u'iso_language_code': 'unicode',
- u'text': 'unicode',
- u'from_user_id_str': 'unicode',
- u'profile_image_url': 'unicode',
- u'to_user_id_str': 'NoneType',
- u'created_at': 'unicode',
- u'source': 'unicode',
- u'to_user': 'unicode',
- u'id_str': 'unicode',
- u'from_user': 'unicode',
- u'place': {u'type': 'unicode', u'id': 'unicode', u'full_name': 'unicode'},
- u'from_user_id': 'int',
- u'to_user_id': 'NoneType',
- u'geo': 'NoneType',
- u'id': 'int',
- u'metadata': {u'result_type': 'unicode'}
-}
-
-tweet_tweet = {
- 'contributors': None,
- 'coordinates': None,
- 'created_at': 'date',
- 'entities': "tweet_entity",
- 'favorited': "bool",
- 'geo': None,
- 'id': "long",
- 'id_str': "string",
- 'in_reply_to_screen_name': "string",
- 'in_reply_to_status_id': "long",
- 'in_reply_to_status_id_str': "string",
- 'in_reply_to_user_id': "int",
- 'in_reply_to_user_id_str': "string",
- 'place': "string",
- 'retweet_count': "int",
- 'retweeted': "bool",
- 'source': "string",
- 'text': "string",
- 'truncated': "bool",
- 'user': "tweet_user"
-}
-tweet_user = {
- 'contributors_enabled': 'bool',
- 'created_at': 'str',
- 'description': 'str',
- 'favourites_count': 'int',
- 'follow_request_sent': None,
- 'followers_count': 'int',
- 'following': None,
- 'friends_count': 'int',
- 'geo_enabled': 'bool',
- 'id': 'int',
- 'id_str': 'str',
- 'is_translator': 'bool',
- 'lang': 'str',
- 'listed_count': 'int',
- 'location': 'str',
- 'name': 'str',
- 'notifications': 'NoneType',
- 'profile_background_color': 'str',
- 'profile_background_image_url': 'str',
- 'profile_background_tile': 'bool',
- 'profile_image_url': 'str',
- 'profile_link_color': 'str',
- 'profile_sidebar_border_color': 'str',
- 'profile_sidebar_fill_color': 'str',
- 'profile_text_color': 'str',
- 'profile_use_background_image': 'bool',
- 'protected': 'bool',
- 'screen_name': 'str',
- 'show_all_inline_media': 'bool',
- 'statuses_count': 'int',
- 'time_zone': 'str',
- 'url': 'str',
- 'utc_offset': 'int',
- 'verified': 'bool',
-}
-
-
-tweet_entity_hashtag = {
- 'hashtag' : 'tweet_hashtag',
- 'indice_start' : 'int',
- 'indice_end' : 'int',
- 'tweet':'tweet_tweet'
-}
-
-tweet_entity_url = {
- 'url' : 'tweet_url',
- 'indice_start' : 'int',
- 'indice_end' : 'int',
- 'tweet':'tweet_tweet'
-}
-
-tweet_entity_user = {
- 'user' : 'tweet_user',
- 'indice_start' : 'int',
- 'indice_end' : 'int',
- 'tweet':'tweet_tweet'
-}
-
-#id int
-#id_str str
-#indices list
-#name str
-#screen_name str
-
-tweet_hashtag = {
- "text": "string"
-}
-
-tweet_url = {
- "url": "string",
- "expanded_url" : "string",
-}
-
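
Taken together, the models.py changes add the tweet_process_event audit table and switch every timestamp default to UTC; the long dictionaries removed at the end appear to have been reference notes on the Twitter payload field types. A minimal usage sketch, assuming setup_database() forwards its arguments to create_engine() (its body is outside this diff) and that sessions come from a standard sessionmaker:

    from sqlalchemy.orm import sessionmaker
    from iri_tweet import models

    engine, metadata = models.setup_database("sqlite:///:memory:")
    metadata.create_all(engine)
    Session = sessionmaker(bind=engine)

    session = Session()
    session.add(models.ProcessEvent(type="start", args='{"argv": []}'))
    session.commit()
    session.close()
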
--- a/script/stream/recorder_tweetstream.py Wed Aug 24 18:04:26 2011 +0200
+++ b/script/stream/recorder_tweetstream.py Thu Aug 25 02:20:08 2011 +0200
@@ -1,6 +1,6 @@
from getpass import getpass
from iri_tweet import models, utils
-from iri_tweet.models import TweetSource, TweetLog
+from iri_tweet.models import TweetSource, TweetLog, ProcessEvent
from multiprocessing import (Queue as mQueue, JoinableQueue, Process, Event,
get_logger)
from optparse import OptionParser
@@ -10,6 +10,7 @@
import StringIO
import anyjson
import datetime
+import inspect
import logging
import os
import re
@@ -111,11 +112,30 @@
# when we get one.
+class BaseProcess(Process):
+
+ def __init__(self, parent_pid):
+ self.parent_pid = parent_pid
+ super(BaseProcess, self).__init__()
+
+ #
+ # from http://stackoverflow.com/questions/2542610/python-daemon-doesnt-kill-its-kids
+ #
+ def parent_is_alive(self):
+ try:
+ # signal 0 is never delivered; it only checks that the pid still exists
+ os.kill(self.parent_pid, 0)
+ except OSError:
+ # the parent process is gone
+ return False
+ else:
+ # the parent process is still alive
+ return True
-class SourceProcess(Process):
+class SourceProcess(BaseProcess):
- def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue):
+ def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
self.session_maker = session_maker
self.queue = queue
self.track = options.track
@@ -125,8 +145,8 @@
self.options = options
self.access_token = access_token
self.logger_queue = logger_queue
- super(SourceProcess, self).__init__()
-
+ super(SourceProcess, self).__init__(parent_pid)
+
def run(self):
#import pydevd
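
BaseProcess gives every worker a way to notice that the main process has died: os.kill(pid, 0) delivers no signal and only raises OSError when the pid no longer exists. SourceProcess and TweetProcess poll it from their run() loops; a condensed, hypothetical sketch of the pattern:

    class Worker(BaseProcess):                 # illustration only, not part of the recorder
        def run(self):
            while self.parent_is_alive():      # leave the loop once the parent pid is gone
                time.sleep(1)                  # stand-in for the real per-iteration work
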
@@ -148,9 +168,11 @@
try:
for tweet in stream:
- self.logger.debug("tweet " + repr(tweet))
+ if not self.parent_is_alive():
+ sys.exit()
+ self.logger.debug("SourceProcess : tweet " + repr(tweet))
source = TweetSource(original_json=tweet)
- self.logger.debug("source created")
+ self.logger.debug("SourceProcess : source created")
add_retries = 0
while add_retries < 10:
try:
@@ -160,18 +182,18 @@
break
except OperationalError as e:
session.rollback()
- self.logger.debug("Operational Error %s nb %d" % (repr(e), add_retries))
+ self.logger.debug("SourceProcess : Operational Error %s nb %d" % (repr(e), add_retries))
if add_retries == 10:
raise e
source_id = source.id
- self.logger.debug("before queue + source id " + repr(source_id))
- self.logger.info("Tweet count: %d - current rate : %.2f - running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime)))
+ self.logger.debug("SourceProcess : before queue + source id " + repr(source_id))
+ self.logger.info("SourceProcess : Tweet count: %d - current rate : %.2f - running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime)))
session.commit()
self.queue.put((source_id, tweet), False)
except Exception as e:
- self.logger.error("Error when processing tweet " + repr(e))
+ self.logger.error("SourceProcess : Error when processing tweet " + repr(e))
finally:
session.rollback()
stream.close()
@@ -208,16 +230,16 @@
-class TweetProcess(Process):
+class TweetProcess(BaseProcess):
- def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue):
+ def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
self.session_maker = session_maker
self.queue = queue
self.stop_event = stop_event
self.options = options
self.access_token = access_token
self.logger_queue = logger_queue
- super(TweetProcess, self).__init__()
+ super(TweetProcess, self).__init__(parent_pid)
def run(self):
@@ -225,7 +247,7 @@
self.logger = set_logging_process(self.options, self.logger_queue)
session = self.session_maker()
try:
- while not self.stop_event.is_set():
+ while not self.stop_event.is_set() and self.parent_is_alive():
try:
source_id, tweet_txt = queue.get(True, 3)
self.logger.debug("Processing source id " + repr(source_id))
@@ -273,7 +295,11 @@
def get_options():
- parser = OptionParser()
+
+ usage = "usage: %prog [options]"
+
+ parser = OptionParser(usage=usage)
+
parser.add_option("-f", "--file", dest="conn_str",
help="write tweet to DATABASE. This is a connection string", metavar="CONNECTION_STR", default="enmi2010_twitter.db")
parser.add_option("-u", "--user", dest="username",
@@ -293,30 +319,35 @@
parser.add_option("-N", "--nb-process", dest="process_nb",
help="number of process.\nIf 0, only the lefovers of the database are processed.\nIf 1, no postprocessing is done on the tweets.", metavar="PROCESS_NB", default=2, type='int')
-
-
utils.set_logging_options(parser)
return parser.parse_args()
+def add_process_event(type, args, session_maker):
+ session = session_maker()
+ try:
+ evt = ProcessEvent(args=None if args is None else anyjson.serialize(args), type=type)
+ session.add(evt)
+ session.commit()
+ finally:
+ session.close()
+
if __name__ == '__main__':
+ stop_args = {}
(options, args) = get_options()
set_logging(options)
-
- if options.debug:
- print "OPTIONS : "
- print repr(options)
+ utils.get_logger().debug("OPTIONS : " + repr(options))
conn_str = options.conn_str.strip()
if not re.match("^\w+://.+", conn_str):
conn_str = 'sqlite:///' + options.conn_str
if conn_str.startswith("sqlite") and options.new:
- filepath = conn_str[conn_str.find(":///")+4:]
+ filepath = conn_str[conn_str.find(":///") + 4:]
if os.path.exists(filepath):
i = 1
basename, extension = os.path.splitext(filepath)
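
add_process_event() gives every run a small audit trail in tweet_process_event: a "start" row with the command line, a "pid" row with the worker pids, and a "shutdown" row on exit. A minimal sketch of reading that trail back through the same Session factory:

    session = Session()
    try:
        for evt in session.query(ProcessEvent).order_by(ProcessEvent.ts):
            print evt.ts, evt.type, evt.args
    finally:
        session.close()
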
@@ -340,6 +371,8 @@
metadata.create_all(engine)
+ add_process_event(type="start", args={'options':options.__dict__, 'args': args, 'command_line': sys.argv}, session_maker=Session)
+
access_token = None
if not options.username or not options.password:
access_token = utils.get_oauth_token(options.token_filename)
@@ -354,6 +387,7 @@
if options.process_nb <= 0:
utils.get_logger().debug("Leftovers processed. Exiting.")
+ add_process_event(type="shutdown", args=None, session_maker=Session)
sys.exit()
queue = mQueue()
@@ -378,7 +412,8 @@
process_engines.append(engine_process)
lqueue = mQueue(1)
logger_queues.append(lqueue)
- sprocess = SourceProcess(SessionProcess, queue, options, access_token, stop_event, lqueue)
+ pid = os.getpid()
+ sprocess = SourceProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)
tweet_processes = []
@@ -387,11 +422,13 @@
process_engines.append(engine_process)
lqueue = mQueue(1)
logger_queues.append(lqueue)
- cprocess = TweetProcess(SessionProcess, queue, options, access_token, stop_event, lqueue)
+ cprocess = TweetProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)
tweet_processes.append(cprocess)
def interupt_handler(signum, frame):
- utils.get_logger().debug("shutdown asked " + repr(signum) + " " + repr(frame))
+ global stop_args
+ utils.get_logger().debug("shutdown asked " + repr(signum) + " " + repr(inspect.getframeinfo(frame, 9)))
+ stop_args = {'signum':signum, 'frameinfo':inspect.getframeinfo(frame, 9)}
stop_event.set()
signal.signal(signal.SIGINT , interupt_handler)
@@ -399,14 +436,16 @@
signal.signal(signal.SIGALRM, interupt_handler)
signal.signal(signal.SIGTERM, interupt_handler)
- log_thread = threading.Thread(target=process_log, name="loggingThread", args=(logger_queues,stop_event,))
+ log_thread = threading.Thread(target=process_log, name="loggingThread", args=(logger_queues, stop_event,))
log_thread.daemon = True
+ log_thread.start()
+
sprocess.start()
for cprocess in tweet_processes:
cprocess.start()
- log_thread.start()
+ add_process_event("pid", {"main":os.getpid(), 'source':sprocess.pid, 'consumers':[p.pid for p in tweet_processes]}, Session)
if options.duration >= 0:
end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration)
@@ -460,5 +499,6 @@
for pengine in process_engines:
pengine.dispose()
+ add_process_event(type="shutdown", args=stop_args, session_maker=Session)
utils.get_logger().debug("Done. Exiting.")