diff -r e9335ee3cf71 -r 2209e66bb50b script/stream/recorder_tweetstream.py
--- a/script/stream/recorder_tweetstream.py	Tue Aug 09 13:07:23 2011 +0200
+++ b/script/stream/recorder_tweetstream.py	Fri Aug 12 18:17:27 2011 +0200
@@ -3,22 +3,25 @@
 from iri_tweet.models import TweetSource, TweetLog
 from multiprocessing import Queue, JoinableQueue, Process, Event, get_logger
 from optparse import OptionParser
-from sqlalchemy.orm import sessionmaker
+from sqlalchemy.exc import OperationalError
+from sqlalchemy.orm import scoped_session, sessionmaker
 import StringIO
-import logging
 import anyjson
 import datetime
+import logging
 import os
+import re
 import shutil
 import signal
 import socket
+import sqlalchemy.schema
 import sys
 import time
 import traceback
 import tweepy.auth
 import tweetstream
-from iri_tweet.utils import logger
-from sqlalchemy.exc import OperationalError
+import urllib2
+#from iri_tweet.utils import get_logger
 
 socket._fileobject.default_bufsize = 0
 
@@ -30,6 +33,26 @@
 #just put it in a sqlite3 table
 
 
+def set_logging(options):
+    utils.set_logging(options, logging.getLogger('iri_tweet'))
+    utils.set_logging(options, logging.getLogger('multiprocessing'))
+    if options.debug >= 2:
+        utils.set_logging(options, logging.getLogger('sqlalchemy.engine'))
+        #utils.set_logging(options, logging.getLogger('sqlalchemy.dialects'))
+        #utils.set_logging(options, logging.getLogger('sqlalchemy.pool'))
+        #utils.set_logging(options, logging.getLogger('sqlalchemy.orm'))
+
+def get_auth(options, access_token):
+    if options.username and options.password:
+        auth = tweepy.auth.BasicAuthHandler(options.username, options.password)
+    else:
+        consumer_key = models.CONSUMER_KEY
+        consumer_secret = models.CONSUMER_SECRET
+        auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=False)
+        auth.set_access_token(*access_token)
+    return auth
+
+
 class ReconnectingTweetStream(tweetstream.FilterStream):
     """TweetStream class that automatically tries to reconnect if the
     connection goes down. Reconnecting, and waiting for reconnecting, is
@@ -62,9 +85,10 @@
     def next(self):
         while True:
             try:
+                utils.get_logger().debug("return super.next")
                 return super(ReconnectingTweetStream, self).next()
             except tweetstream.ConnectionError, e:
-                logging.debug("connection error :" + str(e))
+                utils.get_logger().debug("connection error :" + str(e))
                 self._reconnects += 1
                 if self._reconnects > self.max_reconnects:
                     raise tweetstream.ConnectionError("Too many retries")
@@ -80,38 +104,43 @@
 
 
+
 class SourceProcess(Process):
-    def __init__(self, session_maker, queue, auth, track, debug, reconnects, token_filename, stop_event):
+    def __init__(self, session_maker, queue, options, access_token, stop_event):
         self.session_maker = session_maker
         self.queue = queue
-        self.auth = auth
-        self.track = track
-        self.debug = debug
-        self.reconnects = reconnects
-        self.token_filename = token_filename
+        self.track = options.track
+        self.reconnects = options.reconnects
+        self.token_filename = options.token_filename
         self.stop_event = stop_event
+        self.options = options
+        self.access_token = access_token
         super(SourceProcess, self).__init__()
-#        self.stop_event =
 
     def run(self):
+        #import pydevd
+        #pydevd.settrace(suspend=False)
+
+        set_logging(self.options)
+        self.auth = get_auth(self.options, self.access_token)
-        get_logger().debug("SourceProcess : run")
+        utils.get_logger().debug("SourceProcess : run")
         track_list = self.track # or raw_input('Keywords to track (comma separated): ').strip()
         track_list = [k for k in track_list.split(',')]
 
-        get_logger().debug("SourceProcess : before connecting to stream " + repr(track_list))
+        utils.get_logger().debug("SourceProcess : before connecting to stream " + repr(track_list))
         stream = ReconnectingTweetStream(self.auth, track_list, reconnects=self.reconnects, as_text=True)
-        get_logger().debug("SourceProcess : after connecting to stream")
+        utils.get_logger().debug("SourceProcess : after connecting to stream")
         stream.muststop = lambda: self.stop_event.is_set()
 
         session = self.session_maker()
 
         try:
             for tweet in stream:
-                get_logger().debug("tweet " + repr(tweet))
+                utils.get_logger().debug("tweet " + repr(tweet))
                 source = TweetSource(original_json=tweet)
-                get_logger().debug("source created")
+                utils.get_logger().debug("source created")
                 add_retries = 0
                 while add_retries < 10:
                     try:
@@ -121,21 +150,18 @@
                         break
                     except OperationalError as e:
                         session.rollback()
-                        get_logger().debug("Operational Error %s nb %d" % (repr(e), add_retries))
-                        if add_retries==10:
+                        utils.get_logger().debug("Operational Error %s nb %d" % (repr(e), add_retries))
+                        if add_retries == 10:
                             raise e
 
                 source_id = source.id
-                get_logger().debug("before queue + source id " + repr(source_id))
-                self.queue.put((source_id, tweet), False)
-                #process_tweet(tweet, source_id, self.session, self.debug, self.token_filename)
-                get_logger().info("Tweet count: %d - current rate : %.2f - running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime)))
+                utils.get_logger().debug("before queue + source id " + repr(source_id))
+                utils.get_logger().info("Tweet count: %d - current rate : %.2f - running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime)))
                 session.commit()
-#                if duration >= 0 and datetime.datetime.utcnow() >= end_ts:
-#                    print "Stop recording after %d seconds." % (duration)
-#                    break
+                self.queue.put((source_id, tweet), False)
+
         except Exception as e:
-            get_logger().error("Error when processing tweet " + repr(e))
+            utils.get_logger().error("Error when processing tweet " + repr(e))
         finally:
             session.rollback()
             stream.close()
@@ -144,19 +170,19 @@
             self.stop_event.set()
 
 
-def process_tweet(tweet, source_id, session, token_filename):
+def process_tweet(tweet, source_id, session, access_token):
     try:
         tweet_obj = anyjson.deserialize(tweet)
         screen_name = ""
         if 'user' in tweet_obj and 'screen_name' in tweet_obj['user']:
            screen_name = tweet_obj['user']['screen_name']
-        get_logger().info(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text']))
-        get_logger().debug(u"Process_tweet :" + repr(tweet))
-        processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, token_filename)
+        utils.get_logger().debug(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text']))
+        utils.get_logger().debug(u"Process_tweet :" + repr(tweet))
+        processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, access_token, None)
         processor.process()
     except Exception as e:
         message = u"Error %s processing tweet %s" % (repr(e), tweet)
-        get_logger().error(message)
+        utils.get_logger().error(message)
         output = StringIO.StringIO()
         traceback.print_exception(Exception, e, None, None, output)
         error_stack = output.getvalue()
@@ -170,42 +196,49 @@
 
 
 class TweetProcess(Process):
-    def __init__(self, session_maker, queue, debug, token_filename, stop_event):
+    def __init__(self, session_maker, queue, options, access_token, stop_event):
         self.session_maker = session_maker
         self.queue = queue
-        self.debug = debug
         self.stop_event = stop_event
-        self.token_filename = token_filename
+        self.options = options
+        self.access_token = access_token
         super(TweetProcess, self).__init__()
 
 
     def run(self):
+        set_logging(self.options)
         session = self.session_maker()
         try:
             while not self.stop_event.is_set():
                 try:
-                    source_id, tweet_txt = queue.get(True, 10)
-                    get_logger().debug("Processing source id " + repr(source_id))
+                    source_id, tweet_txt = self.queue.get(True, 3)
+                    utils.get_logger().debug("Processing source id " + repr(source_id))
                 except Exception as e:
-                    get_logger().debug('Process tweet exception in loop : ' + repr(e))
+                    utils.get_logger().debug('Process tweet exception in loop : ' + repr(e))
                     continue
-                process_tweet(tweet_txt, source_id, session, self.token_filename)
+                process_tweet(tweet_txt, source_id, session, self.access_token)
                 session.commit()
-        except:
-            raise
         finally:
             session.rollback()
             self.stop_event.set()
             session.close()
+
+
+def get_sessionmaker(conn_str):
+    engine, metadata = models.setup_database(conn_str, echo=False, create_all=False)
+    Session = scoped_session(sessionmaker(bind=engine, autocommit=False))
+    return Session, engine, metadata
+
 
-def process_leftovers(session, token_filename):
+def process_leftovers(session, access_token):
     sources = session.query(TweetSource).outerjoin(TweetLog).filter(TweetLog.id == None)
 
     for src in sources:
         tweet_txt = src.original_json
-        process_tweet(tweet_txt, src.id, session, token_filename)
+        process_tweet(tweet_txt, src.id, session, access_token)
+        session.commit()
 
@@ -215,8 +248,8 @@
 
 def get_options():
     parser = OptionParser()
-    parser.add_option("-f", "--file", dest="filename",
-                      help="write tweet to FILE", metavar="FILE", default="enmi2010_twitter.db")
+    parser.add_option("-f", "--file", dest="conn_str",
+                      help="write tweets to DATABASE. This is a connection string", metavar="CONNECTION_STR", default="enmi2010_twitter.db")
     parser.add_option("-u", "--user", dest="username",
                       help="Twitter user", metavar="USER", default=None)
     parser.add_option("-w", "--password", dest="password",
@@ -231,8 +264,8 @@
                       help="Token file name")
     parser.add_option("-d", "--duration", dest="duration",
                       help="Duration of recording in seconds", metavar="DURATION", default= -1, type='int')
-    parser.add_option("-N", "--consumer", dest="consumer_nb",
-                      help="number of consumer", metavar="CONSUMER", default=1, type='int')
+    parser.add_option("-N", "--nb-process", dest="process_nb",
+                      help="number of processes.\nIf 0, only the leftovers of the database are processed.\nIf 1, no postprocessing is done on the tweets.", metavar="PROCESS_NB", default=2, type='int')
 
@@ -245,53 +278,81 @@
 
     (options, args) = get_options()
 
-    utils.set_logging(options, get_logger())
+    set_logging(options)
 
     if options.debug:
         print "OPTIONS : "
         print repr(options)
 
-    if options.new and os.path.exists(options.filename):
-        i = 1
-        basename, extension = os.path.splitext(options.filename)
-        new_path = '%s.%d%s' % (basename, i, extension)
-        while i < 1000000 and os.path.exists(new_path):
-            i += 1
+
+    conn_str = options.conn_str.strip()
+    if not re.match("^\w+://.+", conn_str):
+        conn_str = 'sqlite://' + options.conn_str
+
+    if conn_str.startswith("sqlite") and options.new:
+        filepath = conn_str[conn_str.find("://") + 3:]
+        if os.path.exists(filepath):
+            i = 1
+            basename, extension = os.path.splitext(filepath)
             new_path = '%s.%d%s' % (basename, i, extension)
-        if i >= 1000000:
-            raise Exception("Unable to find new filename for " + options.filename)
-        else:
-            shutil.move(options.filename, new_path)
+            while i < 1000000 and os.path.exists(new_path):
+                i += 1
+                new_path = '%s.%d%s' % (basename, i, extension)
+            if i >= 1000000:
+                raise Exception("Unable to find new filename for " + filepath)
+            else:
+                shutil.move(filepath, new_path)
 
+    Session, engine, metadata = get_sessionmaker(conn_str)
+
+    if options.new:
+        check_metadata = sqlalchemy.schema.MetaData(bind=engine, reflect=True)
+        if len(check_metadata.sorted_tables) > 0:
+            message = "Database %s not empty, exiting" % conn_str
+            utils.get_logger().error(message)
+            sys.exit(message)
+
+    metadata.create_all(engine)
+
+    access_token = None
+    if not options.username or not options.password:
+        access_token = utils.get_oauth_token(options.token_filename)
+
+    session = Session()
+    try:
+        process_leftovers(session, access_token)
+        session.commit()
+    finally:
+        session.rollback()
+        session.close()
+
+    if options.process_nb <= 0:
Exiting.") + sys.exit() + queue = JoinableQueue() stop_event = Event() - - if options.username and options.password: - auth = tweepy.auth.BasicAuthHandler(options.username, options.password) - else: - consumer_key = models.CONSUMER_KEY - consumer_secret = models.CONSUMER_SECRET - auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=False) - auth.set_access_token(*(utils.get_oauth_token(options.token_filename))) - - - engine, metadata = models.setup_database('sqlite:///' + options.filename, echo=(options.debug >= 2)) - Session = sessionmaker(bind=engine) - session = Session() - process_leftovers(session, options.token_filename) - session.commit() - session.close() - - sprocess = SourceProcess(Session, queue, auth, options.track, options.debug, options.reconnects, options.token_filename, stop_event) + #workaround for bug on using urllib2 and multiprocessing + req = urllib2.Request('http://localhost') + conn = None + try: + conn = urllib2.urlopen(req) + except: + pass + #donothing + finally: + if conn is not None: + conn.close() + + + sprocess = SourceProcess(Session, queue, options, access_token, stop_event) tweet_processes = [] - for i in range(options.consumer_nb): - engine, metadata = models.setup_database('sqlite:///' + options.filename, echo=(options.debug >= 2)) - Session = sessionmaker(bind=engine) - cprocess = TweetProcess(Session, queue, options.debug, options.token_filename, stop_event) + for i in range(options.process_nb - 1): + Session, engine, metadata = get_sessionmaker(conn_str) + cprocess = TweetProcess(Session, queue, options, access_token, stop_event) tweet_processes.append(cprocess) def interupt_handler(signum, frame): @@ -311,23 +372,36 @@ stop_event.set() break if sprocess.is_alive(): - time.sleep(0.1) + time.sleep(1) else: + stop_event.set() break - get_logger().debug("Joining Source Process") - sprocess.join() - get_logger().debug("Joining Queue") + utils.get_logger().debug("Joining Source Process") + try: + sprocess.join(10) + except: + utils.get_logger().debug("Pb joining Source Process - terminating") + sprocess.terminate() + + utils.get_logger().debug("Joining Queue") #queue.join() - for i,cprocess in enumerate(tweet_processes): - get_logger().debug("Joining consumer process Nb %d" % (i+1)) - cprocess.join() + for i, cprocess in enumerate(tweet_processes): + utils.get_logger().debug("Joining consumer process Nb %d" % (i + 1)) + try: + cprocess.join(3) + except: + utils.get_logger().debug("Pb joining consumer process Nb %d - terminating" % (i + 1)) + cprocess.terminate() - get_logger().debug("Processing leftovers") + utils.get_logger().debug("Processing leftovers") session = Session() - process_leftovers(session, options.token_filename) - session.commit() - session.close() + try: + process_leftovers(session, access_token) + session.commit() + finally: + session.rollback() + session.close() - get_logger().debug("Done. Exiting.") - \ No newline at end of file + utils.get_logger().debug("Done. Exiting.") +