script/stream/recorder_tweetstream.py
changeset 255 500cd0405c7a
parent 254 2209e66bb50b
child 256 2f335337ff64
--- a/script/stream/recorder_tweetstream.py	Fri Aug 12 18:17:27 2011 +0200
+++ b/script/stream/recorder_tweetstream.py	Wed Aug 17 18:32:07 2011 +0200
@@ -1,10 +1,12 @@
 from getpass import getpass
 from iri_tweet import models, utils
 from iri_tweet.models import TweetSource, TweetLog
-from multiprocessing import Queue, JoinableQueue, Process, Event, get_logger
+from multiprocessing import (Queue as mQueue, JoinableQueue, Process, Event, 
+    get_logger)
 from optparse import OptionParser
 from sqlalchemy.exc import OperationalError
 from sqlalchemy.orm import scoped_session, sessionmaker
+import Queue
 import StringIO
 import anyjson
 import datetime
@@ -16,6 +18,7 @@
 import socket
 import sqlalchemy.schema
 import sys
+import threading
 import time
 import traceback
 import tweepy.auth
@@ -34,7 +37,7 @@
 
 
 def set_logging(options):
-    utils.set_logging(options, logging.getLogger('iri_tweet'))
+    utils.set_logging(options, logging.getLogger('iri.tweet'))
     utils.set_logging(options, logging.getLogger('multiprocessing'))
     if options.debug >= 2:
         utils.set_logging(options, logging.getLogger('sqlalchemy.engine'))
@@ -42,6 +45,11 @@
     #utils.set_logging(options, logging.getLogger('sqlalchemy.pool'))
     #utils.set_logging(options, logging.getLogger('sqlalchemy.orm'))
 
+def set_logging_process(options, queue):
+    qlogger = utils.set_logging(options, logging.getLogger('iri.tweet.p'), queue)
+    qlogger.propagate = 0
+    return qlogger
+
 def get_auth(options, access_token):
     if options.username and options.password:
         auth = tweepy.auth.BasicAuthHandler(options.username, options.password)        
@@ -107,7 +115,7 @@
 
 class SourceProcess(Process):
     
-    def __init__(self, session_maker, queue, options, access_token, stop_event):
+    def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue):
         self.session_maker = session_maker
         self.queue = queue
         self.track = options.track
@@ -116,31 +124,33 @@
         self.stop_event = stop_event
         self.options = options
         self.access_token = access_token
+        self.logger_queue = logger_queue
         super(SourceProcess, self).__init__()
     
     def run(self):
+        
         #import pydevd
         #pydevd.settrace(suspend=False)
 
-        set_logging(self.options)
+        self.logger = set_logging_process(self.options, self.logger_queue)
         self.auth = get_auth(self.options, self.access_token) 
         
-        utils.get_logger().debug("SourceProcess : run")
+        self.logger.debug("SourceProcess : run")
         track_list = self.track # or raw_input('Keywords to track (comma seperated): ').strip()
         track_list = [k for k in track_list.split(',')]
 
-        utils.get_logger().debug("SourceProcess : before connecting to stream " + repr(track_list))                        
+        self.logger.debug("SourceProcess : before connecting to stream " + repr(track_list))                        
         stream = ReconnectingTweetStream(self.auth, track_list, reconnects=self.reconnects, as_text=True)
-        utils.get_logger().debug("SourceProcess : after connecting to stream")
+        self.logger.debug("SourceProcess : after connecting to stream")
         stream.muststop = lambda: self.stop_event.is_set()
         
         session = self.session_maker()
         
         try:
             for tweet in stream:
-                utils.get_logger().debug("tweet " + repr(tweet))
+                self.logger.debug("tweet " + repr(tweet))
                 source = TweetSource(original_json=tweet)
-                utils.get_logger().debug("source created")
+                self.logger.debug("source created")
                 add_retries = 0
                 while add_retries < 10:
                     try:
@@ -150,18 +160,18 @@
                         break
                     except OperationalError as e:
                         session.rollback()
-                        utils.get_logger().debug("Operational Error %s nb %d" % (repr(e), add_retries))
+                        self.logger.debug("Operational Error %s nb %d" % (repr(e), add_retries))
                         if add_retries == 10:
                             raise e
                      
                 source_id = source.id
-                utils.get_logger().debug("before queue + source id " + repr(source_id))
-                utils.get_logger().info("Tweet count: %d - current rate : %.2f - running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime)))
+                self.logger.debug("before queue + source id " + repr(source_id))
+                self.logger.info("Tweet count: %d - current rate : %.2f - running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime)))
                 session.commit()
                 self.queue.put((source_id, tweet), False)
 
         except Exception as e:
-            utils.get_logger().error("Error when processing tweet " + repr(e))
+            self.logger.error("Error when processing tweet " + repr(e))
         finally:
             session.rollback()
             stream.close()
@@ -170,19 +180,23 @@
             self.stop_event.set()
 
 
-def process_tweet(tweet, source_id, session, access_token):
+def process_tweet(tweet, source_id, session, access_token, logger):
     try:
         tweet_obj = anyjson.deserialize(tweet)
+        if 'text' not in tweet_obj:
+            tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET'])
+            session.add(tweet_log)
+            return
         screen_name = ""
         if 'user' in tweet_obj and 'screen_name' in tweet_obj['user']:
             screen_name = tweet_obj['user']['screen_name']
-        utils.get_logger().debug(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text']))
-        utils.get_logger().debug(u"Process_tweet :" + repr(tweet))
+        logger.info(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text']))
+        logger.debug(u"Process_tweet :" + repr(tweet))
         processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, access_token, None)
         processor.process()
     except Exception as e:
         message = u"Error %s processing tweet %s" % (repr(e), tweet)
-        utils.get_logger().error(message)
+        logger.error(message)
         output = StringIO.StringIO()
         traceback.print_exception(Exception, e, None, None, output)
         error_stack = output.getvalue()
@@ -196,28 +210,29 @@
         
 class TweetProcess(Process):
     
-    def __init__(self, session_maker, queue, options, access_token, stop_event):
+    def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue):
         self.session_maker = session_maker
         self.queue = queue
         self.stop_event = stop_event
         self.options = options
         self.access_token = access_token
+        self.logger_queue = logger_queue
         super(TweetProcess, self).__init__()
 
 
     def run(self):
         
-        set_logging(self.options)
+        self.logger = set_logging_process(self.options, self.logger_queue)
         session = self.session_maker()
         try:
             while not self.stop_event.is_set():
                 try:
                     source_id, tweet_txt = queue.get(True, 3)
-                    utils.get_logger().debug("Processing source id " + repr(source_id))
+                    self.logger.debug("Processing source id " + repr(source_id))
                 except Exception as e:
-                    utils.get_logger().debug('Process tweet exception in loop : ' + repr(e))
+                    self.logger.debug('Process tweet exception in loop : ' + repr(e))
                     continue
-                process_tweet(tweet_txt, source_id, session, self.access_token)
+                process_tweet(tweet_txt, source_id, session, self.access_token, self.logger)
                 session.commit()
         finally:
             session.rollback()
@@ -231,19 +246,28 @@
     return Session, engine, metadata
 
             
-def process_leftovers(session, access_token):
+def process_leftovers(session, access_token, logger):
     
     sources = session.query(TweetSource).outerjoin(TweetLog).filter(TweetLog.id == None)
     
     for src in sources:
         tweet_txt = src.original_json
-        process_tweet(tweet_txt, src.id, session, access_token)
+        process_tweet(tweet_txt, src.id, session, access_token, logger)
         session.commit()
 
         
     
     #get tweet source that do not match any message
     #select * from tweet_tweet_source ts left join tweet_tweet_log tl on ts.id = tl.tweet_source_id where tl.id isnull;
+def process_log(logger_queues, stop_event):
+    while not stop_event.is_set():
+        for lqueue in logger_queues:
+            try:
+                record = lqueue.get_nowait()
+                logging.getLogger(record.name).handle(record)
+            except Queue.Empty:
+                continue
+        time.sleep(0.1)
 
         
 def get_options():
@@ -275,7 +299,7 @@
     
 
 if __name__ == '__main__':
-    
+
     (options, args) = get_options()
     
     set_logging(options)
@@ -287,10 +311,10 @@
     
     conn_str = options.conn_str.strip()
     if not re.match("^\w+://.+", conn_str):
-        conn_str = 'sqlite://' + options.conn_str
+        conn_str = 'sqlite:///' + options.conn_str
         
     if conn_str.startswith("sqlite") and options.new:
-        filepath = conn_str[conn_str.find("://"):]
+        filepath = conn_str[conn_str.find(":///")+4:]
         if os.path.exists(filepath):
             i = 1
             basename, extension = os.path.splitext(filepath)
@@ -320,7 +344,7 @@
     
     session = Session()
     try:
-        process_leftovers(session, access_token)
+        process_leftovers(session, access_token, utils.get_logger())
         session.commit()
     finally:
         session.rollback()
@@ -330,7 +354,7 @@
         utils.get_logger().debug("Leftovers processed. Exiting.")
         sys.exit()
 
-    queue = JoinableQueue()
+    queue = mQueue()
     stop_event = Event()
     
     #workaround for bug on using urllib2 and multiprocessing
@@ -344,15 +368,24 @@
     finally:
         if conn is not None:
             conn.close()
-        
+    
+    process_engines = []
+    logger_queues = []
     
-    sprocess = SourceProcess(Session, queue, options, access_token, stop_event)    
+    SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str)
+    process_engines.append(engine_process)
+    lqueue = mQueue(1)
+    logger_queues.append(lqueue)
+    sprocess = SourceProcess(SessionProcess, queue, options, access_token, stop_event, lqueue)    
     
     tweet_processes = []
     
     for i in range(options.process_nb - 1):
-        Session, engine, metadata = get_sessionmaker(conn_str)
-        cprocess = TweetProcess(Session, queue, options, access_token, stop_event)
+        SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str)
+        process_engines.append(engine_process)
+        lqueue = mQueue(1)
+        logger_queues.append(lqueue)
+        cprocess = TweetProcess(SessionProcess, queue, options, access_token, stop_event, lqueue)
         tweet_processes.append(cprocess)
 
     def interupt_handler(signum, frame):
@@ -360,23 +393,28 @@
         
     signal.signal(signal.SIGINT, interupt_handler)
 
+    log_thread = threading.Thread(target=process_log, name="loggingThread", args=(logger_queues,stop_event,))
+    log_thread.daemon = True
+
     sprocess.start()
     for cprocess in tweet_processes:
         cprocess.start()
 
+    log_thread.start()
+
     if options.duration >= 0:
-        end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration)
+        end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration)    
+    
 
     while not stop_event.is_set():
         if options.duration >= 0 and  datetime.datetime.utcnow() >= end_ts:
             stop_event.set()
             break
-        if sprocess.is_alive():
+        if sprocess.is_alive():            
             time.sleep(1)
         else:
             stop_event.set()
             break
-    
     utils.get_logger().debug("Joining Source Process")
     try:
         sprocess.join(10)
@@ -384,8 +422,6 @@
         utils.get_logger().debug("Pb joining Source Process - terminating")
         sprocess.terminate()
         
-    utils.get_logger().debug("Joining Queue")
-    #queue.join()
     for i, cprocess in enumerate(tweet_processes):
         utils.get_logger().debug("Joining consumer process Nb %d" % (i + 1))
         try:
@@ -393,15 +429,30 @@
         except:
             utils.get_logger().debug("Pb joining consumer process Nb %d - terminating" % (i + 1))
             cprocess.terminate()
+
     
-    utils.get_logger().debug("Processing leftovers")
-    session = Session()
+    utils.get_logger().debug("Close queues")
     try:
-        process_leftovers(session, access_token)
-        session.commit()
-    finally:
-        session.rollback()
-        session.close()
+        queue.close()
+        for lqueue in logger_queues:
+            lqueue.close()
+    except Exception as e:
+        utils.get_logger().error("error when closing queues %s", repr(e))
+        #do nothing
+        
+    
+    if options.process_nb > 1:
+        utils.get_logger().debug("Processing leftovers")
+        session = Session()
+        try:
+            process_leftovers(session, access_token, utils.get_logger())
+            session.commit()
+        finally:
+            session.rollback()
+            session.close()
 
+    for pengine in process_engines:
+        pengine.dispose()
+        
     utils.get_logger().debug("Done. Exiting.")