import argparse
import datetime
import inspect
import json
import logging
import os
import queue
import re
import shutil
import signal
import sys
import threading
import time
import traceback
import urllib.request
from http.server import BaseHTTPRequestHandler, HTTPServer
from io import StringIO
from multiprocessing import Event, Process
from multiprocessing import Queue as mQueue
import requests_oauthlib
import sqlalchemy.schema
import twitter
from sqlalchemy.exc import OperationalError
from sqlalchemy.orm import scoped_session
import _thread
import iri_tweet.stream
from iri_tweet import models, utils
from iri_tweet.models import ProcessEvent, TweetLog, TweetSource
from iri_tweet.processor import get_processor
# columns_tweet = [u'favorited', u'truncated', u'text', u'created_at', u'source', u'in_reply_to_status_id', u'in_reply_to_screen_name', u'in_reply_to_user_id', u'geo', u'id', u'user']
columns_tweet = [u'user', u'favorited', u'contributors', u'truncated', u'text', u'created_at', u'retweeted', u'in_reply_to_status_id_str', u'coordinates', u'in_reply_to_user_id_str', u'entities', u'in_reply_to_status_id', u'place', u'in_reply_to_user_id', u'id', u'in_reply_to_screen_name', u'retweet_count', u'geo', u'id_str', u'source']
# columns_user = [u'id', u'verified', u'profile_sidebar_fill_color', u'profile_text_color', u'followers_count', u'protected', u'location', u'profile_background_color', u'utc_offset', u'statuses_count', u'description', u'friends_count', u'profile_link_color', u'profile_image_url', u'notifications', u'geo_enabled', u'profile_background_image_url', u'screen_name', u'profile_background_tile', u'favourites_count', u'name', u'url', u'created_at', u'time_zone', u'profile_sidebar_border_color', u'following']
columns_user = [u'follow_request_sent', u'profile_use_background_image', u'id', u'verified', u'profile_sidebar_fill_color', u'profile_text_color', u'followers_count', u'protected', u'location', u'profile_background_color', u'id_str', u'utc_offset', u'statuses_count', u'description', u'friends_count', u'profile_link_color', u'profile_image_url', u'notifications', u'show_all_inline_media', u'geo_enabled', u'profile_background_image_url', u'name', u'lang', u'following', u'profile_background_tile', u'favourites_count', u'screen_name', u'url', u'created_at', u'contributors_enabled', u'time_zone', u'profile_sidebar_border_color', u'is_translator', u'listed_count']
# just put it in a sqlite3 table
DEFAULT_TIMEOUT = 3
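# Minimal handler for the throwaway local HTTP server started in do_run();
# it answers every GET with 200 and suppresses request logging.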
class RequestHandler(BaseHTTPRequestHandler):
def do_GET(self):
self.send_response(200)
self.end_headers()
def log_message(self, format, *args): # @ReservedAssignment
pass
def set_logging(options):
loggers = []
loggers.append(utils.set_logging(options, logging.getLogger('iri.tweet')))
loggers.append(utils.set_logging(options, logging.getLogger('multiprocessing')))
if options.debug >= 2:
loggers.append(utils.set_logging(options, logging.getLogger('sqlalchemy.engine')))
# utils.set_logging(options, logging.getLogger('sqlalchemy.dialects'))
# utils.set_logging(options, logging.getLogger('sqlalchemy.pool'))
# utils.set_logging(options, logging.getLogger('sqlalchemy.orm'))
return loggers
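# Child processes cannot use the main process's log handlers directly, so each
# worker logs into a multiprocessing queue that process_log() (below) drains
# from the main process.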
def set_logging_process(options, queue):
qlogger = utils.set_logging(options, logging.getLogger('iri.tweet.p'), queue)
qlogger.propagate = False
return qlogger
def get_auth(consumer_key, consumer_secret, token_key, token_secret):
return requests_oauthlib.OAuth1(client_key=consumer_key, client_secret=consumer_secret, resource_owner_key=token_key, resource_owner_secret=token_secret, signature_type='auth_header')
def add_process_event(event_type, args, session_maker):
session = session_maker()
try:
evt = ProcessEvent(args=None if args is None else json.dumps(args), type=event_type)
session.add(evt)
session.commit()
finally:
session.close()
class BaseProcess(Process):
def __init__(self, session_maker, queue, options, twitter_auth, stop_event, logger_queue, parent_pid):
self.parent_pid = parent_pid
self.session_maker = session_maker
self.queue = queue
self.options = options
self.logger_queue = logger_queue
self.stop_event = stop_event
self.twitter_auth = twitter_auth
super(BaseProcess, self).__init__()
#
# from http://stackoverflow.com/questions/2542610/python-daemon-doesnt-kill-its-kids
#
def parent_is_alive(self):
try:
# try to call Parent
os.kill(self.parent_pid, 0)
except OSError:
# *beeep* oh no! The phone's disconnected!
return False
else:
# *ring* Hi mom!
return True
def __get_process_event_args(self):
return {'name':self.name, 'pid':self.pid, 'parent_pid':self.parent_pid, 'options':self.options.__dict__}
def run(self):
try:
add_process_event("start_worker", self.__get_process_event_args(), self.session_maker)
self.do_run()
finally:
add_process_event("stop_worker", self.__get_process_event_args(), self.session_maker)
def do_run(self):
raise NotImplementedError()
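# Producer side: reads the Twitter filter stream, stores each raw JSON payload
# as a TweetSource row and pushes (source_id, tweet) onto the shared queue for
# the TweetProcess consumers.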
class SourceProcess(BaseProcess):
def __init__(self, session_maker, queue, options, twitter_auth, stop_event, logger_queue, parent_pid):
self.track = options.track
self.timeout = options.timeout
self.stream = None
super(SourceProcess, self).__init__(session_maker, queue, options, twitter_auth, stop_event, logger_queue, parent_pid)
def __source_stream_iter(self):
self.logger.debug("SourceProcess : run ")
self.logger.debug("SourceProcess : get_auth auth with option %s and token %s " %(self.options, self.twitter_auth))
self.auth = get_auth(self.twitter_auth.consumer_key, self.twitter_auth.consumer_secret, self.twitter_auth.token, self.twitter_auth.token_secret)
self.logger.debug("SourceProcess : auth set ")
track_list = self.track  # or input('Keywords to track (comma separated): ').strip()
self.logger.debug("SourceProcess : track list " + track_list)
track_list = [k.strip() for k in track_list.split(',')]
self.logger.debug("SourceProcess : before connecting to stream %s, url : %s, auth : %s" % (repr(track_list), self.options.url, repr(self.auth)))
self.stream = iri_tweet.stream.FilterStream(self.auth, track=track_list, raw=True, url=self.options.url, timeout=self.timeout, logger=self.logger)
self.logger.debug("SourceProcess : after connecting to stream")
self.stream.muststop = lambda: self.stop_event.is_set()
stream_wrapper = iri_tweet.stream.SafeStreamWrapper(self.stream, logger=self.logger)
session = self.session_maker()
#import pydevd
#pydevd.settrace(suspend=False)
try:
for tweet in stream_wrapper:
if not self.parent_is_alive():
self.stop_event.set()
sys.exit()
self.logger.debug("SourceProcess : tweet " + repr(tweet))
source = TweetSource(original_json=tweet)
self.logger.debug("SourceProcess : source created")
add_retries = 0
while add_retries < 10:
try:
add_retries += 1
session.add(source)
session.flush()
break
except OperationalError as e:
session.rollback()
self.logger.debug("SourceProcess : Operational Error %s nb %d" % (repr(e), add_retries))
if add_retries == 10:
raise
source_id = source.id
self.logger.debug("SourceProcess : before queue + source id " + repr(source_id))
self.logger.info("SourceProcess : Tweet count: %d - current rate : %.2f - running : %s" % (self.stream.count, self.stream.rate, int(time.time() - self.stream.starttime)))
session.commit()
self.queue.put((source_id, tweet), False)
except Exception as e:
self.logger.error("SourceProcess : Error when processing tweet " + repr(e))
raise
finally:
session.rollback()
session.close()
self.stream.close()
self.stream = None
if not self.stop_event.is_set():
self.stop_event.set()
def do_run(self):
self.logger = set_logging_process(self.options, self.logger_queue)
source_stream_iter_thread = threading.Thread(target=self.__source_stream_iter, name="SourceStreamIterThread")
source_stream_iter_thread.start()
try:
while not self.stop_event.is_set():
self.logger.debug("SourceProcess : In while after start")
self.stop_event.wait(DEFAULT_TIMEOUT)
except KeyboardInterrupt:
self.stop_event.set()
if self.stop_event.is_set() and self.stream:
self.stream.close()
elif not self.stop_event.is_set() and not source_stream_iter_thread.is_alive():
self.stop_event.set()
self.queue.cancel_join_thread()
self.logger_queue.cancel_join_thread()
self.logger.info("SourceProcess : join")
source_stream_iter_thread.join(30)
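# Parse one raw payload and dispatch it to the processor class matching its
# shape; failures are recorded as TweetLog rows so the source is not picked up
# again by process_leftovers().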
def process_tweet(tweet, source_id, session, twitter_auth, twitter_query_user, logger):
try:
if not tweet.strip():
return
tweet_obj = json.loads(tweet)
processor_klass = get_processor(tweet_obj)
if not processor_klass:
tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET'])
session.add(tweet_log)
return
processor = processor_klass(json_dict=tweet_obj,
json_txt=tweet,
source_id=source_id,
session=session,
twitter_auth=twitter_auth,
user_query_twitter=twitter_query_user,
logger=logger)
logger.info(processor.log_info())
logger.debug(u"Process_tweet :" + repr(tweet))
processor.process()
except ValueError as e:
message = u"Value Error %s processing tweet %s" % (repr(e), tweet)
output = StringIO()
try:
traceback.print_exc(file=output)
error_stack = output.getvalue()
finally:
output.close()
tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET'], error=message, error_stack=error_stack)
session.add(tweet_log)
session.commit()
except Exception as e:
message = u"Error %s processing tweet %s" % (repr(e), tweet)
logger.exception(message)
output = StringIO()
try:
traceback.print_exc(file=output)
error_stack = output.getvalue()
finally:
output.close()
session.rollback()
tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['ERROR'], error=message, error_stack=error_stack)
session.add(tweet_log)
session.commit()
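# Consumer side: pops (source_id, tweet) pairs off the shared queue and runs
# process_tweet() on each until the stop event is set or the parent dies.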
class TweetProcess(BaseProcess):
def __init__(self, session_maker, queue, options, twitter_auth, stop_event, logger_queue, parent_pid):
super(TweetProcess, self).__init__(session_maker, queue, options, twitter_auth, stop_event, logger_queue, parent_pid)
self.twitter_query_user = options.twitter_query_user
def do_run(self):
self.logger = set_logging_process(self.options, self.logger_queue)
session = self.session_maker()
try:
while not self.stop_event.is_set() and self.parent_is_alive():
try:
source_id, tweet_txt = self.queue.get(True, 3)
self.logger.debug("Processing source id " + repr(source_id))
except Exception as e:
self.logger.debug('Process tweet exception in loop : ' + repr(e))
continue
process_tweet(tweet_txt, source_id, session, self.twitter_auth, self.twitter_query_user, self.logger)
session.commit()
except KeyboardInterrupt:
self.stop_event.set()
finally:
session.rollback()
session.close()
def get_sessionmaker(conn_str):
engine, metadata, Session = models.setup_database(conn_str, echo=False, create_all=False, autocommit=False)
Session = scoped_session(Session)
return Session, engine, metadata
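# "Leftovers" are TweetSource rows with no TweetLog entry, i.e. raw tweets
# that were stored but never processed (typically after a crash or kill).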
def process_leftovers(session, twitter_auth, twitter_query_user, ask_process_leftovers, logger):
sources = session.query(TweetSource).outerjoin(TweetLog).filter(TweetLog.id.is_(None))
sources_count = sources.count()
if sources_count > 10 and ask_process_leftovers:
resp = input("Do you want to process leftovers (Y/n)? (%d tweets to process) " % sources_count)
if resp and resp.strip().lower() == "n":
return
logger.info("Process leftovers, %d tweets to process" % (sources_count))
for src in sources:
tweet_txt = src.original_json
process_tweet(tweet_txt, src.id, session, twitter_auth, twitter_query_user, logger)
session.commit()
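# Runs in a daemon thread of the main process: drains the per-worker logging
# queues and re-emits each record through the locally configured handlers.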
def process_log(logger_queues, stop_event):
while not stop_event.is_set():
for lqueue in logger_queues:
try:
record = lqueue.get_nowait()
logging.getLogger(record.name).handle(record)
except queue.Empty:
continue
except IOError:
continue
time.sleep(0.1)
def get_options():
usage = "usage: %(prog)s [options]"
parser = argparse.ArgumentParser(usage=usage)
parser.add_argument("-f", "--file", dest="conn_str",
help="write tweet to DATABASE. This is a connection string", metavar="CONNECTION_STR", default="enmi2010_twitter.db")
parser.add_argument("-T", "--track", dest="track",
help="Twitter track", metavar="TRACK")
parser.add_argument("-k", "--key", dest="consumer_key",
help="Twitter consumer key", metavar="CONSUMER_KEY", required=True)
parser.add_argument("-s", "--secret", dest="consumer_secret",
help="Twitter consumer secret", metavar="CONSUMER_SECRET", required=True)
parser.add_argument("-n", "--new", dest="new", action="store_true",
help="new database", default=False)
parser.add_argument("-D", "--daemon", dest="daemon", action="store_true",
help="launch daemon", default=False)
parser.add_argument("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token",
help="Token file name")
parser.add_argument("-d", "--duration", dest="duration",
help="Duration of recording in seconds", metavar="DURATION", default= -1, type=int)
parser.add_argument("-N", "--nb-process", dest="process_nb",
help="number of process.\nIf 0, only the lefovers of the database are processed.\nIf 1, no postprocessing is done on the tweets.", metavar="PROCESS_NB", default=2, type=int)
parser.add_argument("--url", dest="url",
help="The twitter url to connect to.", metavar="URL", default=iri_tweet.stream.FilterStream.url)
parser.add_argument("--query-user", dest="twitter_query_user", action="store_true",
help="Query twitter for users", default=False)
parser.add_argument("--timeout", dest="timeout",
help="timeout for connecting in seconds", default=60, metavar="TIMEOUT", type=int)
parser.add_argument("--ask-process-leftovers", dest="ask_process_leftovers", action="store_false",
help="ask process leftover", default=True)
utils.set_logging_options(parser)
return parser.parse_args()
def do_run(options, session_maker):
stop_args = {}
access_token_key, access_token_secret = utils.get_oauth_token(consumer_key=options.consumer_key, consumer_secret=options.consumer_secret, token_file_path=options.token_filename)
twitter_auth = twitter.OAuth(access_token_key, access_token_secret, options.consumer_key, options.consumer_secret)
session = session_maker()
try:
process_leftovers(session, twitter_auth, options.twitter_query_user, options.ask_process_leftovers, utils.get_logger())
session.commit()
finally:
session.rollback()
session.close()
if options.process_nb <= 0:
utils.get_logger().debug("Leftovers processed. Exiting.")
return None
queue = mQueue()
stop_event = Event()
# workaround for a bug when using urllib.request (formerly urllib2) together with multiprocessing
httpd = HTTPServer(('127.0.0.1', 0), RequestHandler)
_thread.start_new_thread(httpd.handle_request, ())
req = urllib.request.Request('http://localhost:%d' % httpd.server_port)
conn = None
try:
conn = urllib.request.urlopen(req)
except Exception:
utils.get_logger().debug("could not open localhost")
# do nothing
finally:
if conn is not None:
conn.close()
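# One SourceProcess feeds (process_nb - 1) TweetProcess consumers; each child
# gets its own engine/session maker and a bounded queue for log records.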
process_engines = []
logger_queues = []
SessionProcess, engine_process, _ = get_sessionmaker(conn_str)
process_engines.append(engine_process)
lqueue = mQueue(50)
logger_queues.append(lqueue)
pid = os.getpid()
sprocess = SourceProcess(SessionProcess, queue, options, twitter_auth, stop_event, lqueue, pid)
tweet_processes = []
for i in range(options.process_nb - 1):
SessionProcess, engine_process, _ = get_sessionmaker(conn_str)
process_engines.append(engine_process)
lqueue = mQueue(50)
logger_queues.append(lqueue)
cprocess = TweetProcess(SessionProcess, queue, options, twitter_auth, stop_event, lqueue, pid)
tweet_processes.append(cprocess)
log_thread = threading.Thread(target=process_log, name="loggingThread", args=(logger_queues, stop_event,))
log_thread.daemon = True
log_thread.start()
sprocess.start()
for cprocess in tweet_processes:
cprocess.start()
add_process_event("pid", {'main':os.getpid(), 'source':(sprocess.name, sprocess.pid), 'consumers':dict([(p.name, p.pid) for p in tweet_processes])}, session_maker)
if options.duration >= 0:
end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration)
def interrupt_handler(signum, frame):
utils.get_logger().debug("shutdown asked " + repr(signum) + " " + repr(inspect.getframeinfo(frame, 9)))
stop_args.update({'message': 'interrupt', 'signum': signum, 'frameinfo': inspect.getframeinfo(frame, 9)})
stop_event.set()
signal.signal(signal.SIGINT, interrupt_handler)
signal.signal(signal.SIGHUP, interrupt_handler)
signal.signal(signal.SIGALRM, interrupt_handler)
signal.signal(signal.SIGTERM, interrupt_handler)
while not stop_event.is_set():
if options.duration >= 0 and datetime.datetime.utcnow() >= end_ts:
stop_args.update({'message': 'duration', 'duration' : options.duration, 'end_ts' : end_ts})
stop_event.set()
break
if sprocess.is_alive():
utils.get_logger().debug("Source process alive")
time.sleep(1)
else:
stop_args.update({'message': 'Source process killed'})
stop_event.set()
break
utils.get_logger().debug("Joining Source Process")
try:
sprocess.join(10)
except Exception:
utils.get_logger().debug("Pb joining Source Process - terminating")
finally:
sprocess.terminate()
for i, cprocess in enumerate(tweet_processes):
utils.get_logger().debug("Joining consumer process Nb %d" % (i + 1))
try:
cprocess.join(3)
except Exception:
utils.get_logger().debug("Pb joining consumer process Nb %d - terminating" % (i + 1))
cprocess.terminate()
utils.get_logger().debug("Close queues")
try:
queue.close()
for lqueue in logger_queues:
lqueue.close()
except Exception as e:
utils.get_logger().error("error when closing queues %s", repr(e))
# do nothing
if options.process_nb > 1:
utils.get_logger().debug("Processing leftovers")
session = session_maker()
try:
process_leftovers(session, twitter_auth, options.twitter_query_user, options.ask_process_leftovers, utils.get_logger())
session.commit()
finally:
session.rollback()
session.close()
for pengine in process_engines:
pengine.dispose()
return stop_args
def main(options):
global conn_str
conn_str = options.conn_str.strip()
if not re.match(r"^[\w+]+://.+", conn_str):
conn_str = 'sqlite:///' + options.conn_str
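# With --new on a sqlite URL, move any existing database file aside to
# <basename>.<i><extension> instead of overwriting it.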
if conn_str.startswith("sqlite") and options.new:
filepath = conn_str[conn_str.find(":///") + 4:]
if os.path.exists(filepath):
i = 1
basename, extension = os.path.splitext(filepath)
new_path = '%s.%d%s' % (basename, i, extension)
while i < 1000000 and os.path.exists(new_path):
i += 1
new_path = '%s.%d%s' % (basename, i, extension)
if i >= 1000000:
raise Exception("Unable to find new filename for " + filepath)
else:
shutil.move(filepath, new_path)
Session, engine, metadata = get_sessionmaker(conn_str)
if options.new:
check_metadata = sqlalchemy.schema.MetaData()
check_metadata.reflect(bind=engine)
if len(check_metadata.sorted_tables) > 0:
message = "Database %s not empty exiting" % conn_str
utils.get_logger().error(message)
sys.exit(message)
metadata.create_all(engine)
session = Session()
try:
models.add_model_version(session)
finally:
session.close()
stop_args = {}
try:
add_process_event(event_type="start", args={'options':options.__dict__, 'args': [], 'command_line': sys.argv}, session_maker=Session)
stop_args = do_run(options, Session)
except Exception as e:
utils.get_logger().exception("Error in main thread")
outfile = StringIO()
try:
traceback.print_exc(file=outfile)
stop_args = {'error': repr(e), 'message': repr(e), 'stacktrace':outfile.getvalue()}
finally:
outfile.close()
raise
finally:
add_process_event(event_type="shutdown", args=stop_args, session_maker=Session)
utils.get_logger().debug("Done. Exiting. " + repr(stop_args))
if __name__ == '__main__':
options = get_options()
loggers = set_logging(options)
utils.get_logger().debug("OPTIONS : " + repr(options))
if options.daemon:
options.ask_process_leftovers = False
import daemon
hdlr_preserve = []
for logger in loggers:
hdlr_preserve.extend([h.stream for h in logger.handlers])
context = daemon.DaemonContext(working_directory=os.getcwd(), files_preserve=hdlr_preserve)
with context:
main(options)
else:
main(options)