diff -r 41ce1c341abe -r 10a19dd4e1c9 script/stream/recorder_tweetstream.py
--- a/script/stream/recorder_tweetstream.py Tue May 07 18:28:26 2013 +0200
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,528 +0,0 @@
-from getpass import getpass
-from iri_tweet import models, utils
-from iri_tweet.models import TweetSource, TweetLog, ProcessEvent
-from multiprocessing import (Queue as mQueue, JoinableQueue, Process, Event,
-    get_logger)
-from optparse import OptionParser
-from sqlalchemy.exc import OperationalError
-from sqlalchemy.orm import scoped_session
-import Queue
-import StringIO
-import anyjson
-import datetime
-import inspect
-import logging
-import os
-import re
-import shutil
-import signal
-import socket
-import sqlalchemy.schema
-import sys
-import threading
-import time
-import traceback
-import tweepy.auth
-import tweetstream
-import urllib2
-socket._fileobject.default_bufsize = 0
-
-
-#columns_tweet = [u'favorited', u'truncated', u'text', u'created_at', u'source', u'in_reply_to_status_id', u'in_reply_to_screen_name', u'in_reply_to_user_id', u'geo', u'id', u'user']
-columns_tweet = [u'user', u'favorited', u'contributors', u'truncated', u'text', u'created_at', u'retweeted', u'in_reply_to_status_id_str', u'coordinates', u'in_reply_to_user_id_str', u'entities', u'in_reply_to_status_id', u'place', u'in_reply_to_user_id', u'id', u'in_reply_to_screen_name', u'retweet_count', u'geo', u'id_str', u'source']
-#columns_user = [u'id', u'verified', u'profile_sidebar_fill_color', u'profile_text_color', u'followers_count', u'protected', u'location', u'profile_background_color', u'utc_offset', u'statuses_count', u'description', u'friends_count', u'profile_link_color', u'profile_image_url', u'notifications', u'geo_enabled', u'profile_background_image_url', u'screen_name', u'profile_background_tile', u'favourites_count', u'name', u'url', u'created_at', u'time_zone', u'profile_sidebar_border_color', u'following']
-columns_user = [u'follow_request_sent', u'profile_use_background_image', u'id', u'verified', u'profile_sidebar_fill_color', u'profile_text_color', u'followers_count', u'protected', u'location', u'profile_background_color', u'id_str', u'utc_offset', u'statuses_count', u'description', u'friends_count', u'profile_link_color', u'profile_image_url', u'notifications', u'show_all_inline_media', u'geo_enabled', u'profile_background_image_url', u'name', u'lang', u'following', u'profile_background_tile', u'favourites_count', u'screen_name', u'url', u'created_at', u'contributors_enabled', u'time_zone', u'profile_sidebar_border_color', u'is_translator', u'listed_count']
-# just put it in a sqlite3 table
-
-
-def set_logging(options):
-    loggers = []
-
-    loggers.append(utils.set_logging(options, logging.getLogger('iri.tweet')))
-    loggers.append(utils.set_logging(options, logging.getLogger('multiprocessing')))
-    if options.debug >= 2:
-        loggers.append(utils.set_logging(options, logging.getLogger('sqlalchemy.engine')))
-    #utils.set_logging(options, logging.getLogger('sqlalchemy.dialects'))
-    #utils.set_logging(options, logging.getLogger('sqlalchemy.pool'))
-    #utils.set_logging(options, logging.getLogger('sqlalchemy.orm'))
-    return loggers
-
-def set_logging_process(options, queue):
-    qlogger = utils.set_logging(options, logging.getLogger('iri.tweet.p'), queue)
-    qlogger.propagate = 0
-    return qlogger
-
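# ---- editor's sketch (not part of the original module) -----------------------
# set_logging_process() above relies on iri_tweet.utils.set_logging (not shown
# in this file) to attach a queue-backed handler to the per-process logger;
# process_log() further down replays the records in the main process. A
# stdlib-only stand-in for that pattern might look like this -- the class name
# QueueHandlerSketch is hypothetical:

import logging

class QueueHandlerSketch(logging.Handler):
    """Ship LogRecords over a multiprocessing queue instead of formatting them."""
    def __init__(self, queue):
        logging.Handler.__init__(self)
        self.queue = queue

    def emit(self, record):
        self.queue.put_nowait(record)

# child side:  logger = logging.getLogger('iri.tweet.p')
#              logger.addHandler(QueueHandlerSketch(lqueue))
#              logger.propagate = 0          # keep records off the root logger
# parent side: record = lqueue.get_nowait()
#              logging.getLogger(record.name).handle(record)
# -------------------------------------------------------------------------------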
-def get_auth(options, access_token):
-    if options.username and options.password:
-        auth = tweepy.auth.BasicAuthHandler(options.username, options.password)
-    else:
-        consumer_key = models.CONSUMER_KEY
-        consumer_secret = models.CONSUMER_SECRET
-        auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=False)
-        auth.set_access_token(*access_token)
-    return auth
-
-
-def add_process_event(type, args, session_maker):
-    session = session_maker()
-    try:
-        evt = ProcessEvent(args=None if args is None else anyjson.serialize(args), type=type)
-        session.add(evt)
-        session.commit()
-    finally:
-        session.close()
-
-
-class BaseProcess(Process):
-
-    def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
-        self.parent_pid = parent_pid
-        self.session_maker = session_maker
-        self.queue = queue
-        self.options = options
-        self.logger_queue = logger_queue
-        self.stop_event = stop_event
-        self.access_token = access_token
-
-        super(BaseProcess, self).__init__()
-
-    #
-    # from http://stackoverflow.com/questions/2542610/python-daemon-doesnt-kill-its-kids
-    #
-    def parent_is_alive(self):
-        try:
-            # try to call Parent
-            os.kill(self.parent_pid, 0)
-        except OSError:
-            # *beeep* oh no! The phone's disconnected!
-            return False
-        else:
-            # *ring* Hi mom!
-            return True
-
-    def __get_process_event_args(self):
-        return {'name':self.name, 'pid':self.pid, 'parent_pid':self.parent_pid, 'options':self.options.__dict__, 'access_token':self.access_token}
-
-    def run(self):
-        try:
-            add_process_event("start_worker", self.__get_process_event_args(), self.session_maker)
-            self.do_run()
-        finally:
-            add_process_event("stop_worker", self.__get_process_event_args(), self.session_maker)
-
-    def do_run(self):
-        raise NotImplementedError()
-
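# ---- editor's sketch (not part of the original module) -----------------------
# parent_is_alive() above uses the classic "signal 0" probe: os.kill(pid, 0)
# delivers no signal, it only checks that the target pid exists, raising
# OSError when it does not. Standalone illustration (pid_exists is a
# hypothetical helper name):

import os

def pid_exists(pid):
    """Return True if a process with the given pid currently exists."""
    try:
        os.kill(pid, 0)       # signal 0 = existence/permission check only
    except OSError:
        return False
    return True

# pid_exists(os.getpid()) is always True. Caveat: an EPERM OSError actually
# means the pid exists but belongs to another user; the code above can ignore
# that case because it only ever probes its own parent.
# -------------------------------------------------------------------------------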
-class SourceProcess(BaseProcess):
-
-    def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
-        self.track = options.track
-        self.token_filename = options.token_filename
-        self.catchup = options.catchup
-        self.timeout = options.timeout
-        super(SourceProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid)
-
-    def do_run(self):
-
-        #import pydevd
-        #pydevd.settrace(suspend=True)
-
-        self.logger = set_logging_process(self.options, self.logger_queue)
-        self.auth = get_auth(self.options, self.access_token)
-
-        self.logger.debug("SourceProcess : run")
-        track_list = self.track # or raw_input('Keywords to track (comma separated): ').strip()
-        self.logger.debug("SourceProcess : track list " + track_list)
-
-        track_list = [k.strip() for k in track_list.split(',')]
-
-        self.logger.debug("SourceProcess : before connecting to stream " + repr(track_list))
-        stream = tweetstream.FilterStream(self.auth, track=track_list, raw=True, url=self.options.url, catchup=self.catchup, timeout=self.timeout)
-        self.logger.debug("SourceProcess : after connecting to stream")
-        stream.muststop = lambda: self.stop_event.is_set()
-
-        session = self.session_maker()
-
-        try:
-            for tweet in stream:
-                if not self.parent_is_alive():
-                    sys.exit()
-                self.logger.debug("SourceProcess : tweet " + repr(tweet))
-                source = TweetSource(original_json=tweet)
-                self.logger.debug("SourceProcess : source created")
-                add_retries = 0
-                while add_retries < 10:
-                    try:
-                        add_retries += 1
-                        session.add(source)
-                        session.flush()
-                        break
-                    except OperationalError as e:
-                        session.rollback()
-                        self.logger.debug("SourceProcess : Operational Error %s nb %d" % (repr(e), add_retries))
-                        if add_retries == 10:
-                            raise e
-
-                source_id = source.id
-                self.logger.debug("SourceProcess : before queue + source id " + repr(source_id))
-                self.logger.info("SourceProcess : Tweet count: %d - current rate : %.2f - running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime)))
-                session.commit()
-                self.queue.put((source_id, tweet), False)
-
-        except Exception as e:
-            self.logger.error("SourceProcess : Error when processing tweet " + repr(e))
-        finally:
-            session.rollback()
-            stream.close()
-            session.close()
-            self.queue.close()
-            self.stop_event.set()
-
-
-def process_tweet(tweet, source_id, session, access_token, twitter_query_user, logger):
-    try:
-        tweet_obj = anyjson.deserialize(tweet)
-        if 'text' not in tweet_obj:
-            tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET'])
-            session.add(tweet_log)
-            return
-        screen_name = ""
-        if 'user' in tweet_obj and 'screen_name' in tweet_obj['user']:
-            screen_name = tweet_obj['user']['screen_name']
-        logger.info(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text']))
-        logger.debug(u"Process_tweet : " + repr(tweet))
-        processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, access_token, None, twitter_query_user)
-        processor.process()
-    except Exception as e:
-        message = u"Error %s processing tweet %s" % (repr(e), tweet)
-        logger.exception(message)
-        output = StringIO.StringIO()
-        try:
-            traceback.print_exc(file=output)
-            error_stack = output.getvalue()
-        finally:
-            output.close()
-        session.rollback()
-        tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['ERROR'], error=message, error_stack=error_stack)
-        session.add(tweet_log)
-        session.commit()
-
-
-class TweetProcess(BaseProcess):
-
-    def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
-        super(TweetProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid)
-        self.twitter_query_user = options.twitter_query_user
-
-    def do_run(self):
-
-        self.logger = set_logging_process(self.options, self.logger_queue)
-        session = self.session_maker()
-        try:
-            while not self.stop_event.is_set() and self.parent_is_alive():
-                try:
-                    source_id, tweet_txt = self.queue.get(True, 3)
-                    self.logger.debug("Processing source id " + repr(source_id))
-                except Exception as e:
-                    self.logger.debug('Process tweet exception in loop : ' + repr(e))
-                    continue
-                process_tweet(tweet_txt, source_id, session, self.access_token, self.twitter_query_user, self.logger)
-                session.commit()
-        finally:
-            session.rollback()
-            self.stop_event.set()
-            session.close()
-
-
-def get_sessionmaker(conn_str):
-    engine, metadata, Session = models.setup_database(conn_str, echo=False, create_all=False, autocommit=False)
-    Session = scoped_session(Session)
-    return Session, engine, metadata
-
-
-def process_leftovers(session, access_token, twitter_query_user, logger):
-
-    # get tweet sources that do not match any log entry:
-    # select * from tweet_tweet_source ts left join tweet_tweet_log tl on ts.id = tl.tweet_source_id where tl.id isnull;
-    sources = session.query(TweetSource).outerjoin(TweetLog).filter(TweetLog.id == None)
-
-    for src in sources:
-        tweet_txt = src.original_json
-        process_tweet(tweet_txt, src.id, session, access_token, twitter_query_user, logger)
-        session.commit()
-
-
-def process_log(logger_queues, stop_event):
-    while not stop_event.is_set():
-        for lqueue in logger_queues:
-            try:
-                record = lqueue.get_nowait()
-                logging.getLogger(record.name).handle(record)
-            except Queue.Empty:
-                continue
-            except IOError:
-                continue
-        time.sleep(0.1)
-
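# ---- editor's sketch (not part of the original module) -----------------------
# The shape that do_run() assembles below, reduced to a runnable toy: one
# producer feeding a shared queue, consumers draining it with a timed get(),
# and a shared Event requesting shutdown. As in the real code, items still
# queued at shutdown are simply left for a later process_leftovers()-style
# pass. All names are illustrative; Python 2 stdlib names and the Unix fork
# start method are assumed.

def _toy_pipeline(n_consumers=2):
    import multiprocessing
    import Queue as queue_errors

    def produce(q, stop):
        for i in range(10):
            q.put(i)
        stop.set()                  # producer done -> ask everyone to stop

    def consume(q, stop):
        while not stop.is_set():
            try:
                q.get(True, 1)      # block at most 1s, like queue.get(True, 3)
            except queue_errors.Empty:
                continue

    q, stop = multiprocessing.Queue(), multiprocessing.Event()
    procs = [multiprocessing.Process(target=produce, args=(q, stop))]
    procs += [multiprocessing.Process(target=consume, args=(q, stop))
              for _ in range(n_consumers)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
# -------------------------------------------------------------------------------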
parser.add_option("-f", "--file", dest="conn_str", - help="write tweet to DATABASE. This is a connection string", metavar="CONNECTION_STR", default="enmi2010_twitter.db") - parser.add_option("-u", "--user", dest="username", - help="Twitter user", metavar="USER", default=None) - parser.add_option("-w", "--password", dest="password", - help="Twitter password", metavar="PASSWORD", default=None) - parser.add_option("-T", "--track", dest="track", - help="Twitter track", metavar="TRACK") - parser.add_option("-n", "--new", dest="new", action="store_true", - help="new database", default=False) - parser.add_option("-D", "--daemon", dest="daemon", action="store_true", - help="launch daemon", default=False) - parser.add_option("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token", - help="Token file name") - parser.add_option("-d", "--duration", dest="duration", - help="Duration of recording in seconds", metavar="DURATION", default= -1, type='int') - parser.add_option("-N", "--nb-process", dest="process_nb", - help="number of process.\nIf 0, only the lefovers of the database are processed.\nIf 1, no postprocessing is done on the tweets.", metavar="PROCESS_NB", default=2, type='int') - parser.add_option("--url", dest="url", - help="The twitter url to connect to.", metavar="URL", default=tweetstream.FilterStream.url) - parser.add_option("--query-user", dest="twitter_query_user", action="store_true", - help="Query twitter for users", default=False, metavar="QUERY_USER") - parser.add_option("--catchup", dest="catchup", - help="catchup count for tweets", default=None, metavar="CATCHUP", type='int') - parser.add_option("--timeout", dest="timeout", - help="timeout for connecting in seconds", default=60, metavar="TIMEOUT", type='int') - - - - - utils.set_logging_options(parser) - - return parser.parse_args() - - -def do_run(options, session_maker): - - stop_args = {} - - access_token = None - if not options.username or not options.password: - access_token = utils.get_oauth_token(options.token_filename) - - session = session_maker() - try: - process_leftovers(session, access_token, options.twitter_query_user, utils.get_logger()) - session.commit() - finally: - session.rollback() - session.close() - - if options.process_nb <= 0: - utils.get_logger().debug("Leftovers processed. 
Exiting.") - return None - - queue = mQueue() - stop_event = Event() - - #workaround for bug on using urllib2 and multiprocessing - req = urllib2.Request('http://localhost') - conn = None - try: - conn = urllib2.urlopen(req) - except: - utils.get_logger().debug("could not open localhost") - #donothing - finally: - if conn is not None: - conn.close() - - process_engines = [] - logger_queues = [] - - SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str) - process_engines.append(engine_process) - lqueue = mQueue(1) - logger_queues.append(lqueue) - pid = os.getpid() - sprocess = SourceProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid) - - tweet_processes = [] - - for i in range(options.process_nb - 1): - SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str) - process_engines.append(engine_process) - lqueue = mQueue(1) - logger_queues.append(lqueue) - cprocess = TweetProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid) - tweet_processes.append(cprocess) - - def interupt_handler(signum, frame): - utils.get_logger().debug("shutdown asked " + repr(signum) + " " + repr(inspect.getframeinfo(frame, 9))) - stop_args.update({'message': 'interupt', 'signum':signum, 'frameinfo':inspect.getframeinfo(frame, 9)}) - stop_event.set() - - signal.signal(signal.SIGINT , interupt_handler) - signal.signal(signal.SIGHUP , interupt_handler) - signal.signal(signal.SIGALRM, interupt_handler) - signal.signal(signal.SIGTERM, interupt_handler) - - log_thread = threading.Thread(target=process_log, name="loggingThread", args=(logger_queues, stop_event,)) - log_thread.daemon = True - - log_thread.start() - - sprocess.start() - for cprocess in tweet_processes: - cprocess.start() - - add_process_event("pid", {'main':os.getpid(), 'source':(sprocess.name, sprocess.pid), 'consumers':dict([(p.name, p.pid) for p in tweet_processes])}, session_maker) - - if options.duration >= 0: - end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration) - - - while not stop_event.is_set(): - if options.duration >= 0 and datetime.datetime.utcnow() >= end_ts: - stop_args.update({'message': 'duration', 'duration' : options.duration, 'end_ts' : end_ts}) - stop_event.set() - break - if sprocess.is_alive(): - time.sleep(1) - else: - stop_args.update({'message': 'Source process killed'}) - stop_event.set() - break - utils.get_logger().debug("Joining Source Process") - try: - sprocess.join(10) - except: - utils.get_logger().debug("Pb joining Source Process - terminating") - sprocess.terminate() - - for i, cprocess in enumerate(tweet_processes): - utils.get_logger().debug("Joining consumer process Nb %d" % (i + 1)) - try: - cprocess.join(3) - except: - utils.get_logger().debug("Pb joining consumer process Nb %d - terminating" % (i + 1)) - cprocess.terminate() - - - utils.get_logger().debug("Close queues") - try: - queue.close() - for lqueue in logger_queues: - lqueue.close() - except exception as e: - utils.get_logger().error("error when closing queues %s", repr(e)) - #do nothing - - - if options.process_nb > 1: - utils.get_logger().debug("Processing leftovers") - session = session_maker() - try: - process_leftovers(session, access_token, options.twitter_query_user, utils.get_logger()) - session.commit() - finally: - session.rollback() - session.close() - - for pengine in process_engines: - pengine.dispose() - - return stop_args - - -def main(options, args): - - global conn_str - - conn_str = options.conn_str.strip() - if not 
re.match("^\w+://.+", conn_str): - conn_str = 'sqlite:///' + options.conn_str - - if conn_str.startswith("sqlite") and options.new: - filepath = conn_str[conn_str.find(":///") + 4:] - if os.path.exists(filepath): - i = 1 - basename, extension = os.path.splitext(filepath) - new_path = '%s.%d%s' % (basename, i, extension) - while i < 1000000 and os.path.exists(new_path): - i += 1 - new_path = '%s.%d%s' % (basename, i, extension) - if i >= 1000000: - raise Exception("Unable to find new filename for " + filepath) - else: - shutil.move(filepath, new_path) - - Session, engine, metadata = get_sessionmaker(conn_str) - - if options.new: - check_metadata = sqlalchemy.schema.MetaData(bind=engine, reflect=True) - if len(check_metadata.sorted_tables) > 0: - message = "Database %s not empty exiting" % conn_str - utils.get_logger().error(message) - sys.exit(message) - - metadata.create_all(engine) - session = Session() - try: - models.add_model_version(session) - finally: - session.close() - - stop_args = {} - try: - add_process_event(type="start", args={'options':options.__dict__, 'args': args, 'command_line': sys.argv}, session_maker=Session) - stop_args = do_run(options, Session) - except Exception as e: - utils.get_logger().exception("Error in main thread") - outfile = StringIO.StringIO() - try: - traceback.print_exc(file=outfile) - stop_args = {'error': repr(e), 'message': getattr(e, 'message', ''), 'stacktrace':outfile.getvalue()} - finally: - outfile.close() - raise - finally: - add_process_event(type="shutdown", args=stop_args, session_maker=Session) - - utils.get_logger().debug("Done. Exiting. " + repr(stop_args)) - - - -if __name__ == '__main__': - - (options, args) = get_options() - - loggers = set_logging(options) - - utils.get_logger().debug("OPTIONS : " + repr(options)) - - if options.daemon: - import daemon - import lockfile - - hdlr_preserve = [] - for logger in loggers: - hdlr_preserve.extend([h.stream for h in logger.handlers]) - - context = daemon.DaemonContext(working_directory=os.getcwd(), files_preserve=hdlr_preserve) - with context: - main(options, args) - else: - main(options, args) -