script/stream/recorder_tweetstream.py
changeset 242 cdd7d3c0549c
parent 207 621fa6caec0c
child 243 9213a63fa34a
--- a/script/stream/recorder_tweetstream.py	Tue Jul 26 23:57:09 2011 +0200
+++ b/script/stream/recorder_tweetstream.py	Wed Jul 27 00:04:55 2011 +0200
@@ -1,16 +1,23 @@
 from getpass import getpass
 from iri_tweet import models, utils
+from iri_tweet.models import TweetSource, TweetLog
+from multiprocessing import Queue, JoinableQueue, Process, Event
 from optparse import OptionParser
 from sqlalchemy.orm import sessionmaker
 from sqlite3 import *
+import StringIO
+import anyjson
 import datetime
 import logging
 import os
+import shutil
+import signal
 import socket
 import sys
 import time
+import traceback
+import tweepy.auth
 import tweetstream
-import tweepy.auth
 socket._fileobject.default_bufsize = 0
 
 
@@ -44,12 +51,12 @@
 
     """
 
-    def __init__(self, auth, keywords, reconnects=3, error_cb=None, retry_wait=5, **kwargs):
+    def __init__(self, auth, keywords, reconnects=3, error_cb=None, retry_wait=5, as_text=False, **kwargs):
         self.max_reconnects = reconnects
         self.retry_wait = retry_wait
         self._reconnects = 0
         self._error_cb = error_cb
-        super(ReconnectingTweetStream, self).__init__(auth=auth, track=keywords, **kwargs)
+        super(ReconnectingTweetStream, self).__init__(auth=auth, track=keywords, as_text=as_text, **kwargs)
 
     def next(self):
         while True:
@@ -72,45 +79,134 @@
 
 
 
-def process_tweet(tweet, session, debug, token_filename):
-    screen_name = ""
-    if 'user' in tweet and 'screen_name' in tweet['user']:
-        screen_name = tweet['user']['screen_name']
-    logging.info("Process_tweet from %s : %s" % (screen_name, tweet['text']))
-    logging.debug("Process_tweet :" + repr(tweet))
-    processor = utils.TwitterProcessor(tweet, None, session, token_filename)
-    processor.process()
+class SourceProcess(Process):
+    
+    def __init__(self, session_maker, queue, auth, track, debug, reconnects, token_filename, stop_event):
+        super(SourceProcess, self).__init__()
+        self.session_maker = session_maker
+        self.queue = queue
+        self.auth = auth
+        self.track = track
+        self.debug = debug
+        self.reconnects = reconnects
+        self.token_filename = token_filename
+        self.stop_event = stop_event
+#        self.stop_event = 
+    
+    def run(self):
+        
+        track_list = self.track # or raw_input('Keywords to track (comma seperated): ').strip()
+        track_list = [k for k in track_list.split(',')]
+                        
+        stream = ReconnectingTweetStream(self.auth, track_list, reconnects=self.reconnects, as_text=True)
+        stream.muststop = lambda: self.stop_event.is_set()
+        
+        session = self.session_maker()
+        
+        try:
+            for tweet in stream:
+                source = TweetSource(original_json=tweet)
+                session.add(source)
+                session.flush()
+                source_id = source.id
+                queue.put((source_id, tweet), False)
+                #process_tweet(tweet, source_id, self.session, self.debug, self.token_filename)
+                logging.info("Tweet count: %d - current rate : %.2f - running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime)))
+                session.commit()
+#                if duration >= 0 and  datetime.datetime.utcnow() >= end_ts:
+#                    print "Stop recording after %d seconds." % (duration)
+#                    break
+        except:
+            session.rollback()
+        finally:
+            stream.close()
+            session.close()
+            
+        
+class TweetProcess(Process):
+    
+    def __init__(self, session_maker, queue, debug, token_filename, stop_event):
+        super(TweetProcess, self).__init__()
+        self.session_maker = session_maker
+        self.queue = queue
+        self.debug = debug
+        self.stop_event = stop_event
+        self.token_filename = token_filename
 
-def main(username, password, track, session, debug, reconnects, token_filename, duration):
+    def run(self):
+        
+        session = self.session_maker()
+        try:
+            while not self.stop_event.is_set():
+                try:
+                    source_id, tweet_txt = queue.get(True, 30)
+                except:
+                    continue
+                process_tweet(tweet_txt, source_id, session)
+                session.commit()
+                self.queue.task_done()
+        except:
+            session.rollback()
+            raise
+        finally:
+            session.close()
+            
+        
+    def process_tweet(tweet, source_id, session):
+        
+        try:
+            tweet_obj = anyjson.deserialize(tweet)
+            screen_name = ""
+            if 'user' in tweet_obj and 'screen_name' in tweet_obj['user']:
+                screen_name = tweet_obj['user']['screen_name']
+            logging.info(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text']))
+            logging.debug(u"Process_tweet :" + repr(tweet))
+            processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, self.token_filename)
+            processor.process()
+        except Exception, e:
+            message = u"Error %e processing tweet %s" % (unicode(e), tweet)
+            logging.error(message)
+            output = StringIO.StringIO()
+            traceback.print_exception(Exception, e, None, None, output)
+            tweet_log = TweetLog(status=TweetLog.TWEET_STATUS['ERROR'], error=message, error_stack=output.getvalue())
+            output.close()
+
+
+
+#def main_source(username, password, track, session, debug, reconnects, token_filename, duration):
 
     #username = username or raw_input('Twitter username: ')
     #password = password or getpass('Twitter password: ')
 
-    track_list = track or raw_input('Keywords to track (comma seperated): ').strip()
-    track_list = [k for k in track_list.split(',')]
+#    track_list = track or raw_input('Keywords to track (comma seperated): ').strip()
+#    track_list = [k for k in track_list.split(',')]
     
-    if username and password:
-        auth = tweepy.auth.BasicAuthHandler(username, password)        
-    else:
-        consumer_key = models.CONSUMER_KEY
-        consumer_secret = models.CONSUMER_SECRET
-        auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=False)
-        auth.set_access_token(*(utils.get_oauth_token(token_filename)))
+#    if username and password:
+#        auth = tweepy.auth.BasicAuthHandler(username, password)        
+#    else:
+#        consumer_key = models.CONSUMER_KEY
+#        consumer_secret = models.CONSUMER_SECRET
+#        auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=False)
+#        auth.set_access_token(*(utils.get_oauth_token(token_filename)))
+    
+#    if duration >= 0:
+#        end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=duration)
     
-    if duration >= 0:
-        end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=duration)
-    
-    stream = ReconnectingTweetStream(auth, track_list, reconnects=reconnects)
-    try:
-        for tweet in stream:            
-            if duration >= 0 and  datetime.datetime.utcnow() >= end_ts:
-                print "Stop recording after %d seconds." % (duration)
-                break
-            process_tweet(tweet, session, debug, token_filename)
-            logging.info("Tweet count: %d - current rate : %.2f - running : %s"%(stream.count, stream.rate, int(time.time()-stream.starttime)))
-            session.commit()
-    finally:
-        stream.close()
+#    stream = ReconnectingTweetStream(auth, track_list, reconnects=reconnects, as_text=True)
+#    try:
+#        for tweet in stream:
+#            source = TweetSource(original_json=tweet)
+#            session.add(source)
+#            session.flush()            
+#            source_id = source.id
+#            process_tweet(tweet, source_id, session, debug, token_filename)
+#            logging.info("Tweet count: %d - current rate : %.2f - running : %s"%(stream.count, stream.rate, int(time.time()-stream.starttime)))
+#            session.commit()
+#            if duration >= 0 and  datetime.datetime.utcnow() >= end_ts:
+#                print "Stop recording after %d seconds." % (duration)
+#                break
+#    finally:
+#        stream.close()
         
 def get_options():
     parser = OptionParser()
@@ -130,6 +226,9 @@
                       help="Token file name")
     parser.add_option("-d", "--duration", dest="duration",
                       help="Duration of recording in seconds", metavar="DURATION", default= -1, type='int')
+    parser.add_option("-N", "--consumer", dest="consumer_nb",
+                      help="number of consumer", metavar="CONSUMER", default=1, type='int')
+
 
 
     utils.set_logging_options(parser)
@@ -139,7 +238,6 @@
 
 if __name__ == '__main__':
     
-
     (options, args) = get_options()
     
     utils.set_logging(options)
@@ -149,17 +247,62 @@
         print repr(options)
     
     if options.new and os.path.exists(options.filename):
-        os.remove(options.filename)
+        i = 1
+        basename, extension = os.path.splitext(options.filename)
+        new_path = '$s.%d.%s' % (basename, i, extension)
+        while i < 1000000 and os.path.exists(new_path):
+            i += 1
+            new_path = '$s.%d.%s' % (basename, i, extension)
+        if i >= 1000000:
+            raise Exception("Unable to find new filename for " + options.filename)
+        else:
+            shutil.move(options.filename, new_path)
+
     
-    engine, metadata = models.setup_database('sqlite:///' + options.filename, echo=(options.debug>=2))
+    engine, metadata = models.setup_database('sqlite:///' + options.filename, echo=(options.debug >= 2))
     Session = sessionmaker(bind=engine)
-    session = Session()
+    queue = JoinableQueue()
+    stop_event = Event()
+
+    if options.username and options.password:
+        auth = tweepy.auth.BasicAuthHandler(options.username, options.password)        
+    else:
+        consumer_key = models.CONSUMER_KEY
+        consumer_secret = models.CONSUMER_SECRET
+        auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=False)
+        auth.set_access_token(*(utils.get_oauth_token(options.token_filename)))
 
-    try:
-        try:
-            main(options.username, options.password, options.track, session, options.debug, options.reconnects, options.token_filename, options.duration)
-        except KeyboardInterrupt:
-            print '\nGoodbye!'
-        session.commit()
-    finally:
-        session.close()
+     
+    sprocess = SourceProcess(Session, queue, auth, options.track, options.debug, options.reconnects, options.token_filename, stop_event)    
+    
+    tweet_processes = []
+    
+    for i in range(options.consumer_nb):
+        cprocess = TweetProcess(Session, queue, options.debug, options.token_filename, stop_event)
+        tweet_processes.append(cprocess)
+
+    def interupt_handler(signum, frame):
+        stop_event.set()
+        
+    signal.signal(signal.SIGINT, interupt_handler)
+
+    sprocess.start()
+    for cprocess in tweet_processes:
+        cprocess.start()
+
+    if options.duration >= 0:
+        end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration)
+
+    while not stop_event.is_set():
+        if options.duration >= 0 and  datetime.datetime.utcnow() >= end_ts:
+            stop_event.set()
+            break
+        if sprocess.is_alive():
+            time.sleep(0.1)
+        else:
+            break
+    
+    sprocess.join()
+    queue.join()
+    for cprocess in tweet_processes:
+        cprocess.join()