script/stream/recorder_stream.py
changeset 888 6fc6637d8403
parent 886 1e110b03ae96
child 890 9c57883dbb9d
--- a/script/stream/recorder_stream.py	Sun Apr 21 21:55:06 2013 +0200
+++ b/script/stream/recorder_stream.py	Tue May 07 18:57:54 2013 +0200
@@ -1,14 +1,14 @@
-from getpass import getpass
+from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
 from iri_tweet import models, utils
 from iri_tweet.models import TweetSource, TweetLog, ProcessEvent
-from multiprocessing import (Queue as mQueue, JoinableQueue, Process, Event, 
-    get_logger)
-from optparse import OptionParser
+from iri_tweet.processor import get_processor
+from multiprocessing import Queue as mQueue, Process, Event
 from sqlalchemy.exc import OperationalError
 from sqlalchemy.orm import scoped_session
 import Queue
 import StringIO
 import anyjson
+import argparse
 import datetime
 import inspect
 import iri_tweet.stream
@@ -21,6 +21,7 @@
 import socket
 import sqlalchemy.schema
 import sys
+import thread
 import threading
 import time
 import traceback
@@ -35,7 +36,20 @@
 columns_user = [u'follow_request_sent', u'profile_use_background_image', u'id', u'verified', u'profile_sidebar_fill_color', u'profile_text_color', u'followers_count', u'protected', u'location', u'profile_background_color', u'id_str', u'utc_offset', u'statuses_count', u'description', u'friends_count', u'profile_link_color', u'profile_image_url', u'notifications', u'show_all_inline_media', u'geo_enabled', u'profile_background_image_url', u'name', u'lang', u'following', u'profile_background_tile', u'favourites_count', u'screen_name', u'url', u'created_at', u'contributors_enabled', u'time_zone', u'profile_sidebar_border_color', u'is_translator', u'listed_count']
 # just put it in a sqlite3 table
 
-DEFAULT_TIMEOUT = 5
+DEFAULT_TIMEOUT = 3
+
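+# Minimal handler for the one-shot local HTTP server used in the
+# urllib2/multiprocessing workaround below: answer any GET with an
+# empty 200 and keep request logging quiet.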
+class RequestHandler(BaseHTTPRequestHandler):
+
+    def __init__(self, request, client_address, server):
+        BaseHTTPRequestHandler.__init__(self, request, client_address, server)
+        
+    def do_GET(self):
+        self.send_response(200)
+        self.end_headers()
+    
+    def log_message(self, format, *args):        # @ReservedAssignment
+        pass
+
 
 def set_logging(options):
     loggers = []
@@ -55,19 +69,16 @@
     return qlogger
 
 def get_auth(options, access_token):
-    if options.username and options.password:
-        auth = requests.auth.BasicAuthHandler(options.username, options.password)        
-    else:
-        consumer_key = models.CONSUMER_KEY
-        consumer_secret = models.CONSUMER_SECRET
-        auth = requests_oauthlib.OAuth1(access_token[0], access_token[1], consumer_key, consumer_secret, signature_type='auth_header')
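+    # sign with OAuth1: application (consumer) credentials plus the user's access token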
+    consumer_key = options.consumer_key
+    consumer_secret = options.consumer_secret
+    auth = requests_oauthlib.OAuth1(client_key=consumer_key, client_secret=consumer_secret, resource_owner_key=access_token[0], resource_owner_secret=access_token[1], signature_type='auth_header')
     return auth
 
 
-def add_process_event(type, args, session_maker):
+def add_process_event(event_type, args, session_maker):
     session = session_maker()
     try:
-        evt = ProcessEvent(args=None if args is None else anyjson.serialize(args), type=type)
+        evt = ProcessEvent(args=None if args is None else anyjson.serialize(args), type=event_type)
         session.add(evt)
         session.commit()
     finally:
@@ -83,6 +94,7 @@
         self.options = options
         self.logger_queue = logger_queue
         self.stop_event = stop_event
+        self.consumer_token = (options.consumer_key, options.consumer_secret)
         self.access_token = access_token
 
         super(BaseProcess, self).__init__()
@@ -122,16 +134,15 @@
     def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
         self.track = options.track
         self.token_filename = options.token_filename
-        self.catchup = options.catchup
         self.timeout = options.timeout
         self.stream = None
         super(SourceProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid)
                     
     def __source_stream_iter(self):
-        
-        self.logger = set_logging_process(self.options, self.logger_queue)
+                
         self.logger.debug("SourceProcess : run ")
         
+        self.logger.debug("SourceProcess : get_auth auth with option %s and token %s " %(self.options, self.access_token))
         self.auth = get_auth(self.options, self.access_token) 
         self.logger.debug("SourceProcess : auth set ")
         
@@ -140,8 +151,8 @@
         
         track_list = [k.strip() for k in track_list.split(',')]
 
-        self.logger.debug("SourceProcess : before connecting to stream " + repr(track_list))                        
-        self.stream = iri_tweet.stream.FilterStream(self.auth, track=track_list, raw=True, url=self.options.url, catchup=self.catchup, timeout=self.timeout, chunk_size=1, logger=self.logger)
+        self.logger.debug("SourceProcess : before connecting to stream %s, url : %s, auth : %s" % (repr(track_list), self.options.url, repr(self.auth)))                        
+        self.stream = iri_tweet.stream.FilterStream(self.auth, track=track_list, raw=True, url=self.options.url, timeout=self.timeout, chunk_size=512, logger=self.logger)
         self.logger.debug("SourceProcess : after connecting to stream")
         self.stream.muststop = lambda: self.stop_event.is_set()        
         
@@ -149,11 +160,14 @@
         
         session = self.session_maker()
         
+        #import pydevd
+        #pydevd.settrace(suspend=False)
+
+        
         try:
             for tweet in stream_wrapper:
                 if not self.parent_is_alive():
                     self.stop_event.set()
-                    stop_thread.join(5)
                     sys.exit()
                 self.logger.debug("SourceProcess : tweet " + repr(tweet))
                 source = TweetSource(original_json=tweet)
@@ -193,41 +207,52 @@
 
     def do_run(self):
         
-        # import pydevd
-        # pydevd.settrace(suspend=False)
+        self.logger = set_logging_process(self.options, self.logger_queue)                
         
         source_stream_iter_thread = threading.Thread(target=self.__source_stream_iter , name="SourceStreamIterThread")
         
         source_stream_iter_thread.start()
         
-        while not self.stop_event.is_set():
-            self.logger.debug("SourceProcess : In while after start")
-            self.stop_event.wait(DEFAULT_TIMEOUT)
-            if self.stop_event.is_set() and self.stream:
-                self.stream.close()
-            elif not self.stop_event.is_set() and not source_stream_iter_thread.is_alive:
-                self.stop_event.set()
+        try:
+            while not self.stop_event.is_set():
+                self.logger.debug("SourceProcess : In while after start")
+                self.stop_event.wait(DEFAULT_TIMEOUT)
+        except KeyboardInterrupt:
+            self.stop_event.set()
 
+        if self.stop_event.is_set() and self.stream:
+            self.stream.close()
+        elif not self.stop_event.is_set() and not source_stream_iter_thread.is_alive():
+            self.stop_event.set()
+    
         self.logger.info("SourceProcess : join")
         source_stream_iter_thread.join(30)
 
 
-def process_tweet(tweet, source_id, session, access_token, twitter_query_user, logger):
+def process_tweet(tweet, source_id, session, consumer_token, access_token, twitter_query_user, token_filename, logger):
     try:
         if not tweet.strip():
             return
         tweet_obj = anyjson.deserialize(tweet)
-        if 'text' not in tweet_obj:
+        processor_klass = get_processor(tweet_obj)
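+        # get_processor picks a processor class for this message; None means it is not a tweet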
+        if not processor_klass:
             tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET'])
             session.add(tweet_log)
             return
-        screen_name = ""
-        if 'user' in tweet_obj and 'screen_name' in tweet_obj['user']:
-            screen_name = tweet_obj['user']['screen_name']
-        logger.info(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text']))
-        logger.debug(u"Process_tweet :" + repr(tweet))
-        processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, access_token, None, twitter_query_user)
+        processor = processor_klass(json_dict=tweet_obj,
+                                    json_txt=tweet,
+                                    source_id=source_id,
+                                    session=session,
+                                    consumer_token=consumer_token,
+                                    access_token=access_token,
+                                    token_filename=token_filename,
+                                    user_query_twitter=twitter_query_user,
+                                    logger=logger)
+        logger.info(processor.log_info())                        
+        logger.debug(u"Process_tweet :" + repr(tweet))                
         processor.process()
+        
     except ValueError as e:
         message = u"Value Error %s processing tweet %s" % (repr(e), tweet)
         output = StringIO.StringIO()
@@ -274,8 +299,10 @@
                 except Exception as e:
                     self.logger.debug('Process tweet exception in loop : ' + repr(e))
                     continue
-                process_tweet(tweet_txt, source_id, session, self.access_token, self.twitter_query_user, self.logger)
+                process_tweet(tweet_txt, source_id, session, self.consumer_token, self.access_token, self.twitter_query_user, self.options.token_filename, self.logger)
                 session.commit()
+        except KeyboardInterrupt:
+            self.stop_event.set()
         finally:
             session.rollback()
             session.close()
@@ -287,15 +314,20 @@
     return Session, engine, metadata
 
             
-def process_leftovers(session, access_token, twitter_query_user, logger):
+def process_leftovers(session, consumer_token, access_token, twitter_query_user, token_filename, ask_process_leftovers, logger):
     
     sources = session.query(TweetSource).outerjoin(TweetLog).filter(TweetLog.id == None)
+    sources_count = sources.count()
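+    # with a sizeable backlog (more than 10 unprocessed sources), optionally confirm before reprocessing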
     
+    if sources_count > 10 and ask_process_leftovers:
+        resp = raw_input("Do you want to process leftovers (Y/n) ? (%d tweet to process)" % sources_count)
+        if resp and resp.strip().lower() == "n":
+            return
+    logger.info("Process leftovers, %d tweets to process" % (sources_count))
     for src in sources:
         tweet_txt = src.original_json
-        process_tweet(tweet_txt, src.id, session, access_token, twitter_query_user, logger)
+        process_tweet(tweet_txt, src.id, session, consumer_token, access_token, twitter_query_user, token_filename, logger)
         session.commit()
-
         
     
     # get tweet sources that do not match any message
@@ -315,38 +347,36 @@
         
 def get_options():
 
-    usage = "usage: %prog [options]"
+    usage = "usage: %(prog)s [options]"
 
-    parser = OptionParser(usage=usage)
+    parser = argparse.ArgumentParser(usage=usage)
 
-    parser.add_option("-f", "--file", dest="conn_str",
-                      help="write tweet to DATABASE. This is a connection string", metavar="CONNECTION_STR", default="enmi2010_twitter.db")
-    parser.add_option("-u", "--user", dest="username",
-                      help="Twitter user", metavar="USER", default=None)
-    parser.add_option("-w", "--password", dest="password",
-                      help="Twitter password", metavar="PASSWORD", default=None)
-    parser.add_option("-T", "--track", dest="track",
-                      help="Twitter track", metavar="TRACK")
-    parser.add_option("-n", "--new", dest="new", action="store_true",
-                      help="new database", default=False)
-    parser.add_option("-D", "--daemon", dest="daemon", action="store_true",
-                      help="launch daemon", default=False)
-    parser.add_option("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token",
-                      help="Token file name")
-    parser.add_option("-d", "--duration", dest="duration",
-                      help="Duration of recording in seconds", metavar="DURATION", default= -1, type='int')
-    parser.add_option("-N", "--nb-process", dest="process_nb",
-                      help="number of process.\nIf 0, only the lefovers of the database are processed.\nIf 1, no postprocessing is done on the tweets.", metavar="PROCESS_NB", default=2, type='int')
-    parser.add_option("--url", dest="url",
-                      help="The twitter url to connect to.", metavar="URL", default=iri_tweet.stream.FilterStream.url)
-    parser.add_option("--query-user", dest="twitter_query_user", action="store_true",
-                      help="Query twitter for users", default=False, metavar="QUERY_USER")
-    parser.add_option("--catchup", dest="catchup",
-                      help="catchup count for tweets", default=None, metavar="CATCHUP", type='int')
-    parser.add_option("--timeout", dest="timeout",
-                      help="timeout for connecting in seconds", default=60, metavar="TIMEOUT", type='int')
-    
-
+    parser.add_argument("-f", "--file", dest="conn_str",
+                        help="write tweet to DATABASE. This is a connection string", metavar="CONNECTION_STR", default="enmi2010_twitter.db")
+    parser.add_argument("-T", "--track", dest="track",
+                        help="Twitter track", metavar="TRACK")
+    parser.add_argument("-k", "--key", dest="consumer_key",
+                        help="Twitter consumer key", metavar="CONSUMER_KEY", required=True)
+    parser.add_argument("-s", "--secret", dest="consumer_secret",
+                        help="Twitter consumer secret", metavar="CONSUMER_SECRET", required=True)
+    parser.add_argument("-n", "--new", dest="new", action="store_true",
+                        help="new database", default=False)
+    parser.add_argument("-D", "--daemon", dest="daemon", action="store_true",
+                        help="launch daemon", default=False)
+    parser.add_argument("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token",
+                        help="Token file name")
+    parser.add_argument("-d", "--duration", dest="duration",
+                        help="Duration of recording in seconds", metavar="DURATION", default= -1, type=int)
+    parser.add_argument("-N", "--nb-process", dest="process_nb",
+                        help="number of process.\nIf 0, only the lefovers of the database are processed.\nIf 1, no postprocessing is done on the tweets.", metavar="PROCESS_NB", default=2, type=int)
+    parser.add_argument("--url", dest="url",
+                        help="The twitter url to connect to.", metavar="URL", default=iri_tweet.stream.FilterStream.url)
+    parser.add_argument("--query-user", dest="twitter_query_user", action="store_true",
+                        help="Query twitter for users", default=False)
+    parser.add_argument("--timeout", dest="timeout",
+                        help="timeout for connecting in seconds", default=60, metavar="TIMEOUT", type=int)
+    parser.add_argument("--ask-process-leftovers", dest="ask_process_leftovers", action="store_false",
+                        help="ask process leftover", default=True)
 
 
     utils.set_logging_options(parser)
@@ -357,14 +387,14 @@
 def do_run(options, session_maker):
 
     stop_args = {}
-
-    access_token = None
-    if not options.username or not options.password:
-        access_token = utils.get_oauth_token(options.token_filename)
+    
+    consumer_token = (options.consumer_key, options.consumer_secret)
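+    # the access token is persisted in options.token_filename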
+    access_token = utils.get_oauth_token(consumer_key=consumer_token[0], consumer_secret=consumer_token[1], token_file_path=options.token_filename)
+    
     
     session = session_maker()
     try:
-        process_leftovers(session, access_token, options.twitter_query_user, utils.get_logger())
+        process_leftovers(session, consumer_token, access_token, options.twitter_query_user, options.token_filename, options.ask_process_leftovers, utils.get_logger())
         session.commit()
     finally:
         session.rollback()
@@ -378,7 +408,10 @@
     stop_event = Event()
     
     # workaround for a bug when using urllib2 with multiprocessing
-    req = urllib2.Request('http://localhost')
+    httpd = HTTPServer(('127.0.0.1', 0), RequestHandler)
+    thread.start_new_thread(httpd.handle_request, ())
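+    # the server handles exactly one request: the dummy urllib2 call below,
+    # so that urllib2 is initialized in the parent before the workers fork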
+    
+    req = urllib2.Request('http://localhost:%d' % httpd.server_port)
     conn = None
     try:
         conn = urllib2.urlopen(req)
@@ -392,7 +425,7 @@
     process_engines = []
     logger_queues = []
     
-    SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str)
+    SessionProcess, engine_process, _ = get_sessionmaker(conn_str)
     process_engines.append(engine_process)
     lqueue = mQueue(50)
     logger_queues.append(lqueue)
@@ -402,7 +435,7 @@
     tweet_processes = []
     
     for i in range(options.process_nb - 1):
-        SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str)
+        SessionProcess, engine_process, _ = get_sessionmaker(conn_str)
         process_engines.append(engine_process)
         lqueue = mQueue(50)
         logger_queues.append(lqueue)
@@ -462,21 +495,13 @@
             cprocess.terminate()
 
     
-    utils.get_logger().debug("Close queues")
-    try:
-        queue.close()
-        for lqueue in logger_queues:
-            lqueue.close()
-    except exception as e:
-        utils.get_logger().error("error when closing queues %s", repr(e))
-        # do nothing
-        
+    utils.get_logger().debug("Close queues")        
     
     if options.process_nb > 1:
         utils.get_logger().debug("Processing leftovers")
         session = session_maker()
         try:
-            process_leftovers(session, access_token, options.twitter_query_user, utils.get_logger())
+            process_leftovers(session, consumer_token, access_token, options.twitter_query_user, options.token_filename, options.ask_process_leftovers, utils.get_logger())
             session.commit()
         finally:
             session.rollback()
@@ -484,11 +509,19 @@
 
     for pengine in process_engines:
         pengine.dispose()
+    
+    try:
+        queue.close()
+        for lqueue in logger_queues:
+            lqueue.close()
+    except Exception as e:
+        utils.get_logger().error("error when closing queues %s", repr(e))
+        # do nothing
 
     return stop_args
 
 
-def main(options, args):
+def main(options):
     
     global conn_str
     
@@ -513,7 +546,8 @@
     Session, engine, metadata = get_sessionmaker(conn_str)
     
     if options.new:
-        check_metadata = sqlalchemy.schema.MetaData(bind=engine, reflect=True)
+        check_metadata = sqlalchemy.schema.MetaData(bind=engine)
+        check_metadata.reflect()
         if len(check_metadata.sorted_tables) > 0:
             message = "Database %s not empty exiting" % conn_str
             utils.get_logger().error(message)
@@ -528,7 +562,7 @@
     
     stop_args = {}
     try:
-        add_process_event(type="start", args={'options':options.__dict__, 'args': args, 'command_line': sys.argv}, session_maker=Session)
+        add_process_event(event_type="start", args={'options':options.__dict__, 'args': [], 'command_line': sys.argv}, session_maker=Session)
         stop_args = do_run(options, Session)
     except Exception as e:
         utils.get_logger().exception("Error in main thread")        
@@ -540,7 +574,7 @@
             outfile.close()
         raise
     finally:    
-        add_process_event(type="shutdown", args=stop_args, session_maker=Session)
+        add_process_event(event_type="shutdown", args=stop_args, session_maker=Session)
 
     utils.get_logger().debug("Done. Exiting. " + repr(stop_args))
 
@@ -548,15 +582,15 @@
 
 if __name__ == '__main__':
 
-    (options, args) = get_options()
+    options = get_options()
     
     loggers = set_logging(options)
     
     utils.get_logger().debug("OPTIONS : " + repr(options))
     
     if options.daemon:
+        options.ask_process_leftovers = False
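+        # a detached daemon has no terminal to prompt on, so never ask about leftovers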
         import daemon
-        import lockfile
         
         hdlr_preserve = []
         for logger in loggers:
@@ -564,7 +598,7 @@
             
         context = daemon.DaemonContext(working_directory=os.getcwd(), files_preserve=hdlr_preserve) 
         with context:
-            main(options, args)
+            main(options)
     else:
-        main(options, args)
-
+        main(options)
+