diff -r 503f9a7b7d6c -r 6fc6637d8403 script/stream/recorder_stream.py --- a/script/stream/recorder_stream.py Sun Apr 21 21:55:06 2013 +0200 +++ b/script/stream/recorder_stream.py Tue May 07 18:57:54 2013 +0200 @@ -1,14 +1,14 @@ -from getpass import getpass +from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer from iri_tweet import models, utils from iri_tweet.models import TweetSource, TweetLog, ProcessEvent -from multiprocessing import (Queue as mQueue, JoinableQueue, Process, Event, - get_logger) -from optparse import OptionParser +from iri_tweet.processor import get_processor +from multiprocessing import Queue as mQueue, Process, Event from sqlalchemy.exc import OperationalError from sqlalchemy.orm import scoped_session import Queue import StringIO import anyjson +import argparse import datetime import inspect import iri_tweet.stream @@ -21,6 +21,7 @@ import socket import sqlalchemy.schema import sys +import thread import threading import time import traceback @@ -35,7 +36,20 @@ columns_user = [u'follow_request_sent', u'profile_use_background_image', u'id', u'verified', u'profile_sidebar_fill_color', u'profile_text_color', u'followers_count', u'protected', u'location', u'profile_background_color', u'id_str', u'utc_offset', u'statuses_count', u'description', u'friends_count', u'profile_link_color', u'profile_image_url', u'notifications', u'show_all_inline_media', u'geo_enabled', u'profile_background_image_url', u'name', u'lang', u'following', u'profile_background_tile', u'favourites_count', u'screen_name', u'url', u'created_at', u'contributors_enabled', u'time_zone', u'profile_sidebar_border_color', u'is_translator', u'listed_count'] # just put it in a sqlite3 tqble -DEFAULT_TIMEOUT = 5 +DEFAULT_TIMEOUT = 3 + +class Requesthandler(BaseHTTPRequestHandler): + + def __init__(self, request, client_address, server): + BaseHTTPRequestHandler.__init__(self, request, client_address, server) + + def do_GET(self): + self.send_response(200) + self.end_headers() + + def log_message(self, format, *args): # @ReservedAssignment + pass + def set_logging(options): loggers = [] @@ -55,19 +69,16 @@ return qlogger def get_auth(options, access_token): - if options.username and options.password: - auth = requests.auth.BasicAuthHandler(options.username, options.password) - else: - consumer_key = models.CONSUMER_KEY - consumer_secret = models.CONSUMER_SECRET - auth = requests_oauthlib.OAuth1(access_token[0], access_token[1], consumer_key, consumer_secret, signature_type='auth_header') + consumer_key = options.consumer_key + consumer_secret = options.consumer_secret + auth = requests_oauthlib.OAuth1(client_key=consumer_key, client_secret=consumer_secret, resource_owner_key=access_token[0], resource_owner_secret=access_token[1], signature_type='auth_header') return auth -def add_process_event(type, args, session_maker): +def add_process_event(event_type, args, session_maker): session = session_maker() try: - evt = ProcessEvent(args=None if args is None else anyjson.serialize(args), type=type) + evt = ProcessEvent(args=None if args is None else anyjson.serialize(args), type=event_type) session.add(evt) session.commit() finally: @@ -83,6 +94,7 @@ self.options = options self.logger_queue = logger_queue self.stop_event = stop_event + self.consumer_token = (options.consumer_key, options.consumer_secret) self.access_token = access_token super(BaseProcess, self).__init__() @@ -122,16 +134,15 @@ def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid): self.track = options.track self.token_filename = options.token_filename - self.catchup = options.catchup self.timeout = options.timeout self.stream = None super(SourceProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid) def __source_stream_iter(self): - - self.logger = set_logging_process(self.options, self.logger_queue) + self.logger.debug("SourceProcess : run ") + self.logger.debug("SourceProcess : get_auth auth with option %s and token %s " %(self.options, self.access_token)) self.auth = get_auth(self.options, self.access_token) self.logger.debug("SourceProcess : auth set ") @@ -140,8 +151,8 @@ track_list = [k.strip() for k in track_list.split(',')] - self.logger.debug("SourceProcess : before connecting to stream " + repr(track_list)) - self.stream = iri_tweet.stream.FilterStream(self.auth, track=track_list, raw=True, url=self.options.url, catchup=self.catchup, timeout=self.timeout, chunk_size=1, logger=self.logger) + self.logger.debug("SourceProcess : before connecting to stream %s, url : %s, auth : %s" % (repr(track_list), self.options.url, repr(self.auth))) + self.stream = iri_tweet.stream.FilterStream(self.auth, track=track_list, raw=True, url=self.options.url, timeout=self.timeout, chunk_size=512, logger=self.logger) self.logger.debug("SourceProcess : after connecting to stream") self.stream.muststop = lambda: self.stop_event.is_set() @@ -149,11 +160,14 @@ session = self.session_maker() + #import pydevd + #pydevd.settrace(suspend=False) + + try: for tweet in stream_wrapper: if not self.parent_is_alive(): self.stop_event.set() - stop_thread.join(5) sys.exit() self.logger.debug("SourceProcess : tweet " + repr(tweet)) source = TweetSource(original_json=tweet) @@ -193,41 +207,52 @@ def do_run(self): - # import pydevd - # pydevd.settrace(suspend=False) + self.logger = set_logging_process(self.options, self.logger_queue) source_stream_iter_thread = threading.Thread(target=self.__source_stream_iter , name="SourceStreamIterThread") source_stream_iter_thread.start() - while not self.stop_event.is_set(): - self.logger.debug("SourceProcess : In while after start") - self.stop_event.wait(DEFAULT_TIMEOUT) - if self.stop_event.is_set() and self.stream: - self.stream.close() - elif not self.stop_event.is_set() and not source_stream_iter_thread.is_alive: - self.stop_event.set() + try: + while not self.stop_event.is_set(): + self.logger.debug("SourceProcess : In while after start") + self.stop_event.wait(DEFAULT_TIMEOUT) + except KeyboardInterrupt: + self.stop_event.set() + pass + if self.stop_event.is_set() and self.stream: + self.stream.close() + elif not self.stop_event.is_set() and not source_stream_iter_thread.is_alive: + self.stop_event.set() + self.logger.info("SourceProcess : join") source_stream_iter_thread.join(30) -def process_tweet(tweet, source_id, session, access_token, twitter_query_user, logger): +def process_tweet(tweet, source_id, session, consumer_token, access_token, twitter_query_user, token_filename, logger): try: if not tweet.strip(): return tweet_obj = anyjson.deserialize(tweet) - if 'text' not in tweet_obj: + processor_klass = get_processor(tweet_obj) + if not processor_klass: tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET']) session.add(tweet_log) return - screen_name = "" - if 'user' in tweet_obj and 'screen_name' in tweet_obj['user']: - screen_name = tweet_obj['user']['screen_name'] - logger.info(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text'])) - logger.debug(u"Process_tweet :" + repr(tweet)) - processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, access_token, None, twitter_query_user) + processor = processor_klass(json_dict=tweet_obj, + json_txt=tweet, + source_id=source_id, + session=session, + consumer_token=consumer_token, + access_token=access_token, + token_filename=token_filename, + user_query_twitter=twitter_query_user, + logger=logger) + logger.info(processor.log_info()) + logger.debug(u"Process_tweet :" + repr(tweet)) processor.process() + except ValueError as e: message = u"Value Error %s processing tweet %s" % (repr(e), tweet) output = StringIO.StringIO() @@ -274,8 +299,10 @@ except Exception as e: self.logger.debug('Process tweet exception in loop : ' + repr(e)) continue - process_tweet(tweet_txt, source_id, session, self.access_token, self.twitter_query_user, self.logger) + process_tweet(tweet_txt, source_id, session, self.consumer_token, self.access_token, self.twitter_query_user, self.options.token_filename, self.logger) session.commit() + except KeyboardInterrupt: + self.stop_event.set() finally: session.rollback() session.close() @@ -287,15 +314,20 @@ return Session, engine, metadata -def process_leftovers(session, access_token, twitter_query_user, logger): +def process_leftovers(session, consumer_token, access_token, twitter_query_user, token_filename, ask_process_leftovers, logger): sources = session.query(TweetSource).outerjoin(TweetLog).filter(TweetLog.id == None) + sources_count = sources.count() + if sources_count > 10 and ask_process_leftovers: + resp = raw_input("Do you want to process leftovers (Y/n) ? (%d tweet to process)" % sources_count) + if resp and resp.strip().lower() == "n": + return + logger.info("Process leftovers, %d tweets to process" % (sources_count)) for src in sources: tweet_txt = src.original_json - process_tweet(tweet_txt, src.id, session, access_token, twitter_query_user, logger) + process_tweet(tweet_txt, src.id, session, consumer_token, access_token, twitter_query_user, token_filename, logger) session.commit() - # get tweet source that do not match any message @@ -315,38 +347,36 @@ def get_options(): - usage = "usage: %prog [options]" + usage = "usage: %(prog)s [options]" - parser = OptionParser(usage=usage) + parser = argparse.ArgumentParser(usage=usage) - parser.add_option("-f", "--file", dest="conn_str", - help="write tweet to DATABASE. This is a connection string", metavar="CONNECTION_STR", default="enmi2010_twitter.db") - parser.add_option("-u", "--user", dest="username", - help="Twitter user", metavar="USER", default=None) - parser.add_option("-w", "--password", dest="password", - help="Twitter password", metavar="PASSWORD", default=None) - parser.add_option("-T", "--track", dest="track", - help="Twitter track", metavar="TRACK") - parser.add_option("-n", "--new", dest="new", action="store_true", - help="new database", default=False) - parser.add_option("-D", "--daemon", dest="daemon", action="store_true", - help="launch daemon", default=False) - parser.add_option("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token", - help="Token file name") - parser.add_option("-d", "--duration", dest="duration", - help="Duration of recording in seconds", metavar="DURATION", default= -1, type='int') - parser.add_option("-N", "--nb-process", dest="process_nb", - help="number of process.\nIf 0, only the lefovers of the database are processed.\nIf 1, no postprocessing is done on the tweets.", metavar="PROCESS_NB", default=2, type='int') - parser.add_option("--url", dest="url", - help="The twitter url to connect to.", metavar="URL", default=iri_tweet.stream.FilterStream.url) - parser.add_option("--query-user", dest="twitter_query_user", action="store_true", - help="Query twitter for users", default=False, metavar="QUERY_USER") - parser.add_option("--catchup", dest="catchup", - help="catchup count for tweets", default=None, metavar="CATCHUP", type='int') - parser.add_option("--timeout", dest="timeout", - help="timeout for connecting in seconds", default=60, metavar="TIMEOUT", type='int') - - + parser.add_argument("-f", "--file", dest="conn_str", + help="write tweet to DATABASE. This is a connection string", metavar="CONNECTION_STR", default="enmi2010_twitter.db") + parser.add_argument("-T", "--track", dest="track", + help="Twitter track", metavar="TRACK") + parser.add_argument("-k", "--key", dest="consumer_key", + help="Twitter consumer key", metavar="CONSUMER_KEY", required=True) + parser.add_argument("-s", "--secret", dest="consumer_secret", + help="Twitter consumer secret", metavar="CONSUMER_SECRET", required=True) + parser.add_argument("-n", "--new", dest="new", action="store_true", + help="new database", default=False) + parser.add_argument("-D", "--daemon", dest="daemon", action="store_true", + help="launch daemon", default=False) + parser.add_argument("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token", + help="Token file name") + parser.add_argument("-d", "--duration", dest="duration", + help="Duration of recording in seconds", metavar="DURATION", default= -1, type=int) + parser.add_argument("-N", "--nb-process", dest="process_nb", + help="number of process.\nIf 0, only the lefovers of the database are processed.\nIf 1, no postprocessing is done on the tweets.", metavar="PROCESS_NB", default=2, type=int) + parser.add_argument("--url", dest="url", + help="The twitter url to connect to.", metavar="URL", default=iri_tweet.stream.FilterStream.url) + parser.add_argument("--query-user", dest="twitter_query_user", action="store_true", + help="Query twitter for users", default=False) + parser.add_argument("--timeout", dest="timeout", + help="timeout for connecting in seconds", default=60, metavar="TIMEOUT", type=int) + parser.add_argument("--ask-process-leftovers", dest="ask_process_leftovers", action="store_false", + help="ask process leftover", default=True) utils.set_logging_options(parser) @@ -357,14 +387,14 @@ def do_run(options, session_maker): stop_args = {} - - access_token = None - if not options.username or not options.password: - access_token = utils.get_oauth_token(options.token_filename) + + consumer_token = (options.consumer_key, options.consumer_secret) + access_token = utils.get_oauth_token(consumer_key=consumer_token[0], consumer_secret=consumer_token[1], token_file_path=options.token_filename) + session = session_maker() try: - process_leftovers(session, access_token, options.twitter_query_user, utils.get_logger()) + process_leftovers(session, consumer_token, access_token, options.twitter_query_user, options.token_filename, options.ask_process_leftovers, utils.get_logger()) session.commit() finally: session.rollback() @@ -378,7 +408,10 @@ stop_event = Event() # workaround for bug on using urllib2 and multiprocessing - req = urllib2.Request('http://localhost') + httpd = HTTPServer(('127.0.0.1',0), Requesthandler) + thread.start_new_thread(httpd.handle_request, ()) + + req = urllib2.Request('http://localhost:%d' % httpd.server_port) conn = None try: conn = urllib2.urlopen(req) @@ -392,7 +425,7 @@ process_engines = [] logger_queues = [] - SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str) + SessionProcess, engine_process, _ = get_sessionmaker(conn_str) process_engines.append(engine_process) lqueue = mQueue(50) logger_queues.append(lqueue) @@ -402,7 +435,7 @@ tweet_processes = [] for i in range(options.process_nb - 1): - SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str) + SessionProcess, engine_process, _ = get_sessionmaker(conn_str) process_engines.append(engine_process) lqueue = mQueue(50) logger_queues.append(lqueue) @@ -462,21 +495,13 @@ cprocess.terminate() - utils.get_logger().debug("Close queues") - try: - queue.close() - for lqueue in logger_queues: - lqueue.close() - except exception as e: - utils.get_logger().error("error when closing queues %s", repr(e)) - # do nothing - + utils.get_logger().debug("Close queues") if options.process_nb > 1: utils.get_logger().debug("Processing leftovers") session = session_maker() try: - process_leftovers(session, access_token, options.twitter_query_user, utils.get_logger()) + process_leftovers(session, consumer_token, access_token, options.twitter_query_user, options.token_filename, options.ask_process_leftovers, utils.get_logger()) session.commit() finally: session.rollback() @@ -484,11 +509,19 @@ for pengine in process_engines: pengine.dispose() + + try: + queue.close() + for lqueue in logger_queues: + lqueue.close() + except Exception as e: + utils.get_logger().error("error when closing queues %s", repr(e)) + # do nothing return stop_args -def main(options, args): +def main(options): global conn_str @@ -513,7 +546,8 @@ Session, engine, metadata = get_sessionmaker(conn_str) if options.new: - check_metadata = sqlalchemy.schema.MetaData(bind=engine, reflect=True) + check_metadata = sqlalchemy.schema.MetaData(bind=engine) + check_metadata.reflect() if len(check_metadata.sorted_tables) > 0: message = "Database %s not empty exiting" % conn_str utils.get_logger().error(message) @@ -528,7 +562,7 @@ stop_args = {} try: - add_process_event(type="start", args={'options':options.__dict__, 'args': args, 'command_line': sys.argv}, session_maker=Session) + add_process_event(event_type="start", args={'options':options.__dict__, 'args': [], 'command_line': sys.argv}, session_maker=Session) stop_args = do_run(options, Session) except Exception as e: utils.get_logger().exception("Error in main thread") @@ -540,7 +574,7 @@ outfile.close() raise finally: - add_process_event(type="shutdown", args=stop_args, session_maker=Session) + add_process_event(event_type="shutdown", args=stop_args, session_maker=Session) utils.get_logger().debug("Done. Exiting. " + repr(stop_args)) @@ -548,15 +582,15 @@ if __name__ == '__main__': - (options, args) = get_options() + options = get_options() loggers = set_logging(options) utils.get_logger().debug("OPTIONS : " + repr(options)) if options.daemon: + options.ask_process_leftovers = False import daemon - import lockfile hdlr_preserve = [] for logger in loggers: @@ -564,7 +598,7 @@ context = daemon.DaemonContext(working_directory=os.getcwd(), files_preserve=hdlr_preserve) with context: - main(options, args) + main(options) else: - main(options, args) - + main(options) +