script/stream/recorder_tweetstream.py
changeset 243 9213a63fa34a
parent 242 cdd7d3c0549c
child 254 2209e66bb50b
--- a/script/stream/recorder_tweetstream.py	Wed Jul 27 00:04:55 2011 +0200
+++ b/script/stream/recorder_tweetstream.py	Mon Aug 08 09:01:40 2011 +0200
@@ -1,14 +1,13 @@
 from getpass import getpass
 from iri_tweet import models, utils
 from iri_tweet.models import TweetSource, TweetLog
-from multiprocessing import Queue, JoinableQueue, Process, Event
+from multiprocessing import Queue, JoinableQueue, Process, Event, get_logger
 from optparse import OptionParser
 from sqlalchemy.orm import sessionmaker
-from sqlite3 import *
 import StringIO
+import logging
 import anyjson
 import datetime
-import logging
 import os
 import shutil
 import signal
@@ -18,6 +17,8 @@
 import traceback
 import tweepy.auth
 import tweetstream
+from iri_tweet.utils import logger
+from sqlalchemy.exc import OperationalError
 socket._fileobject.default_bufsize = 0
 
 
@@ -82,7 +83,6 @@
 class SourceProcess(Process):
     
     def __init__(self, session_maker, queue, auth, track, debug, reconnects, token_filename, stop_event):
-        super(SourceProcess, self).__init__()
         self.session_maker = session_maker
         self.queue = queue
         self.auth = auth
@@ -91,47 +91,93 @@
         self.reconnects = reconnects
         self.token_filename = token_filename
         self.stop_event = stop_event
+        super(SourceProcess, self).__init__()
 #        self.stop_event = 
     
     def run(self):
         
+        get_logger().debug("SourceProcess : run")
         track_list = self.track # or raw_input('Keywords to track (comma seperated): ').strip()
         track_list = [k for k in track_list.split(',')]
-                        
+
+        get_logger().debug("SourceProcess : before connecting to stream " + repr(track_list))                        
         stream = ReconnectingTweetStream(self.auth, track_list, reconnects=self.reconnects, as_text=True)
+        get_logger().debug("SourceProcess : after connecting to stream")
         stream.muststop = lambda: self.stop_event.is_set()
         
         session = self.session_maker()
         
         try:
             for tweet in stream:
+                get_logger().debug("tweet " + repr(tweet))
                 source = TweetSource(original_json=tweet)
-                session.add(source)
-                session.flush()
+                get_logger().debug("source created")
+                add_retries = 0
+                while add_retries < 10:
+                    try:
+                        add_retries += 1
+                        session.add(source)
+                        session.flush()
+                        break
+                    except OperationalError as e:
+                        session.rollback()
+                        get_logger().debug("Operational Error %s nb %d" % (repr(e), add_retries))
+                        if add_retries==10:
+                            raise e
+                     
                 source_id = source.id
-                queue.put((source_id, tweet), False)
+                get_logger().debug("before queue + source id " + repr(source_id))
+                self.queue.put((source_id, tweet), False)
                 #process_tweet(tweet, source_id, self.session, self.debug, self.token_filename)
-                logging.info("Tweet count: %d - current rate : %.2f - running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime)))
+                get_logger().info("Tweet count: %d - current rate : %.2f - running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime)))
                 session.commit()
 #                if duration >= 0 and  datetime.datetime.utcnow() >= end_ts:
 #                    print "Stop recording after %d seconds." % (duration)
 #                    break
-        except:
+        except Exception as e:
+            get_logger().error("Error when processing tweet " + repr(e))
+        finally:
             session.rollback()
-        finally:
             stream.close()
             session.close()
-            
+            self.queue.close()
+            self.stop_event.set()
+
+
+def process_tweet(tweet, source_id, session, token_filename):
+    try:
+        tweet_obj = anyjson.deserialize(tweet)
+        screen_name = ""
+        if 'user' in tweet_obj and 'screen_name' in tweet_obj['user']:
+            screen_name = tweet_obj['user']['screen_name']
+        get_logger().info(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text']))
+        get_logger().debug(u"Process_tweet :" + repr(tweet))
+        processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, token_filename)
+        processor.process()
+    except Exception as e:
+        message = u"Error %s processing tweet %s" % (repr(e), tweet)
+        get_logger().error(message)
+        output = StringIO.StringIO()
+        traceback.print_exception(Exception, e, None, None, output)
+        error_stack = output.getvalue()
+        output.close()
+        session.rollback()
+        tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['ERROR'], error=message, error_stack=error_stack)
+        session.add(tweet_log)
+        session.commit()
+
+    
         
 class TweetProcess(Process):
     
     def __init__(self, session_maker, queue, debug, token_filename, stop_event):
-        super(TweetProcess, self).__init__()
         self.session_maker = session_maker
         self.queue = queue
         self.debug = debug
         self.stop_event = stop_event
         self.token_filename = token_filename
+        super(TweetProcess, self).__init__()
+
 
     def run(self):
         
@@ -139,74 +185,33 @@
         try:
             while not self.stop_event.is_set():
                 try:
-                    source_id, tweet_txt = queue.get(True, 30)
-                except:
+                    source_id, tweet_txt = queue.get(True, 10)
+                    get_logger().debug("Processing source id " + repr(source_id))
+                except Exception as e:
+                    get_logger().debug('Process tweet exception in loop : ' + repr(e))
                     continue
-                process_tweet(tweet_txt, source_id, session)
+                process_tweet(tweet_txt, source_id, session, self.token_filename)
                 session.commit()
-                self.queue.task_done()
         except:
-            session.rollback()
             raise
         finally:
+            session.rollback()
+            self.stop_event.set()
             session.close()
             
-        
-    def process_tweet(tweet, source_id, session):
-        
-        try:
-            tweet_obj = anyjson.deserialize(tweet)
-            screen_name = ""
-            if 'user' in tweet_obj and 'screen_name' in tweet_obj['user']:
-                screen_name = tweet_obj['user']['screen_name']
-            logging.info(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text']))
-            logging.debug(u"Process_tweet :" + repr(tweet))
-            processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, self.token_filename)
-            processor.process()
-        except Exception, e:
-            message = u"Error %e processing tweet %s" % (unicode(e), tweet)
-            logging.error(message)
-            output = StringIO.StringIO()
-            traceback.print_exception(Exception, e, None, None, output)
-            tweet_log = TweetLog(status=TweetLog.TWEET_STATUS['ERROR'], error=message, error_stack=output.getvalue())
-            output.close()
-
-
-
-#def main_source(username, password, track, session, debug, reconnects, token_filename, duration):
-
-    #username = username or raw_input('Twitter username: ')
-    #password = password or getpass('Twitter password: ')
+def process_leftovers(session, token_filename):
+    
+    sources = session.query(TweetSource).outerjoin(TweetLog).filter(TweetLog.id == None)
+    
+    for src in sources:
+        tweet_txt = src.original_json
+        process_tweet(tweet_txt, src.id, session, token_filename)
 
-#    track_list = track or raw_input('Keywords to track (comma seperated): ').strip()
-#    track_list = [k for k in track_list.split(',')]
-    
-#    if username and password:
-#        auth = tweepy.auth.BasicAuthHandler(username, password)        
-#    else:
-#        consumer_key = models.CONSUMER_KEY
-#        consumer_secret = models.CONSUMER_SECRET
-#        auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=False)
-#        auth.set_access_token(*(utils.get_oauth_token(token_filename)))
-    
-#    if duration >= 0:
-#        end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=duration)
+        
     
-#    stream = ReconnectingTweetStream(auth, track_list, reconnects=reconnects, as_text=True)
-#    try:
-#        for tweet in stream:
-#            source = TweetSource(original_json=tweet)
-#            session.add(source)
-#            session.flush()            
-#            source_id = source.id
-#            process_tweet(tweet, source_id, session, debug, token_filename)
-#            logging.info("Tweet count: %d - current rate : %.2f - running : %s"%(stream.count, stream.rate, int(time.time()-stream.starttime)))
-#            session.commit()
-#            if duration >= 0 and  datetime.datetime.utcnow() >= end_ts:
-#                print "Stop recording after %d seconds." % (duration)
-#                break
-#    finally:
-#        stream.close()
+    #get tweet source that do not match any message
+    #select * from tweet_tweet_source ts left join tweet_tweet_log tl on ts.id = tl.tweet_source_id where tl.id isnull;
+
         
 def get_options():
     parser = OptionParser()
@@ -240,7 +245,7 @@
     
     (options, args) = get_options()
     
-    utils.set_logging(options)
+    utils.set_logging(options, get_logger())
         
     if options.debug:
         print "OPTIONS : "
@@ -249,18 +254,16 @@
     if options.new and os.path.exists(options.filename):
         i = 1
         basename, extension = os.path.splitext(options.filename)
-        new_path = '$s.%d.%s' % (basename, i, extension)
+        new_path = '%s.%d%s' % (basename, i, extension)
         while i < 1000000 and os.path.exists(new_path):
             i += 1
-            new_path = '$s.%d.%s' % (basename, i, extension)
+            new_path = '%s.%d%s' % (basename, i, extension)
         if i >= 1000000:
             raise Exception("Unable to find new filename for " + options.filename)
         else:
             shutil.move(options.filename, new_path)
 
     
-    engine, metadata = models.setup_database('sqlite:///' + options.filename, echo=(options.debug >= 2))
-    Session = sessionmaker(bind=engine)
     queue = JoinableQueue()
     stop_event = Event()
 
@@ -272,12 +275,22 @@
         auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=False)
         auth.set_access_token(*(utils.get_oauth_token(options.token_filename)))
 
-     
+
+    engine, metadata = models.setup_database('sqlite:///' + options.filename, echo=(options.debug >= 2))
+    Session = sessionmaker(bind=engine)
+    
+    session = Session()
+    process_leftovers(session, options.token_filename)
+    session.commit()
+    session.close()
+         
     sprocess = SourceProcess(Session, queue, auth, options.track, options.debug, options.reconnects, options.token_filename, stop_event)    
     
     tweet_processes = []
     
     for i in range(options.consumer_nb):
+        engine, metadata = models.setup_database('sqlite:///' + options.filename, echo=(options.debug >= 2))
+        Session = sessionmaker(bind=engine)
         cprocess = TweetProcess(Session, queue, options.debug, options.token_filename, stop_event)
         tweet_processes.append(cprocess)
 
@@ -302,7 +315,19 @@
         else:
             break
     
+    get_logger().debug("Joining Source Process")
     sprocess.join()
-    queue.join()
-    for cprocess in tweet_processes:
+    get_logger().debug("Joining Queue")
+    #queue.join()
+    for i,cprocess in enumerate(tweet_processes):
+        get_logger().debug("Joining consumer process Nb %d" % (i+1))
         cprocess.join()
+    
+    get_logger().debug("Processing leftovers")
+    session = Session()
+    process_leftovers(session, options.token_filename)
+    session.commit()
+    session.close()
+
+    get_logger().debug("Done. Exiting.")
+        
\ No newline at end of file