--- a/script/stream/recorder_tweetstream.py Tue Jul 26 23:57:09 2011 +0200
+++ b/script/stream/recorder_tweetstream.py Wed Jul 27 00:04:55 2011 +0200
@@ -1,16 +1,23 @@
from getpass import getpass
from iri_tweet import models, utils
+from iri_tweet.models import TweetSource, TweetLog
+from multiprocessing import Queue, JoinableQueue, Process, Event
from optparse import OptionParser
from sqlalchemy.orm import sessionmaker
from sqlite3 import *
+import StringIO
+import anyjson
import datetime
import logging
import os
+import shutil
+import signal
import socket
import sys
import time
+import traceback
+import tweepy.auth
import tweetstream
-import tweepy.auth
socket._fileobject.default_bufsize = 0
@@ -44,12 +51,12 @@
"""
- def __init__(self, auth, keywords, reconnects=3, error_cb=None, retry_wait=5, **kwargs):
+ def __init__(self, auth, keywords, reconnects=3, error_cb=None, retry_wait=5, as_text=False, **kwargs):
self.max_reconnects = reconnects
self.retry_wait = retry_wait
self._reconnects = 0
self._error_cb = error_cb
- super(ReconnectingTweetStream, self).__init__(auth=auth, track=keywords, **kwargs)
+ super(ReconnectingTweetStream, self).__init__(auth=auth, track=keywords, as_text=as_text, **kwargs)
def next(self):
while True:
@@ -72,45 +79,134 @@
-def process_tweet(tweet, session, debug, token_filename):
- screen_name = ""
- if 'user' in tweet and 'screen_name' in tweet['user']:
- screen_name = tweet['user']['screen_name']
- logging.info("Process_tweet from %s : %s" % (screen_name, tweet['text']))
- logging.debug("Process_tweet :" + repr(tweet))
- processor = utils.TwitterProcessor(tweet, None, session, token_filename)
- processor.process()
+class SourceProcess(Process):
+
+ def __init__(self, session_maker, queue, auth, track, debug, reconnects, token_filename, stop_event):
+ super(SourceProcess, self).__init__()
+ self.session_maker = session_maker
+ self.queue = queue
+ self.auth = auth
+ self.track = track
+ self.debug = debug
+ self.reconnects = reconnects
+ self.token_filename = token_filename
+ self.stop_event = stop_event
+# self.stop_event =
+
+ def run(self):
+
+ track_list = self.track # or raw_input('Keywords to track (comma seperated): ').strip()
+ track_list = [k for k in track_list.split(',')]
+
+ stream = ReconnectingTweetStream(self.auth, track_list, reconnects=self.reconnects, as_text=True)
+ stream.muststop = lambda: self.stop_event.is_set()
+
+ session = self.session_maker()
+
+ try:
+ for tweet in stream:
+ source = TweetSource(original_json=tweet)
+ session.add(source)
+ session.flush()
+ source_id = source.id
+ queue.put((source_id, tweet), False)
+ #process_tweet(tweet, source_id, self.session, self.debug, self.token_filename)
+ logging.info("Tweet count: %d - current rate : %.2f - running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime)))
+ session.commit()
+# if duration >= 0 and datetime.datetime.utcnow() >= end_ts:
+# print "Stop recording after %d seconds." % (duration)
+# break
+ except:
+ session.rollback()
+ finally:
+ stream.close()
+ session.close()
+
+
+class TweetProcess(Process):
+
+ def __init__(self, session_maker, queue, debug, token_filename, stop_event):
+ super(TweetProcess, self).__init__()
+ self.session_maker = session_maker
+ self.queue = queue
+ self.debug = debug
+ self.stop_event = stop_event
+ self.token_filename = token_filename
-def main(username, password, track, session, debug, reconnects, token_filename, duration):
+ def run(self):
+
+ session = self.session_maker()
+ try:
+ while not self.stop_event.is_set():
+ try:
+ source_id, tweet_txt = queue.get(True, 30)
+ except:
+ continue
+ process_tweet(tweet_txt, source_id, session)
+ session.commit()
+ self.queue.task_done()
+ except:
+ session.rollback()
+ raise
+ finally:
+ session.close()
+
+
+ def process_tweet(tweet, source_id, session):
+
+ try:
+ tweet_obj = anyjson.deserialize(tweet)
+ screen_name = ""
+ if 'user' in tweet_obj and 'screen_name' in tweet_obj['user']:
+ screen_name = tweet_obj['user']['screen_name']
+ logging.info(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text']))
+ logging.debug(u"Process_tweet :" + repr(tweet))
+ processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, self.token_filename)
+ processor.process()
+ except Exception, e:
+ message = u"Error %e processing tweet %s" % (unicode(e), tweet)
+ logging.error(message)
+ output = StringIO.StringIO()
+ traceback.print_exception(Exception, e, None, None, output)
+ tweet_log = TweetLog(status=TweetLog.TWEET_STATUS['ERROR'], error=message, error_stack=output.getvalue())
+ output.close()
+
+
+
+#def main_source(username, password, track, session, debug, reconnects, token_filename, duration):
#username = username or raw_input('Twitter username: ')
#password = password or getpass('Twitter password: ')
- track_list = track or raw_input('Keywords to track (comma seperated): ').strip()
- track_list = [k for k in track_list.split(',')]
+# track_list = track or raw_input('Keywords to track (comma seperated): ').strip()
+# track_list = [k for k in track_list.split(',')]
- if username and password:
- auth = tweepy.auth.BasicAuthHandler(username, password)
- else:
- consumer_key = models.CONSUMER_KEY
- consumer_secret = models.CONSUMER_SECRET
- auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=False)
- auth.set_access_token(*(utils.get_oauth_token(token_filename)))
+# if username and password:
+# auth = tweepy.auth.BasicAuthHandler(username, password)
+# else:
+# consumer_key = models.CONSUMER_KEY
+# consumer_secret = models.CONSUMER_SECRET
+# auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=False)
+# auth.set_access_token(*(utils.get_oauth_token(token_filename)))
+
+# if duration >= 0:
+# end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=duration)
- if duration >= 0:
- end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=duration)
-
- stream = ReconnectingTweetStream(auth, track_list, reconnects=reconnects)
- try:
- for tweet in stream:
- if duration >= 0 and datetime.datetime.utcnow() >= end_ts:
- print "Stop recording after %d seconds." % (duration)
- break
- process_tweet(tweet, session, debug, token_filename)
- logging.info("Tweet count: %d - current rate : %.2f - running : %s"%(stream.count, stream.rate, int(time.time()-stream.starttime)))
- session.commit()
- finally:
- stream.close()
+# stream = ReconnectingTweetStream(auth, track_list, reconnects=reconnects, as_text=True)
+# try:
+# for tweet in stream:
+# source = TweetSource(original_json=tweet)
+# session.add(source)
+# session.flush()
+# source_id = source.id
+# process_tweet(tweet, source_id, session, debug, token_filename)
+# logging.info("Tweet count: %d - current rate : %.2f - running : %s"%(stream.count, stream.rate, int(time.time()-stream.starttime)))
+# session.commit()
+# if duration >= 0 and datetime.datetime.utcnow() >= end_ts:
+# print "Stop recording after %d seconds." % (duration)
+# break
+# finally:
+# stream.close()
def get_options():
parser = OptionParser()
@@ -130,6 +226,9 @@
help="Token file name")
parser.add_option("-d", "--duration", dest="duration",
help="Duration of recording in seconds", metavar="DURATION", default= -1, type='int')
+ parser.add_option("-N", "--consumer", dest="consumer_nb",
+ help="number of consumer", metavar="CONSUMER", default=1, type='int')
+
utils.set_logging_options(parser)
@@ -139,7 +238,6 @@
if __name__ == '__main__':
-
(options, args) = get_options()
utils.set_logging(options)
@@ -149,17 +247,62 @@
print repr(options)
if options.new and os.path.exists(options.filename):
- os.remove(options.filename)
+ i = 1
+ basename, extension = os.path.splitext(options.filename)
+ new_path = '$s.%d.%s' % (basename, i, extension)
+ while i < 1000000 and os.path.exists(new_path):
+ i += 1
+ new_path = '$s.%d.%s' % (basename, i, extension)
+ if i >= 1000000:
+ raise Exception("Unable to find new filename for " + options.filename)
+ else:
+ shutil.move(options.filename, new_path)
+
- engine, metadata = models.setup_database('sqlite:///' + options.filename, echo=(options.debug>=2))
+ engine, metadata = models.setup_database('sqlite:///' + options.filename, echo=(options.debug >= 2))
Session = sessionmaker(bind=engine)
- session = Session()
+ queue = JoinableQueue()
+ stop_event = Event()
+
+ if options.username and options.password:
+ auth = tweepy.auth.BasicAuthHandler(options.username, options.password)
+ else:
+ consumer_key = models.CONSUMER_KEY
+ consumer_secret = models.CONSUMER_SECRET
+ auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=False)
+ auth.set_access_token(*(utils.get_oauth_token(options.token_filename)))
- try:
- try:
- main(options.username, options.password, options.track, session, options.debug, options.reconnects, options.token_filename, options.duration)
- except KeyboardInterrupt:
- print '\nGoodbye!'
- session.commit()
- finally:
- session.close()
+
+ sprocess = SourceProcess(Session, queue, auth, options.track, options.debug, options.reconnects, options.token_filename, stop_event)
+
+ tweet_processes = []
+
+ for i in range(options.consumer_nb):
+ cprocess = TweetProcess(Session, queue, options.debug, options.token_filename, stop_event)
+ tweet_processes.append(cprocess)
+
+ def interupt_handler(signum, frame):
+ stop_event.set()
+
+ signal.signal(signal.SIGINT, interupt_handler)
+
+ sprocess.start()
+ for cprocess in tweet_processes:
+ cprocess.start()
+
+ if options.duration >= 0:
+ end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration)
+
+ while not stop_event.is_set():
+ if options.duration >= 0 and datetime.datetime.utcnow() >= end_ts:
+ stop_event.set()
+ break
+ if sprocess.is_alive():
+ time.sleep(0.1)
+ else:
+ break
+
+ sprocess.join()
+ queue.join()
+ for cprocess in tweet_processes:
+ cprocess.join()