--- a/script/stream/recorder_tweetstream.py Wed Jul 27 00:04:55 2011 +0200
+++ b/script/stream/recorder_tweetstream.py Mon Aug 08 09:01:40 2011 +0200
@@ -1,14 +1,13 @@
from getpass import getpass
from iri_tweet import models, utils
from iri_tweet.models import TweetSource, TweetLog
-from multiprocessing import Queue, JoinableQueue, Process, Event
+from multiprocessing import Queue, JoinableQueue, Process, Event, get_logger
from optparse import OptionParser
from sqlalchemy.orm import sessionmaker
-from sqlite3 import *
import StringIO
+import logging
import anyjson
import datetime
-import logging
import os
import shutil
import signal
@@ -18,6 +17,8 @@
import traceback
import tweepy.auth
import tweetstream
+from iri_tweet.utils import logger
+from sqlalchemy.exc import OperationalError
socket._fileobject.default_bufsize = 0
@@ -82,7 +83,6 @@
class SourceProcess(Process):
def __init__(self, session_maker, queue, auth, track, debug, reconnects, token_filename, stop_event):
- super(SourceProcess, self).__init__()
self.session_maker = session_maker
self.queue = queue
self.auth = auth
@@ -91,47 +91,93 @@
self.reconnects = reconnects
self.token_filename = token_filename
self.stop_event = stop_event
+ super(SourceProcess, self).__init__()
# self.stop_event =
def run(self):
+ get_logger().debug("SourceProcess : run")
track_list = self.track # or raw_input('Keywords to track (comma seperated): ').strip()
track_list = [k for k in track_list.split(',')]
-
+
+ get_logger().debug("SourceProcess : before connecting to stream " + repr(track_list))
stream = ReconnectingTweetStream(self.auth, track_list, reconnects=self.reconnects, as_text=True)
+ get_logger().debug("SourceProcess : after connecting to stream")
stream.muststop = lambda: self.stop_event.is_set()
session = self.session_maker()
try:
for tweet in stream:
+ get_logger().debug("tweet " + repr(tweet))
source = TweetSource(original_json=tweet)
- session.add(source)
- session.flush()
+ get_logger().debug("source created")
+ add_retries = 0
+ while add_retries < 10:
+ try:
+ add_retries += 1
+ session.add(source)
+ session.flush()
+ break
+ except OperationalError as e:
+ session.rollback()
+ get_logger().debug("Operational Error %s nb %d" % (repr(e), add_retries))
+ if add_retries==10:
+ raise e
+
source_id = source.id
- queue.put((source_id, tweet), False)
+ get_logger().debug("before queue + source id " + repr(source_id))
+ self.queue.put((source_id, tweet), False)
#process_tweet(tweet, source_id, self.session, self.debug, self.token_filename)
- logging.info("Tweet count: %d - current rate : %.2f - running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime)))
+ get_logger().info("Tweet count: %d - current rate : %.2f - running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime)))
session.commit()
# if duration >= 0 and datetime.datetime.utcnow() >= end_ts:
# print "Stop recording after %d seconds." % (duration)
# break
- except:
+ except Exception as e:
+ get_logger().error("Error when processing tweet " + repr(e))
+ finally:
session.rollback()
- finally:
stream.close()
session.close()
-
+ self.queue.close()
+ self.stop_event.set()
+
+
+def process_tweet(tweet, source_id, session, token_filename):
+ try:
+ tweet_obj = anyjson.deserialize(tweet)
+ screen_name = ""
+ if 'user' in tweet_obj and 'screen_name' in tweet_obj['user']:
+ screen_name = tweet_obj['user']['screen_name']
+ get_logger().info(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text']))
+ get_logger().debug(u"Process_tweet :" + repr(tweet))
+ processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, token_filename)
+ processor.process()
+ except Exception as e:
+ message = u"Error %s processing tweet %s" % (repr(e), tweet)
+ get_logger().error(message)
+ output = StringIO.StringIO()
+        traceback.print_exc(None, output)
+ error_stack = output.getvalue()
+ output.close()
+ session.rollback()
+ tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['ERROR'], error=message, error_stack=error_stack)
+ session.add(tweet_log)
+ session.commit()
+
+
class TweetProcess(Process):
def __init__(self, session_maker, queue, debug, token_filename, stop_event):
- super(TweetProcess, self).__init__()
self.session_maker = session_maker
self.queue = queue
self.debug = debug
self.stop_event = stop_event
self.token_filename = token_filename
+ super(TweetProcess, self).__init__()
+
def run(self):
@@ -139,74 +185,33 @@
try:
while not self.stop_event.is_set():
try:
- source_id, tweet_txt = queue.get(True, 30)
- except:
+                    source_id, tweet_txt = self.queue.get(True, 10)
+ get_logger().debug("Processing source id " + repr(source_id))
+ except Exception as e:
+ get_logger().debug('Process tweet exception in loop : ' + repr(e))
continue
- process_tweet(tweet_txt, source_id, session)
+ process_tweet(tweet_txt, source_id, session, self.token_filename)
session.commit()
- self.queue.task_done()
except:
- session.rollback()
raise
finally:
+ session.rollback()
+ self.stop_event.set()
session.close()
-
- def process_tweet(tweet, source_id, session):
-
- try:
- tweet_obj = anyjson.deserialize(tweet)
- screen_name = ""
- if 'user' in tweet_obj and 'screen_name' in tweet_obj['user']:
- screen_name = tweet_obj['user']['screen_name']
- logging.info(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text']))
- logging.debug(u"Process_tweet :" + repr(tweet))
- processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, self.token_filename)
- processor.process()
- except Exception, e:
- message = u"Error %e processing tweet %s" % (unicode(e), tweet)
- logging.error(message)
- output = StringIO.StringIO()
- traceback.print_exception(Exception, e, None, None, output)
- tweet_log = TweetLog(status=TweetLog.TWEET_STATUS['ERROR'], error=message, error_stack=output.getvalue())
- output.close()
-
-
-
-#def main_source(username, password, track, session, debug, reconnects, token_filename, duration):
-
- #username = username or raw_input('Twitter username: ')
- #password = password or getpass('Twitter password: ')
+def process_leftovers(session, token_filename):
+
+ sources = session.query(TweetSource).outerjoin(TweetLog).filter(TweetLog.id == None)
+
+ for src in sources:
+ tweet_txt = src.original_json
+ process_tweet(tweet_txt, src.id, session, token_filename)
-# track_list = track or raw_input('Keywords to track (comma seperated): ').strip()
-# track_list = [k for k in track_list.split(',')]
-
-# if username and password:
-# auth = tweepy.auth.BasicAuthHandler(username, password)
-# else:
-# consumer_key = models.CONSUMER_KEY
-# consumer_secret = models.CONSUMER_SECRET
-# auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=False)
-# auth.set_access_token(*(utils.get_oauth_token(token_filename)))
-
-# if duration >= 0:
-# end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=duration)
+
-# stream = ReconnectingTweetStream(auth, track_list, reconnects=reconnects, as_text=True)
-# try:
-# for tweet in stream:
-# source = TweetSource(original_json=tweet)
-# session.add(source)
-# session.flush()
-# source_id = source.id
-# process_tweet(tweet, source_id, session, debug, token_filename)
-# logging.info("Tweet count: %d - current rate : %.2f - running : %s"%(stream.count, stream.rate, int(time.time()-stream.starttime)))
-# session.commit()
-# if duration >= 0 and datetime.datetime.utcnow() >= end_ts:
-# print "Stop recording after %d seconds." % (duration)
-# break
-# finally:
-# stream.close()
+ #get tweet source that do not match any message
+ #select * from tweet_tweet_source ts left join tweet_tweet_log tl on ts.id = tl.tweet_source_id where tl.id isnull;
+
def get_options():
parser = OptionParser()
@@ -240,7 +245,7 @@
(options, args) = get_options()
- utils.set_logging(options)
+ utils.set_logging(options, get_logger())
if options.debug:
print "OPTIONS : "
@@ -249,18 +254,16 @@
if options.new and os.path.exists(options.filename):
i = 1
basename, extension = os.path.splitext(options.filename)
- new_path = '$s.%d.%s' % (basename, i, extension)
+ new_path = '%s.%d%s' % (basename, i, extension)
while i < 1000000 and os.path.exists(new_path):
i += 1
- new_path = '$s.%d.%s' % (basename, i, extension)
+ new_path = '%s.%d%s' % (basename, i, extension)
if i >= 1000000:
raise Exception("Unable to find new filename for " + options.filename)
else:
shutil.move(options.filename, new_path)
- engine, metadata = models.setup_database('sqlite:///' + options.filename, echo=(options.debug >= 2))
- Session = sessionmaker(bind=engine)
queue = JoinableQueue()
stop_event = Event()
@@ -272,12 +275,22 @@
auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=False)
auth.set_access_token(*(utils.get_oauth_token(options.token_filename)))
-
+
+ engine, metadata = models.setup_database('sqlite:///' + options.filename, echo=(options.debug >= 2))
+ Session = sessionmaker(bind=engine)
+
+ session = Session()
+ process_leftovers(session, options.token_filename)
+ session.commit()
+ session.close()
+
sprocess = SourceProcess(Session, queue, auth, options.track, options.debug, options.reconnects, options.token_filename, stop_event)
tweet_processes = []
for i in range(options.consumer_nb):
+ engine, metadata = models.setup_database('sqlite:///' + options.filename, echo=(options.debug >= 2))
+ Session = sessionmaker(bind=engine)
cprocess = TweetProcess(Session, queue, options.debug, options.token_filename, stop_event)
tweet_processes.append(cprocess)
@@ -302,7 +315,19 @@
else:
break
+ get_logger().debug("Joining Source Process")
sprocess.join()
- queue.join()
- for cprocess in tweet_processes:
+ get_logger().debug("Joining Queue")
+ #queue.join()
+ for i,cprocess in enumerate(tweet_processes):
+ get_logger().debug("Joining consumer process Nb %d" % (i+1))
cprocess.join()
+
+ get_logger().debug("Processing leftovers")
+ session = Session()
+ process_leftovers(session, options.token_filename)
+ session.commit()
+ session.close()
+
+ get_logger().debug("Done. Exiting.")
+
\ No newline at end of file