diff -r ee6305b4a7dc -r cdd7d3c0549c script/stream/recorder_tweetstream.py --- a/script/stream/recorder_tweetstream.py Tue Jul 26 23:57:09 2011 +0200 +++ b/script/stream/recorder_tweetstream.py Wed Jul 27 00:04:55 2011 +0200 @@ -1,16 +1,23 @@ from getpass import getpass from iri_tweet import models, utils +from iri_tweet.models import TweetSource, TweetLog +from multiprocessing import Queue, JoinableQueue, Process, Event from optparse import OptionParser from sqlalchemy.orm import sessionmaker from sqlite3 import * +import StringIO +import anyjson import datetime import logging import os +import shutil +import signal import socket import sys import time +import traceback +import tweepy.auth import tweetstream -import tweepy.auth socket._fileobject.default_bufsize = 0 @@ -44,12 +51,12 @@ """ - def __init__(self, auth, keywords, reconnects=3, error_cb=None, retry_wait=5, **kwargs): + def __init__(self, auth, keywords, reconnects=3, error_cb=None, retry_wait=5, as_text=False, **kwargs): self.max_reconnects = reconnects self.retry_wait = retry_wait self._reconnects = 0 self._error_cb = error_cb - super(ReconnectingTweetStream, self).__init__(auth=auth, track=keywords, **kwargs) + super(ReconnectingTweetStream, self).__init__(auth=auth, track=keywords, as_text=as_text, **kwargs) def next(self): while True: @@ -72,45 +79,134 @@ -def process_tweet(tweet, session, debug, token_filename): - screen_name = "" - if 'user' in tweet and 'screen_name' in tweet['user']: - screen_name = tweet['user']['screen_name'] - logging.info("Process_tweet from %s : %s" % (screen_name, tweet['text'])) - logging.debug("Process_tweet :" + repr(tweet)) - processor = utils.TwitterProcessor(tweet, None, session, token_filename) - processor.process() +class SourceProcess(Process): + + def __init__(self, session_maker, queue, auth, track, debug, reconnects, token_filename, stop_event): + super(SourceProcess, self).__init__() + self.session_maker = session_maker + self.queue = queue + self.auth = auth + self.track = track + self.debug = debug + self.reconnects = reconnects + self.token_filename = token_filename + self.stop_event = stop_event +# self.stop_event = + + def run(self): + + track_list = self.track # or raw_input('Keywords to track (comma seperated): ').strip() + track_list = [k for k in track_list.split(',')] + + stream = ReconnectingTweetStream(self.auth, track_list, reconnects=self.reconnects, as_text=True) + stream.muststop = lambda: self.stop_event.is_set() + + session = self.session_maker() + + try: + for tweet in stream: + source = TweetSource(original_json=tweet) + session.add(source) + session.flush() + source_id = source.id + queue.put((source_id, tweet), False) + #process_tweet(tweet, source_id, self.session, self.debug, self.token_filename) + logging.info("Tweet count: %d - current rate : %.2f - running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime))) + session.commit() +# if duration >= 0 and datetime.datetime.utcnow() >= end_ts: +# print "Stop recording after %d seconds." % (duration) +# break + except: + session.rollback() + finally: + stream.close() + session.close() + + +class TweetProcess(Process): + + def __init__(self, session_maker, queue, debug, token_filename, stop_event): + super(TweetProcess, self).__init__() + self.session_maker = session_maker + self.queue = queue + self.debug = debug + self.stop_event = stop_event + self.token_filename = token_filename -def main(username, password, track, session, debug, reconnects, token_filename, duration): + def run(self): + + session = self.session_maker() + try: + while not self.stop_event.is_set(): + try: + source_id, tweet_txt = queue.get(True, 30) + except: + continue + process_tweet(tweet_txt, source_id, session) + session.commit() + self.queue.task_done() + except: + session.rollback() + raise + finally: + session.close() + + + def process_tweet(tweet, source_id, session): + + try: + tweet_obj = anyjson.deserialize(tweet) + screen_name = "" + if 'user' in tweet_obj and 'screen_name' in tweet_obj['user']: + screen_name = tweet_obj['user']['screen_name'] + logging.info(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text'])) + logging.debug(u"Process_tweet :" + repr(tweet)) + processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, self.token_filename) + processor.process() + except Exception, e: + message = u"Error %e processing tweet %s" % (unicode(e), tweet) + logging.error(message) + output = StringIO.StringIO() + traceback.print_exception(Exception, e, None, None, output) + tweet_log = TweetLog(status=TweetLog.TWEET_STATUS['ERROR'], error=message, error_stack=output.getvalue()) + output.close() + + + +#def main_source(username, password, track, session, debug, reconnects, token_filename, duration): #username = username or raw_input('Twitter username: ') #password = password or getpass('Twitter password: ') - track_list = track or raw_input('Keywords to track (comma seperated): ').strip() - track_list = [k for k in track_list.split(',')] +# track_list = track or raw_input('Keywords to track (comma seperated): ').strip() +# track_list = [k for k in track_list.split(',')] - if username and password: - auth = tweepy.auth.BasicAuthHandler(username, password) - else: - consumer_key = models.CONSUMER_KEY - consumer_secret = models.CONSUMER_SECRET - auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=False) - auth.set_access_token(*(utils.get_oauth_token(token_filename))) +# if username and password: +# auth = tweepy.auth.BasicAuthHandler(username, password) +# else: +# consumer_key = models.CONSUMER_KEY +# consumer_secret = models.CONSUMER_SECRET +# auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=False) +# auth.set_access_token(*(utils.get_oauth_token(token_filename))) + +# if duration >= 0: +# end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=duration) - if duration >= 0: - end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=duration) - - stream = ReconnectingTweetStream(auth, track_list, reconnects=reconnects) - try: - for tweet in stream: - if duration >= 0 and datetime.datetime.utcnow() >= end_ts: - print "Stop recording after %d seconds." % (duration) - break - process_tweet(tweet, session, debug, token_filename) - logging.info("Tweet count: %d - current rate : %.2f - running : %s"%(stream.count, stream.rate, int(time.time()-stream.starttime))) - session.commit() - finally: - stream.close() +# stream = ReconnectingTweetStream(auth, track_list, reconnects=reconnects, as_text=True) +# try: +# for tweet in stream: +# source = TweetSource(original_json=tweet) +# session.add(source) +# session.flush() +# source_id = source.id +# process_tweet(tweet, source_id, session, debug, token_filename) +# logging.info("Tweet count: %d - current rate : %.2f - running : %s"%(stream.count, stream.rate, int(time.time()-stream.starttime))) +# session.commit() +# if duration >= 0 and datetime.datetime.utcnow() >= end_ts: +# print "Stop recording after %d seconds." % (duration) +# break +# finally: +# stream.close() def get_options(): parser = OptionParser() @@ -130,6 +226,9 @@ help="Token file name") parser.add_option("-d", "--duration", dest="duration", help="Duration of recording in seconds", metavar="DURATION", default= -1, type='int') + parser.add_option("-N", "--consumer", dest="consumer_nb", + help="number of consumer", metavar="CONSUMER", default=1, type='int') + utils.set_logging_options(parser) @@ -139,7 +238,6 @@ if __name__ == '__main__': - (options, args) = get_options() utils.set_logging(options) @@ -149,17 +247,62 @@ print repr(options) if options.new and os.path.exists(options.filename): - os.remove(options.filename) + i = 1 + basename, extension = os.path.splitext(options.filename) + new_path = '$s.%d.%s' % (basename, i, extension) + while i < 1000000 and os.path.exists(new_path): + i += 1 + new_path = '$s.%d.%s' % (basename, i, extension) + if i >= 1000000: + raise Exception("Unable to find new filename for " + options.filename) + else: + shutil.move(options.filename, new_path) + - engine, metadata = models.setup_database('sqlite:///' + options.filename, echo=(options.debug>=2)) + engine, metadata = models.setup_database('sqlite:///' + options.filename, echo=(options.debug >= 2)) Session = sessionmaker(bind=engine) - session = Session() + queue = JoinableQueue() + stop_event = Event() + + if options.username and options.password: + auth = tweepy.auth.BasicAuthHandler(options.username, options.password) + else: + consumer_key = models.CONSUMER_KEY + consumer_secret = models.CONSUMER_SECRET + auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=False) + auth.set_access_token(*(utils.get_oauth_token(options.token_filename))) - try: - try: - main(options.username, options.password, options.track, session, options.debug, options.reconnects, options.token_filename, options.duration) - except KeyboardInterrupt: - print '\nGoodbye!' - session.commit() - finally: - session.close() + + sprocess = SourceProcess(Session, queue, auth, options.track, options.debug, options.reconnects, options.token_filename, stop_event) + + tweet_processes = [] + + for i in range(options.consumer_nb): + cprocess = TweetProcess(Session, queue, options.debug, options.token_filename, stop_event) + tweet_processes.append(cprocess) + + def interupt_handler(signum, frame): + stop_event.set() + + signal.signal(signal.SIGINT, interupt_handler) + + sprocess.start() + for cprocess in tweet_processes: + cprocess.start() + + if options.duration >= 0: + end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration) + + while not stop_event.is_set(): + if options.duration >= 0 and datetime.datetime.utcnow() >= end_ts: + stop_event.set() + break + if sprocess.is_alive(): + time.sleep(0.1) + else: + break + + sprocess.join() + queue.join() + for cprocess in tweet_processes: + cprocess.join()