--- a/script/stream/recorder_tweetstream.py Fri Aug 12 18:17:27 2011 +0200
+++ b/script/stream/recorder_tweetstream.py Wed Aug 17 18:32:07 2011 +0200
@@ -1,10 +1,12 @@
from getpass import getpass
from iri_tweet import models, utils
from iri_tweet.models import TweetSource, TweetLog
-from multiprocessing import Queue, JoinableQueue, Process, Event, get_logger
+from multiprocessing import (Queue as mQueue, JoinableQueue, Process, Event,
+ get_logger)
from optparse import OptionParser
from sqlalchemy.exc import OperationalError
from sqlalchemy.orm import scoped_session, sessionmaker
+import Queue
import StringIO
import anyjson
import datetime
@@ -16,6 +18,7 @@
import socket
import sqlalchemy.schema
import sys
+import threading
import time
import traceback
import tweepy.auth
@@ -34,7 +37,7 @@
def set_logging(options):
- utils.set_logging(options, logging.getLogger('iri_tweet'))
+ utils.set_logging(options, logging.getLogger('iri.tweet'))
utils.set_logging(options, logging.getLogger('multiprocessing'))
if options.debug >= 2:
utils.set_logging(options, logging.getLogger('sqlalchemy.engine'))
@@ -42,6 +45,11 @@
#utils.set_logging(options, logging.getLogger('sqlalchemy.pool'))
#utils.set_logging(options, logging.getLogger('sqlalchemy.orm'))
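+# Build a queue-backed logger for code running in a worker process: records
+# are shipped through 'queue' to the main process, and propagate = 0 keeps the
+# child from duplicating them through the root logger.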
+def set_logging_process(options, queue):
+ qlogger = utils.set_logging(options, logging.getLogger('iri.tweet.p'), queue)
+ qlogger.propagate = 0
+ return qlogger
+
def get_auth(options, access_token):
if options.username and options.password:
auth = tweepy.auth.BasicAuthHandler(options.username, options.password)
@@ -107,7 +115,7 @@
class SourceProcess(Process):
- def __init__(self, session_maker, queue, options, access_token, stop_event):
+ def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue):
self.session_maker = session_maker
self.queue = queue
self.track = options.track
@@ -116,31 +124,33 @@
self.stop_event = stop_event
self.options = options
self.access_token = access_token
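+        # Queue used to ship this process's log records back to the main process.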
+ self.logger_queue = logger_queue
super(SourceProcess, self).__init__()
def run(self):
+
#import pydevd
#pydevd.settrace(suspend=False)
- set_logging(self.options)
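+        # Set up the queue-backed logger inside the child; records are handled
+        # by the logging thread in the main process.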
+ self.logger = set_logging_process(self.options, self.logger_queue)
self.auth = get_auth(self.options, self.access_token)
- utils.get_logger().debug("SourceProcess : run")
+ self.logger.debug("SourceProcess : run")
         track_list = self.track # or raw_input('Keywords to track (comma separated): ').strip()
track_list = [k for k in track_list.split(',')]
- utils.get_logger().debug("SourceProcess : before connecting to stream " + repr(track_list))
+ self.logger.debug("SourceProcess : before connecting to stream " + repr(track_list))
stream = ReconnectingTweetStream(self.auth, track_list, reconnects=self.reconnects, as_text=True)
- utils.get_logger().debug("SourceProcess : after connecting to stream")
+ self.logger.debug("SourceProcess : after connecting to stream")
stream.muststop = lambda: self.stop_event.is_set()
session = self.session_maker()
try:
for tweet in stream:
- utils.get_logger().debug("tweet " + repr(tweet))
+ self.logger.debug("tweet " + repr(tweet))
source = TweetSource(original_json=tweet)
- utils.get_logger().debug("source created")
+ self.logger.debug("source created")
add_retries = 0
while add_retries < 10:
try:
@@ -150,18 +160,18 @@
break
except OperationalError as e:
session.rollback()
- utils.get_logger().debug("Operational Error %s nb %d" % (repr(e), add_retries))
+ self.logger.debug("Operational Error %s nb %d" % (repr(e), add_retries))
if add_retries == 10:
raise e
source_id = source.id
- utils.get_logger().debug("before queue + source id " + repr(source_id))
- utils.get_logger().info("Tweet count: %d - current rate : %.2f - running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime)))
+ self.logger.debug("before queue + source id " + repr(source_id))
+                self.logger.info("Tweet count: %d - current rate: %.2f - running: %ds" % (stream.count, stream.rate, int(time.time() - stream.starttime)))
session.commit()
self.queue.put((source_id, tweet), False)
except Exception as e:
- utils.get_logger().error("Error when processing tweet " + repr(e))
+ self.logger.error("Error when processing tweet " + repr(e))
finally:
session.rollback()
stream.close()
@@ -170,19 +180,23 @@
self.stop_event.set()
-def process_tweet(tweet, source_id, session, access_token):
+def process_tweet(tweet, source_id, session, access_token, logger):
try:
tweet_obj = anyjson.deserialize(tweet)
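+        # Payloads without a 'text' field (e.g. delete/limit notices) are not
+        # tweets: record a NOT_TWEET status for the source and skip processing.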
+ if 'text' not in tweet_obj:
+ tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET'])
+ session.add(tweet_log)
+ return
screen_name = ""
if 'user' in tweet_obj and 'screen_name' in tweet_obj['user']:
screen_name = tweet_obj['user']['screen_name']
- utils.get_logger().debug(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text']))
- utils.get_logger().debug(u"Process_tweet :" + repr(tweet))
+ logger.info(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text']))
+ logger.debug(u"Process_tweet :" + repr(tweet))
processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, access_token, None)
processor.process()
except Exception as e:
message = u"Error %s processing tweet %s" % (repr(e), tweet)
- utils.get_logger().error(message)
+ logger.error(message)
output = StringIO.StringIO()
traceback.print_exception(Exception, e, None, None, output)
error_stack = output.getvalue()
@@ -196,28 +210,29 @@
class TweetProcess(Process):
- def __init__(self, session_maker, queue, options, access_token, stop_event):
+ def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue):
self.session_maker = session_maker
self.queue = queue
self.stop_event = stop_event
self.options = options
self.access_token = access_token
+ self.logger_queue = logger_queue
super(TweetProcess, self).__init__()
def run(self):
- set_logging(self.options)
+ self.logger = set_logging_process(self.options, self.logger_queue)
session = self.session_maker()
try:
while not self.stop_event.is_set():
try:
-                    source_id, tweet_txt = queue.get(True, 3)
+                    source_id, tweet_txt = self.queue.get(True, 3)
- utils.get_logger().debug("Processing source id " + repr(source_id))
+ self.logger.debug("Processing source id " + repr(source_id))
except Exception as e:
- utils.get_logger().debug('Process tweet exception in loop : ' + repr(e))
+ self.logger.debug('Process tweet exception in loop : ' + repr(e))
continue
- process_tweet(tweet_txt, source_id, session, self.access_token)
+ process_tweet(tweet_txt, source_id, session, self.access_token, self.logger)
session.commit()
finally:
session.rollback()
@@ -231,19 +246,28 @@
return Session, engine, metadata
-def process_leftovers(session, access_token):
+def process_leftovers(session, access_token, logger):
sources = session.query(TweetSource).outerjoin(TweetLog).filter(TweetLog.id == None)
for src in sources:
tweet_txt = src.original_json
- process_tweet(tweet_txt, src.id, session, access_token)
+ process_tweet(tweet_txt, src.id, session, access_token, logger)
session.commit()
 #get tweet sources that have no matching log entry
#select * from tweet_tweet_source ts left join tweet_tweet_log tl on ts.id = tl.tweet_source_id where tl.id isnull;
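+
+# Drain the per-worker logging queues from the main process, handing each
+# record to the logger named in it; the short sleep keeps the polling loop
+# cheap while staying responsive to stop_event.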
+def process_log(logger_queues, stop_event):
+ while not stop_event.is_set():
+ for lqueue in logger_queues:
+ try:
+ record = lqueue.get_nowait()
+ logging.getLogger(record.name).handle(record)
+ except Queue.Empty:
+ continue
+ time.sleep(0.1)
def get_options():
@@ -275,7 +299,7 @@
if __name__ == '__main__':
-
+
(options, args) = get_options()
set_logging(options)
@@ -287,10 +311,10 @@
conn_str = options.conn_str.strip()
if not re.match("^\w+://.+", conn_str):
- conn_str = 'sqlite://' + options.conn_str
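+        # SQLAlchemy sqlite URLs need three slashes before a relative file path.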
+ conn_str = 'sqlite:///' + options.conn_str
if conn_str.startswith("sqlite") and options.new:
- filepath = conn_str[conn_str.find("://"):]
+ filepath = conn_str[conn_str.find(":///")+4:]
if os.path.exists(filepath):
i = 1
basename, extension = os.path.splitext(filepath)
@@ -320,7 +344,7 @@
session = Session()
try:
- process_leftovers(session, access_token)
+ process_leftovers(session, access_token, utils.get_logger())
session.commit()
finally:
session.rollback()
@@ -330,7 +354,7 @@
utils.get_logger().debug("Leftovers processed. Exiting.")
sys.exit()
- queue = JoinableQueue()
+ queue = mQueue()
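+    # A plain multiprocessing Queue is enough here; nothing calls
+    # task_done()/join() on the tweet queue.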
stop_event = Event()
     #workaround for a bug when using urllib2 with multiprocessing
@@ -344,15 +368,24 @@
finally:
if conn is not None:
conn.close()
-
+
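+    # Give every worker its own engine/session factory (SQLAlchemy engines must
+    # not be shared across processes) and its own log queue, drained by the
+    # logging thread in the main process.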
+ process_engines = []
+ logger_queues = []
- sprocess = SourceProcess(Session, queue, options, access_token, stop_event)
+ SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str)
+ process_engines.append(engine_process)
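+    # Single-slot queue for this worker's log records.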
+ lqueue = mQueue(1)
+ logger_queues.append(lqueue)
+ sprocess = SourceProcess(SessionProcess, queue, options, access_token, stop_event, lqueue)
tweet_processes = []
for i in range(options.process_nb - 1):
- Session, engine, metadata = get_sessionmaker(conn_str)
- cprocess = TweetProcess(Session, queue, options, access_token, stop_event)
+ SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str)
+ process_engines.append(engine_process)
+ lqueue = mQueue(1)
+ logger_queues.append(lqueue)
+ cprocess = TweetProcess(SessionProcess, queue, options, access_token, stop_event, lqueue)
tweet_processes.append(cprocess)
def interupt_handler(signum, frame):
@@ -360,23 +393,28 @@
signal.signal(signal.SIGINT, interupt_handler)
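+    # Collect worker log records on a daemon thread so logging never blocks
+    # shutdown of the main process.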
+    log_thread = threading.Thread(target=process_log, name="loggingThread", args=(logger_queues, stop_event))
+ log_thread.daemon = True
+
sprocess.start()
for cprocess in tweet_processes:
cprocess.start()
+ log_thread.start()
+
if options.duration >= 0:
- end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration)
+ end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration)
+
while not stop_event.is_set():
if options.duration >= 0 and datetime.datetime.utcnow() >= end_ts:
stop_event.set()
break
- if sprocess.is_alive():
+ if sprocess.is_alive():
time.sleep(1)
else:
stop_event.set()
break
-
utils.get_logger().debug("Joining Source Process")
try:
sprocess.join(10)
@@ -384,8 +422,6 @@
utils.get_logger().debug("Pb joining Source Process - terminating")
sprocess.terminate()
- utils.get_logger().debug("Joining Queue")
- #queue.join()
for i, cprocess in enumerate(tweet_processes):
utils.get_logger().debug("Joining consumer process Nb %d" % (i + 1))
try:
@@ -393,15 +429,30 @@
except:
utils.get_logger().debug("Pb joining consumer process Nb %d - terminating" % (i + 1))
cprocess.terminate()
+
- utils.get_logger().debug("Processing leftovers")
- session = Session()
+    utils.get_logger().debug("Closing queues")
try:
- process_leftovers(session, access_token)
- session.commit()
- finally:
- session.rollback()
- session.close()
+ queue.close()
+ for lqueue in logger_queues:
+ lqueue.close()
+    except Exception as e:
+        utils.get_logger().error("Error when closing queues %s", repr(e))
+ #do nothing
+
+
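+    # Sources queued but never consumed have no TweetLog entry yet; reprocess
+    # them in the main process before exiting.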
+ if options.process_nb > 1:
+ utils.get_logger().debug("Processing leftovers")
+ session = Session()
+ try:
+ process_leftovers(session, access_token, utils.get_logger())
+ session.commit()
+ finally:
+ session.rollback()
+ session.close()
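+        # Release pooled DB connections held by the per-process engines.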
+ for pengine in process_engines:
+ pengine.dispose()
+
utils.get_logger().debug("Done. Exiting.")