script/stream/recorder_tweetstream.py
changeset 254 2209e66bb50b
parent 243 9213a63fa34a
child 255 500cd0405c7a
--- a/script/stream/recorder_tweetstream.py	Tue Aug 09 13:07:23 2011 +0200
+++ b/script/stream/recorder_tweetstream.py	Fri Aug 12 18:17:27 2011 +0200
@@ -3,22 +3,25 @@
 from iri_tweet.models import TweetSource, TweetLog
 from multiprocessing import Queue, JoinableQueue, Process, Event, get_logger
 from optparse import OptionParser
-from sqlalchemy.orm import sessionmaker
+from sqlalchemy.exc import OperationalError
+from sqlalchemy.orm import scoped_session, sessionmaker
 import StringIO
-import logging
 import anyjson
 import datetime
+import logging
 import os
+import re
 import shutil
 import signal
 import socket
+import sqlalchemy.schema
 import sys
 import time
 import traceback
 import tweepy.auth
 import tweetstream
-from iri_tweet.utils import logger
-from sqlalchemy.exc import OperationalError
+import urllib2
+#from iri_tweet.utils import get_logger
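+# presumably set to 0 so socket reads are unbuffered and streamed tweets arrive without delay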
 socket._fileobject.default_bufsize = 0
 
 
@@ -30,6 +33,26 @@
 #just put it in a sqlite3 table
 
 
+def set_logging(options):
+    utils.set_logging(options, logging.getLogger('iri_tweet'))
+    utils.set_logging(options, logging.getLogger('multiprocessing'))
+    if options.debug >= 2:
+        utils.set_logging(options, logging.getLogger('sqlalchemy.engine'))
+    #utils.set_logging(options, logging.getLogger('sqlalchemy.dialects'))
+    #utils.set_logging(options, logging.getLogger('sqlalchemy.pool'))
+    #utils.set_logging(options, logging.getLogger('sqlalchemy.orm'))
+
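+# Pick the tweepy auth handler: HTTP basic auth when a username/password pair is
+# given, otherwise OAuth using the application keys from iri_tweet.models plus the stored access token.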
+def get_auth(options, access_token):
+    if options.username and options.password:
+        auth = tweepy.auth.BasicAuthHandler(options.username, options.password)        
+    else:
+        consumer_key = models.CONSUMER_KEY
+        consumer_secret = models.CONSUMER_SECRET
+        auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=False)
+        auth.set_access_token(*access_token)
+    return auth
+
+
 class ReconnectingTweetStream(tweetstream.FilterStream):
     """TweetStream class that automatically tries to reconnect if the
     connection goes down. Reconnecting, and waiting for reconnecting, is
@@ -62,9 +85,10 @@
     def next(self):
         while True:
             try:
+                utils.get_logger().debug("return super.next")
                 return super(ReconnectingTweetStream, self).next()
             except tweetstream.ConnectionError, e:
-                logging.debug("connection error :" + str(e))
+                utils.get_logger().debug("connection error :" + str(e))
                 self._reconnects += 1
                 if self._reconnects > self.max_reconnects:
                     raise tweetstream.ConnectionError("Too many retries")
@@ -80,38 +104,43 @@
 
 
 
+
 class SourceProcess(Process):
     
-    def __init__(self, session_maker, queue, auth, track, debug, reconnects, token_filename, stop_event):
+    def __init__(self, session_maker, queue, options, access_token, stop_event):
         self.session_maker = session_maker
         self.queue = queue
-        self.auth = auth
-        self.track = track
-        self.debug = debug
-        self.reconnects = reconnects
-        self.token_filename = token_filename
+        self.track = options.track
+        self.reconnects = options.reconnects
+        self.token_filename = options.token_filename
         self.stop_event = stop_event
+        self.options = options
+        self.access_token = access_token
         super(SourceProcess, self).__init__()
-#        self.stop_event = 
     
     def run(self):
+        #import pydevd
+        #pydevd.settrace(suspend=False)
+
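+        # run() executes in the child process, so logging and the tweepy auth
+        # handler are set up here rather than inherited from the parent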
+        set_logging(self.options)
+        self.auth = get_auth(self.options, self.access_token) 
         
-        get_logger().debug("SourceProcess : run")
+        utils.get_logger().debug("SourceProcess : run")
         track_list = self.track # or raw_input('Keywords to track (comma separated): ').strip()
         track_list = [k for k in track_list.split(',')]
 
-        get_logger().debug("SourceProcess : before connecting to stream " + repr(track_list))                        
+        utils.get_logger().debug("SourceProcess : before connecting to stream " + repr(track_list))                        
         stream = ReconnectingTweetStream(self.auth, track_list, reconnects=self.reconnects, as_text=True)
-        get_logger().debug("SourceProcess : after connecting to stream")
+        utils.get_logger().debug("SourceProcess : after connecting to stream")
         stream.muststop = lambda: self.stop_event.is_set()
         
         session = self.session_maker()
         
         try:
             for tweet in stream:
-                get_logger().debug("tweet " + repr(tweet))
+                utils.get_logger().debug("tweet " + repr(tweet))
                 source = TweetSource(original_json=tweet)
-                get_logger().debug("source created")
+                utils.get_logger().debug("source created")
                 add_retries = 0
                 while add_retries < 10:
                     try:
@@ -121,21 +150,18 @@
                         break
                     except OperationalError as e:
                         session.rollback()
-                        get_logger().debug("Operational Error %s nb %d" % (repr(e), add_retries))
-                        if add_retries==10:
+                        utils.get_logger().debug("Operational Error %s nb %d" % (repr(e), add_retries))
+                        if add_retries == 10:
                             raise e
                      
                 source_id = source.id
-                get_logger().debug("before queue + source id " + repr(source_id))
-                self.queue.put((source_id, tweet), False)
-                #process_tweet(tweet, source_id, self.session, self.debug, self.token_filename)
-                get_logger().info("Tweet count: %d - current rate : %.2f - running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime)))
+                utils.get_logger().debug("before queue + source id " + repr(source_id))
+                utils.get_logger().info("Tweet count: %d - current rate : %.2f - running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime)))
                 session.commit()
-#                if duration >= 0 and  datetime.datetime.utcnow() >= end_ts:
-#                    print "Stop recording after %d seconds." % (duration)
-#                    break
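+                # enqueue only after the commit so consumer processes can see the TweetSource row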
+                self.queue.put((source_id, tweet), False)
+
         except Exception as e:
-            get_logger().error("Error when processing tweet " + repr(e))
+            utils.get_logger().error("Error when processing tweet " + repr(e))
         finally:
             session.rollback()
             stream.close()
@@ -144,19 +170,19 @@
             self.stop_event.set()
 
 
-def process_tweet(tweet, source_id, session, token_filename):
+def process_tweet(tweet, source_id, session, access_token):
     try:
         tweet_obj = anyjson.deserialize(tweet)
         screen_name = ""
         if 'user' in tweet_obj and 'screen_name' in tweet_obj['user']:
             screen_name = tweet_obj['user']['screen_name']
-        get_logger().info(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text']))
-        get_logger().debug(u"Process_tweet :" + repr(tweet))
-        processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, token_filename)
+        utils.get_logger().debug(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text']))
+        utils.get_logger().debug(u"Process_tweet :" + repr(tweet))
+        processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, access_token, None)
         processor.process()
     except Exception as e:
         message = u"Error %s processing tweet %s" % (repr(e), tweet)
-        get_logger().error(message)
+        utils.get_logger().error(message)
         output = StringIO.StringIO()
         traceback.print_exception(Exception, e, None, None, output)
         error_stack = output.getvalue()
@@ -170,42 +196,49 @@
         
 class TweetProcess(Process):
     
-    def __init__(self, session_maker, queue, debug, token_filename, stop_event):
+    def __init__(self, session_maker, queue, options, access_token, stop_event):
         self.session_maker = session_maker
         self.queue = queue
-        self.debug = debug
         self.stop_event = stop_event
-        self.token_filename = token_filename
+        self.options = options
+        self.access_token = access_token
         super(TweetProcess, self).__init__()
 
 
     def run(self):
         
+        set_logging(self.options)
         session = self.session_maker()
         try:
             while not self.stop_event.is_set():
                 try:
-                    source_id, tweet_txt = queue.get(True, 10)
-                    get_logger().debug("Processing source id " + repr(source_id))
+                    source_id, tweet_txt = self.queue.get(True, 3)
+                    utils.get_logger().debug("Processing source id " + repr(source_id))
                 except Exception as e:
-                    get_logger().debug('Process tweet exception in loop : ' + repr(e))
+                    utils.get_logger().debug('Process tweet exception in loop : ' + repr(e))
                     continue
-                process_tweet(tweet_txt, source_id, session, self.token_filename)
+                process_tweet(tweet_txt, source_id, session, self.access_token)
                 session.commit()
-        except:
-            raise
         finally:
             session.rollback()
             self.stop_event.set()
             session.close()
+
+
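+# Build an engine and a scoped session factory for it; each process creates its
+# own, since DBAPI connections cannot safely be shared across processes.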
+def get_sessionmaker(conn_str):
+    engine, metadata = models.setup_database(conn_str, echo=False, create_all=False)
+    Session = scoped_session(sessionmaker(bind=engine, autocommit=False))
+    return Session, engine, metadata
+
             
-def process_leftovers(session, token_filename):
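+# leftovers: TweetSource rows with no TweetLog entry, i.e. tweets saved but never processed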
+def process_leftovers(session, access_token):
     
     sources = session.query(TweetSource).outerjoin(TweetLog).filter(TweetLog.id == None)
     
     for src in sources:
         tweet_txt = src.original_json
-        process_tweet(tweet_txt, src.id, session, token_filename)
+        process_tweet(tweet_txt, src.id, session, access_token)
+        session.commit()
 
         
     
@@ -215,8 +248,8 @@
         
 def get_options():
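+    # Hypothetical invocation, using only the options defined below:
+    #   python recorder_tweetstream.py -f tweets.db -N 3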
     parser = OptionParser()
-    parser.add_option("-f", "--file", dest="filename",
-                      help="write tweet to FILE", metavar="FILE", default="enmi2010_twitter.db")
+    parser.add_option("-f", "--file", dest="conn_str",
+                      help="write tweet to DATABASE. This is a connection string", metavar="CONNECTION_STR", default="enmi2010_twitter.db")
     parser.add_option("-u", "--user", dest="username",
                       help="Twitter user", metavar="USER", default=None)
     parser.add_option("-w", "--password", dest="password",
@@ -231,8 +264,8 @@
                       help="Token file name")
     parser.add_option("-d", "--duration", dest="duration",
                       help="Duration of recording in seconds", metavar="DURATION", default= -1, type='int')
-    parser.add_option("-N", "--consumer", dest="consumer_nb",
-                      help="number of consumer", metavar="CONSUMER", default=1, type='int')
+    parser.add_option("-N", "--nb-process", dest="process_nb",
+                      help="number of process.\nIf 0, only the lefovers of the database are processed.\nIf 1, no postprocessing is done on the tweets.", metavar="PROCESS_NB", default=2, type='int')
 
 
 
@@ -245,53 +278,81 @@
     
     (options, args) = get_options()
     
-    utils.set_logging(options, get_logger())
+    set_logging(options)
         
     if options.debug:
         print "OPTIONS : "
         print repr(options)
     
-    if options.new and os.path.exists(options.filename):
-        i = 1
-        basename, extension = os.path.splitext(options.filename)
-        new_path = '%s.%d%s' % (basename, i, extension)
-        while i < 1000000 and os.path.exists(new_path):
-            i += 1
+    
+    conn_str = options.conn_str.strip()
+    if not re.match("^\w+://.+", conn_str):
+        conn_str = 'sqlite:///' + options.conn_str
+        
+    if conn_str.startswith("sqlite") and options.new:
+        filepath = conn_str[conn_str.find(":///") + 4:]
+        if os.path.exists(filepath):
+            i = 1
+            basename, extension = os.path.splitext(filepath)
             new_path = '%s.%d%s' % (basename, i, extension)
-        if i >= 1000000:
-            raise Exception("Unable to find new filename for " + options.filename)
-        else:
-            shutil.move(options.filename, new_path)
+            while i < 1000000 and os.path.exists(new_path):
+                i += 1
+                new_path = '%s.%d%s' % (basename, i, extension)
+            if i >= 1000000:
+                raise Exception("Unable to find new filename for " + filepath)
+            else:
+                shutil.move(filepath, new_path)
 
+    Session, engine, metadata = get_sessionmaker(conn_str)
     
+    if options.new:
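+        # reflect the existing schema: a fresh recording must not reuse a non-empty database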
+        check_metadata = sqlalchemy.schema.MetaData(bind=engine, reflect=True)
+        if len(check_metadata.sorted_tables) > 0:
+            message = "Database %s not empty exiting" % conn_str
+            utils.get_logger().error(message)
+            sys.exit(message)
+    
+    metadata.create_all(engine)
+    
+    access_token = None
+    if not options.username or not options.password:
+        access_token = utils.get_oauth_token(options.token_filename)
+    
+    session = Session()
+    try:
+        process_leftovers(session, access_token)
+        session.commit()
+    finally:
+        session.rollback()
+        session.close()
+    
+    if options.process_nb <= 0:
+        utils.get_logger().debug("Leftovers processed. Exiting.")
+        sys.exit()
+
     queue = JoinableQueue()
     stop_event = Event()
-
-    if options.username and options.password:
-        auth = tweepy.auth.BasicAuthHandler(options.username, options.password)        
-    else:
-        consumer_key = models.CONSUMER_KEY
-        consumer_secret = models.CONSUMER_SECRET
-        auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=False)
-        auth.set_access_token(*(utils.get_oauth_token(options.token_filename)))
-
-
-    engine, metadata = models.setup_database('sqlite:///' + options.filename, echo=(options.debug >= 2))
-    Session = sessionmaker(bind=engine)
     
-    session = Session()
-    process_leftovers(session, options.token_filename)
-    session.commit()
-    session.close()
-         
-    sprocess = SourceProcess(Session, queue, auth, options.track, options.debug, options.reconnects, options.token_filename, stop_event)    
+    # workaround for a urllib2/multiprocessing bug: open one throwaway connection in the parent before the worker processes fork
+    req = urllib2.Request('http://localhost')
+    conn = None
+    try:
+        conn = urllib2.urlopen(req)
+    except:
+        # the dummy request is expected to fail; ignore it
+        pass
+    finally:
+        if conn is not None:
+            conn.close()
+        
+    
+    sprocess = SourceProcess(Session, queue, options, access_token, stop_event)    
     
     tweet_processes = []
     
-    for i in range(options.consumer_nb):
-        engine, metadata = models.setup_database('sqlite:///' + options.filename, echo=(options.debug >= 2))
-        Session = sessionmaker(bind=engine)
-        cprocess = TweetProcess(Session, queue, options.debug, options.token_filename, stop_event)
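+    # the source process counts toward process_nb, so only process_nb - 1 consumers are started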
+    for i in range(options.process_nb - 1):
+        Session, engine, metadata = get_sessionmaker(conn_str)
+        cprocess = TweetProcess(Session, queue, options, access_token, stop_event)
         tweet_processes.append(cprocess)
 
     def interupt_handler(signum, frame):
@@ -311,23 +372,36 @@
             stop_event.set()
             break
         if sprocess.is_alive():
-            time.sleep(0.1)
+            time.sleep(1)
         else:
+            stop_event.set()
             break
     
-    get_logger().debug("Joining Source Process")
-    sprocess.join()
-    get_logger().debug("Joining Queue")
+    utils.get_logger().debug("Joining Source Process")
+    sprocess.join(10)
+    if sprocess.is_alive():
+        # join timed out and the source process is still alive; force-terminate it
+        utils.get_logger().debug("Source process still alive after join timeout - terminating")
+        sprocess.terminate()
+        
+    utils.get_logger().debug("Joining Queue")
     #queue.join()
-    for i,cprocess in enumerate(tweet_processes):
-        get_logger().debug("Joining consumer process Nb %d" % (i+1))
-        cprocess.join()
+    for i, cprocess in enumerate(tweet_processes):
+        utils.get_logger().debug("Joining consumer process Nb %d" % (i + 1))
+        cprocess.join(3)
+        if cprocess.is_alive():
+            # join timed out; force-terminate this consumer
+            utils.get_logger().debug("Consumer process Nb %d still alive after join timeout - terminating" % (i + 1))
+            cprocess.terminate()
     
-    get_logger().debug("Processing leftovers")
+    utils.get_logger().debug("Processing leftovers")
     session = Session()
-    process_leftovers(session, options.token_filename)
-    session.commit()
-    session.close()
+    try:
+        process_leftovers(session, access_token)
+        session.commit()
+    finally:
+        session.rollback()
+        session.close()
 
-    get_logger().debug("Done. Exiting.")
-        
\ No newline at end of file
+    utils.get_logger().debug("Done. Exiting.")
+