script/stream/recorder_tweetstream.py
changeset 253 e9335ee3cf71
parent 243 9213a63fa34a
child 254 2209e66bb50b
--- a/script/stream/recorder_tweetstream.py	Wed Jul 27 18:32:56 2011 +0200
+++ b/script/stream/recorder_tweetstream.py	Tue Aug 09 13:07:23 2011 +0200
@@ -1,16 +1,24 @@
 from getpass import getpass
 from iri_tweet import models, utils
+from iri_tweet.models import TweetSource, TweetLog
+from multiprocessing import Queue, JoinableQueue, Process, Event, get_logger
 from optparse import OptionParser
 from sqlalchemy.orm import sessionmaker
-from sqlite3 import *
+import StringIO
+import logging
+import anyjson
 import datetime
-import logging
 import os
+import shutil
+import signal
 import socket
 import sys
 import time
+import traceback
+import tweepy.auth
 import tweetstream
-import tweepy.auth
+from iri_tweet.utils import logger
+from sqlalchemy.exc import OperationalError
 socket._fileobject.default_bufsize = 0
 
 
@@ -44,12 +52,12 @@
 
     """
 
-    def __init__(self, auth, keywords, reconnects=3, error_cb=None, retry_wait=5, **kwargs):
+    def __init__(self, auth, keywords, reconnects=3, error_cb=None, retry_wait=5, as_text=False, **kwargs):
         self.max_reconnects = reconnects
         self.retry_wait = retry_wait
         self._reconnects = 0
         self._error_cb = error_cb
-        super(ReconnectingTweetStream, self).__init__(auth=auth, track=keywords, **kwargs)
+        super(ReconnectingTweetStream, self).__init__(auth=auth, track=keywords, as_text=as_text, **kwargs)
 
     def next(self):
         while True:
@@ -72,45 +80,138 @@
 
 
 
-def process_tweet(tweet, session, debug, token_filename):
-    screen_name = ""
-    if 'user' in tweet and 'screen_name' in tweet['user']:
-        screen_name = tweet['user']['screen_name']
-    logging.info("Process_tweet from %s : %s" % (screen_name, tweet['text']))
-    logging.debug("Process_tweet :" + repr(tweet))
-    processor = utils.TwitterProcessor(tweet, None, session, token_filename)
-    processor.process()
-
-def main(username, password, track, session, debug, reconnects, token_filename, duration):
-
-    #username = username or raw_input('Twitter username: ')
-    #password = password or getpass('Twitter password: ')
-
-    track_list = track or raw_input('Keywords to track (comma seperated): ').strip()
-    track_list = [k for k in track_list.split(',')]
+class SourceProcess(Process):
+    
+    def __init__(self, session_maker, queue, auth, track, debug, reconnects, token_filename, stop_event):
+        self.session_maker = session_maker
+        self.queue = queue
+        self.auth = auth
+        self.track = track
+        self.debug = debug
+        self.reconnects = reconnects
+        self.token_filename = token_filename
+        self.stop_event = stop_event
+        super(SourceProcess, self).__init__()
+#        self.stop_event = 
     
-    if username and password:
-        auth = tweepy.auth.BasicAuthHandler(username, password)        
-    else:
-        consumer_key = models.CONSUMER_KEY
-        consumer_secret = models.CONSUMER_SECRET
-        auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=False)
-        auth.set_access_token(*(utils.get_oauth_token(token_filename)))
-    
-    if duration >= 0:
-        end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=duration)
+    def run(self):
+        
+        get_logger().debug("SourceProcess : run")
+        track_list = self.track # or raw_input('Keywords to track (comma seperated): ').strip()
+        track_list = [k for k in track_list.split(',')]
+
+        get_logger().debug("SourceProcess : before connecting to stream " + repr(track_list))                        
+        stream = ReconnectingTweetStream(self.auth, track_list, reconnects=self.reconnects, as_text=True)
+        get_logger().debug("SourceProcess : after connecting to stream")
+        stream.muststop = lambda: self.stop_event.is_set()
+        
+        session = self.session_maker()
+        
+        try:
+            for tweet in stream:
+                get_logger().debug("tweet " + repr(tweet))
+                source = TweetSource(original_json=tweet)
+                get_logger().debug("source created")
+                add_retries = 0
+                while add_retries < 10:
+                    try:
+                        add_retries += 1
+                        session.add(source)
+                        session.flush()
+                        break
+                    except OperationalError as e:
+                        session.rollback()
+                        get_logger().debug("Operational Error %s nb %d" % (repr(e), add_retries))
+                        if add_retries==10:
+                            raise e
+                     
+                source_id = source.id
+                get_logger().debug("before queue + source id " + repr(source_id))
+                self.queue.put((source_id, tweet), False)
+                #process_tweet(tweet, source_id, self.session, self.debug, self.token_filename)
+                get_logger().info("Tweet count: %d - current rate : %.2f - running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime)))
+                session.commit()
+#                if duration >= 0 and  datetime.datetime.utcnow() >= end_ts:
+#                    print "Stop recording after %d seconds." % (duration)
+#                    break
+        except Exception as e:
+            get_logger().error("Error when processing tweet " + repr(e))
+        finally:
+            session.rollback()
+            stream.close()
+            session.close()
+            self.queue.close()
+            self.stop_event.set()
+
+
+def process_tweet(tweet, source_id, session, token_filename):
+    try:
+        tweet_obj = anyjson.deserialize(tweet)
+        screen_name = ""
+        if 'user' in tweet_obj and 'screen_name' in tweet_obj['user']:
+            screen_name = tweet_obj['user']['screen_name']
+        get_logger().info(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text']))
+        get_logger().debug(u"Process_tweet :" + repr(tweet))
+        processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, token_filename)
+        processor.process()
+    except Exception as e:
+        message = u"Error %s processing tweet %s" % (repr(e), tweet)
+        get_logger().error(message)
+        output = StringIO.StringIO()
+        traceback.print_exception(Exception, e, None, None, output)
+        error_stack = output.getvalue()
+        output.close()
+        session.rollback()
+        tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['ERROR'], error=message, error_stack=error_stack)
+        session.add(tweet_log)
+        session.commit()
+
     
-    stream = ReconnectingTweetStream(auth, track_list, reconnects=reconnects)
-    try:
-        for tweet in stream:            
-            if duration >= 0 and  datetime.datetime.utcnow() >= end_ts:
-                print "Stop recording after %d seconds." % (duration)
-                break
-            process_tweet(tweet, session, debug, token_filename)
-            logging.info("Tweet count: %d - current rate : %.2f - running : %s"%(stream.count, stream.rate, int(time.time()-stream.starttime)))
-            session.commit()
-    finally:
-        stream.close()
+        
+class TweetProcess(Process):
+    
+    def __init__(self, session_maker, queue, debug, token_filename, stop_event):
+        self.session_maker = session_maker
+        self.queue = queue
+        self.debug = debug
+        self.stop_event = stop_event
+        self.token_filename = token_filename
+        super(TweetProcess, self).__init__()
+
+
+    def run(self):
+        
+        session = self.session_maker()
+        try:
+            while not self.stop_event.is_set():
+                try:
+                    source_id, tweet_txt = queue.get(True, 10)
+                    get_logger().debug("Processing source id " + repr(source_id))
+                except Exception as e:
+                    get_logger().debug('Process tweet exception in loop : ' + repr(e))
+                    continue
+                process_tweet(tweet_txt, source_id, session, self.token_filename)
+                session.commit()
+        except:
+            raise
+        finally:
+            session.rollback()
+            self.stop_event.set()
+            session.close()
+            
+def process_leftovers(session, token_filename):
+    
+    sources = session.query(TweetSource).outerjoin(TweetLog).filter(TweetLog.id == None)
+    
+    for src in sources:
+        tweet_txt = src.original_json
+        process_tweet(tweet_txt, src.id, session, token_filename)
+
+        
+    
+    #get tweet source that do not match any message
+    #select * from tweet_tweet_source ts left join tweet_tweet_log tl on ts.id = tl.tweet_source_id where tl.id isnull;
+
         
 def get_options():
     parser = OptionParser()
@@ -130,6 +231,9 @@
                       help="Token file name")
     parser.add_option("-d", "--duration", dest="duration",
                       help="Duration of recording in seconds", metavar="DURATION", default= -1, type='int')
+    parser.add_option("-N", "--consumer", dest="consumer_nb",
+                      help="number of consumer", metavar="CONSUMER", default=1, type='int')
+
 
 
     utils.set_logging_options(parser)
@@ -139,27 +243,91 @@
 
 if __name__ == '__main__':
     
-
     (options, args) = get_options()
     
-    utils.set_logging(options)
+    utils.set_logging(options, get_logger())
         
     if options.debug:
         print "OPTIONS : "
         print repr(options)
     
     if options.new and os.path.exists(options.filename):
-        os.remove(options.filename)
+        i = 1
+        basename, extension = os.path.splitext(options.filename)
+        new_path = '%s.%d%s' % (basename, i, extension)
+        while i < 1000000 and os.path.exists(new_path):
+            i += 1
+            new_path = '%s.%d%s' % (basename, i, extension)
+        if i >= 1000000:
+            raise Exception("Unable to find new filename for " + options.filename)
+        else:
+            shutil.move(options.filename, new_path)
+
     
-    engine, metadata = models.setup_database('sqlite:///' + options.filename, echo=(options.debug>=2))
-    Session = sessionmaker(bind=engine)
-    session = Session()
+    queue = JoinableQueue()
+    stop_event = Event()
+
+    if options.username and options.password:
+        auth = tweepy.auth.BasicAuthHandler(options.username, options.password)        
+    else:
+        consumer_key = models.CONSUMER_KEY
+        consumer_secret = models.CONSUMER_SECRET
+        auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=False)
+        auth.set_access_token(*(utils.get_oauth_token(options.token_filename)))
+
 
-    try:
-        try:
-            main(options.username, options.password, options.track, session, options.debug, options.reconnects, options.token_filename, options.duration)
-        except KeyboardInterrupt:
-            print '\nGoodbye!'
-        session.commit()
-    finally:
-        session.close()
+    engine, metadata = models.setup_database('sqlite:///' + options.filename, echo=(options.debug >= 2))
+    Session = sessionmaker(bind=engine)
+    
+    session = Session()
+    process_leftovers(session, options.token_filename)
+    session.commit()
+    session.close()
+         
+    sprocess = SourceProcess(Session, queue, auth, options.track, options.debug, options.reconnects, options.token_filename, stop_event)    
+    
+    tweet_processes = []
+    
+    for i in range(options.consumer_nb):
+        engine, metadata = models.setup_database('sqlite:///' + options.filename, echo=(options.debug >= 2))
+        Session = sessionmaker(bind=engine)
+        cprocess = TweetProcess(Session, queue, options.debug, options.token_filename, stop_event)
+        tweet_processes.append(cprocess)
+
+    def interupt_handler(signum, frame):
+        stop_event.set()
+        
+    signal.signal(signal.SIGINT, interupt_handler)
+
+    sprocess.start()
+    for cprocess in tweet_processes:
+        cprocess.start()
+
+    if options.duration >= 0:
+        end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration)
+
+    while not stop_event.is_set():
+        if options.duration >= 0 and  datetime.datetime.utcnow() >= end_ts:
+            stop_event.set()
+            break
+        if sprocess.is_alive():
+            time.sleep(0.1)
+        else:
+            break
+    
+    get_logger().debug("Joining Source Process")
+    sprocess.join()
+    get_logger().debug("Joining Queue")
+    #queue.join()
+    for i,cprocess in enumerate(tweet_processes):
+        get_logger().debug("Joining consumer process Nb %d" % (i+1))
+        cprocess.join()
+    
+    get_logger().debug("Processing leftovers")
+    session = Session()
+    process_leftovers(session, options.token_filename)
+    session.commit()
+    session.close()
+
+    get_logger().debug("Done. Exiting.")
+        
\ No newline at end of file