from getpass import getpass
from iri_tweet import models, utils
from iri_tweet.models import TweetSource, TweetLog, ProcessEvent
from multiprocessing import (Queue as mQueue, JoinableQueue, Process, Event,
get_logger)
from optparse import OptionParser
from sqlalchemy.exc import OperationalError
from sqlalchemy.orm import scoped_session
import Queue
import StringIO
import anyjson
import datetime
import inspect
import logging
import os
import re
import shutil
import signal
import socket
import sqlalchemy.schema
import sys
import threading
import time
import traceback
import tweepy.auth
import tweetstream
import urllib2
socket._fileobject.default_bufsize = 0
#columns_tweet = [u'favorited', u'truncated', u'text', u'created_at', u'source', u'in_reply_to_status_id', u'in_reply_to_screen_name', u'in_reply_to_user_id', u'geo', u'id', u'user']
columns_tweet = [u'user', u'favorited', u'contributors', u'truncated', u'text', u'created_at', u'retweeted', u'in_reply_to_status_id_str', u'coordinates', u'in_reply_to_user_id_str', u'entities', u'in_reply_to_status_id', u'place', u'in_reply_to_user_id', u'id', u'in_reply_to_screen_name', u'retweet_count', u'geo', u'id_str', u'source']
#columns_user = [u'id', u'verified', u'profile_sidebar_fill_color', u'profile_text_color', u'followers_count', u'protected', u'location', u'profile_background_color', u'utc_offset', u'statuses_count', u'description', u'friends_count', u'profile_link_color', u'profile_image_url', u'notifications', u'geo_enabled', u'profile_background_image_url', u'screen_name', u'profile_background_tile', u'favourites_count', u'name', u'url', u'created_at', u'time_zone', u'profile_sidebar_border_color', u'following']
columns_user = [u'follow_request_sent', u'profile_use_background_image', u'id', u'verified', u'profile_sidebar_fill_color', u'profile_text_color', u'followers_count', u'protected', u'location', u'profile_background_color', u'id_str', u'utc_offset', u'statuses_count', u'description', u'friends_count', u'profile_link_color', u'profile_image_url', u'notifications', u'show_all_inline_media', u'geo_enabled', u'profile_background_image_url', u'name', u'lang', u'following', u'profile_background_tile', u'favourites_count', u'screen_name', u'url', u'created_at', u'contributors_enabled', u'time_zone', u'profile_sidebar_border_color', u'is_translator', u'listed_count']
#just put it in a sqlite3 tqble
def set_logging(options):
loggers = []
loggers.append(utils.set_logging(options, logging.getLogger('iri.tweet')))
loggers.append(utils.set_logging(options, logging.getLogger('multiprocessing')))
if options.debug >= 2:
loggers.append(utils.set_logging(options, logging.getLogger('sqlalchemy.engine')))
#utils.set_logging(options, logging.getLogger('sqlalchemy.dialects'))
#utils.set_logging(options, logging.getLogger('sqlalchemy.pool'))
#utils.set_logging(options, logging.getLogger('sqlalchemy.orm'))
return loggers
def set_logging_process(options, queue):
qlogger = utils.set_logging(options, logging.getLogger('iri.tweet.p'), queue)
qlogger.propagate = 0
return qlogger
def get_auth(options, access_token):
if options.username and options.password:
auth = tweepy.auth.BasicAuthHandler(options.username, options.password)
else:
consumer_key = models.CONSUMER_KEY
consumer_secret = models.CONSUMER_SECRET
auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=False)
auth.set_access_token(*access_token)
return auth
class ReconnectingTweetStream(tweetstream.FilterStream):
"""TweetStream class that automatically tries to reconnect if the
connecting goes down. Reconnecting, and waiting for reconnecting, is
blocking.
:param username: See :TweetStream:
:param password: See :TweetStream:
:keyword url: See :TweetStream:
:keyword reconnects: Number of reconnects before a ConnectionError is
raised. Default is 3
:error_cb: Optional callable that will be called just before trying to
reconnect. The callback will be called with a single argument, the
exception that caused the reconnect attempt. Default is None
:retry_wait: Time to wait before reconnecting in seconds. Default is 5
"""
def __init__(self, auth, keywords, reconnects=3, error_cb=None, retry_wait=5, as_text=False, **kwargs):
self.max_reconnects = reconnects
self.retry_wait = retry_wait
self._reconnects = 0
self._error_cb = error_cb
super(ReconnectingTweetStream, self).__init__(auth=auth, track=keywords, as_text=as_text, **kwargs)
def next(self):
while True:
try:
utils.get_logger().debug("return super.next")
return super(ReconnectingTweetStream, self).next()
except tweetstream.ConnectionError, e:
utils.get_logger().debug("connection error :" + str(e))
self._reconnects += 1
if self._reconnects > self.max_reconnects:
raise tweetstream.ConnectionError("Too many retries")
# Note: error_cb is not called on the last error since we
# raise a ConnectionError instead
if callable(self._error_cb):
self._error_cb(e)
time.sleep(self.retry_wait)
# Don't listen to auth error, since we can't reasonably reconnect
# when we get one.
def add_process_event(type, args, session_maker):
session = session_maker()
try:
evt = ProcessEvent(args=None if args is None else anyjson.serialize(args), type=type)
session.add(evt)
session.commit()
finally:
session.close()
class BaseProcess(Process):
def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
self.parent_pid = parent_pid
self.session_maker = session_maker
self.queue = queue
self.options = options
self.logger_queue = logger_queue
self.stop_event = stop_event
self.access_token = access_token
super(BaseProcess, self).__init__()
#
# from http://stackoverflow.com/questions/2542610/python-daemon-doesnt-kill-its-kids
#
def parent_is_alive(self):
try:
# try to call Parent
os.kill(self.parent_pid, 0)
except OSError:
# *beeep* oh no! The phone's disconnected!
return False
else:
# *ring* Hi mom!
return True
def __get_process_event_args(self):
return {'name':self.name, 'pid':self.pid, 'parent_pid':self.parent_pid, 'options':self.options.__dict__, 'access_token':self.access_token}
def run(self):
try:
add_process_event("start_worker", self.__get_process_event_args(), self.session_maker)
self.do_run()
finally:
add_process_event("stop_worker", self.__get_process_event_args(), self.session_maker)
def do_run(self):
raise NotImplementedError()
class SourceProcess(BaseProcess):
def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
self.track = options.track
self.reconnects = options.reconnects
self.token_filename = options.token_filename
super(SourceProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid)
def do_run(self):
#import pydevd
#pydevd.settrace(suspend=False)
self.logger = set_logging_process(self.options, self.logger_queue)
self.auth = get_auth(self.options, self.access_token)
self.logger.debug("SourceProcess : run")
track_list = self.track # or raw_input('Keywords to track (comma seperated): ').strip()
track_list = [k for k in track_list.split(',')]
self.logger.debug("SourceProcess : before connecting to stream " + repr(track_list))
stream = ReconnectingTweetStream(self.auth, track_list, reconnects=self.reconnects, as_text=True, url=self.options.url)
self.logger.debug("SourceProcess : after connecting to stream")
stream.muststop = lambda: self.stop_event.is_set()
session = self.session_maker()
try:
for tweet in stream:
if not self.parent_is_alive():
sys.exit()
self.logger.debug("SourceProcess : tweet " + repr(tweet))
source = TweetSource(original_json=tweet)
self.logger.debug("SourceProcess : source created")
add_retries = 0
while add_retries < 10:
try:
add_retries += 1
session.add(source)
session.flush()
break
except OperationalError as e:
session.rollback()
self.logger.debug("SourceProcess : Operational Error %s nb %d" % (repr(e), add_retries))
if add_retries == 10:
raise e
source_id = source.id
self.logger.debug("SourceProcess : before queue + source id " + repr(source_id))
self.logger.info("SourceProcess : Tweet count: %d - current rate : %.2f - running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime)))
session.commit()
self.queue.put((source_id, tweet), False)
except Exception as e:
self.logger.error("SourceProcess : Error when processing tweet " + repr(e))
finally:
session.rollback()
stream.close()
session.close()
self.queue.close()
self.stop_event.set()
def process_tweet(tweet, source_id, session, access_token, logger):
try:
tweet_obj = anyjson.deserialize(tweet)
if 'text' not in tweet_obj:
tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET'])
session.add(tweet_log)
return
screen_name = ""
if 'user' in tweet_obj and 'screen_name' in tweet_obj['user']:
screen_name = tweet_obj['user']['screen_name']
logger.info(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text']))
logger.debug(u"Process_tweet :" + repr(tweet))
processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, access_token, None)
processor.process()
except Exception as e:
message = u"Error %s processing tweet %s" % (repr(e), tweet)
logger.exception(message)
output = StringIO.StringIO()
try:
traceback.print_exc(file=output)
error_stack = output.getvalue()
finally:
output.close()
session.rollback()
tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['ERROR'], error=message, error_stack=error_stack)
session.add(tweet_log)
session.commit()
class TweetProcess(BaseProcess):
def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
super(TweetProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid)
def do_run(self):
self.logger = set_logging_process(self.options, self.logger_queue)
session = self.session_maker()
try:
while not self.stop_event.is_set() and self.parent_is_alive():
try:
source_id, tweet_txt = self.queue.get(True, 3)
self.logger.debug("Processing source id " + repr(source_id))
except Exception as e:
self.logger.debug('Process tweet exception in loop : ' + repr(e))
continue
process_tweet(tweet_txt, source_id, session, self.access_token, self.logger)
session.commit()
finally:
session.rollback()
self.stop_event.set()
session.close()
def get_sessionmaker(conn_str):
engine, metadata, Session = models.setup_database(conn_str, echo=False, create_all=False, autocommit=False)
Session = scoped_session(Session)
return Session, engine, metadata
def process_leftovers(session, access_token, logger):
sources = session.query(TweetSource).outerjoin(TweetLog).filter(TweetLog.id == None)
for src in sources:
tweet_txt = src.original_json
process_tweet(tweet_txt, src.id, session, access_token, logger)
session.commit()
#get tweet source that do not match any message
#select * from tweet_tweet_source ts left join tweet_tweet_log tl on ts.id = tl.tweet_source_id where tl.id isnull;
def process_log(logger_queues, stop_event):
while not stop_event.is_set():
for lqueue in logger_queues:
try:
record = lqueue.get_nowait()
logging.getLogger(record.name).handle(record)
except Queue.Empty:
continue
except IOError:
continue
time.sleep(0.1)
def get_options():
usage = "usage: %prog [options]"
parser = OptionParser(usage=usage)
parser.add_option("-f", "--file", dest="conn_str",
help="write tweet to DATABASE. This is a connection string", metavar="CONNECTION_STR", default="enmi2010_twitter.db")
parser.add_option("-u", "--user", dest="username",
help="Twitter user", metavar="USER", default=None)
parser.add_option("-w", "--password", dest="password",
help="Twitter password", metavar="PASSWORD", default=None)
parser.add_option("-T", "--track", dest="track",
help="Twitter track", metavar="TRACK")
parser.add_option("-n", "--new", dest="new", action="store_true",
help="new database", default=False)
parser.add_option("-D", "--daemon", dest="daemon", action="store_true",
help="launch daemon", default=False)
parser.add_option("-r", "--reconnects", dest="reconnects",
help="Reconnects", metavar="RECONNECTS", default=10, type='int')
parser.add_option("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token",
help="Token file name")
parser.add_option("-d", "--duration", dest="duration",
help="Duration of recording in seconds", metavar="DURATION", default= -1, type='int')
parser.add_option("-N", "--nb-process", dest="process_nb",
help="number of process.\nIf 0, only the lefovers of the database are processed.\nIf 1, no postprocessing is done on the tweets.", metavar="PROCESS_NB", default=2, type='int')
parser.add_option("--url", dest="url",
help="The twitter url to connect to.", metavar="URL", default=tweetstream.FilterStream.url)
utils.set_logging_options(parser)
return parser.parse_args()
def do_run(options, session_maker):
stop_args = {}
access_token = None
if not options.username or not options.password:
access_token = utils.get_oauth_token(options.token_filename)
session = session_maker()
try:
process_leftovers(session, access_token, utils.get_logger())
session.commit()
finally:
session.rollback()
session.close()
if options.process_nb <= 0:
utils.get_logger().debug("Leftovers processed. Exiting.")
return None
queue = mQueue()
stop_event = Event()
#workaround for bug on using urllib2 and multiprocessing
req = urllib2.Request('http://localhost')
conn = None
try:
conn = urllib2.urlopen(req)
except:
utils.get_logger().debug("could not open localhost")
#donothing
finally:
if conn is not None:
conn.close()
process_engines = []
logger_queues = []
SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str)
process_engines.append(engine_process)
lqueue = mQueue(1)
logger_queues.append(lqueue)
pid = os.getpid()
sprocess = SourceProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)
tweet_processes = []
for i in range(options.process_nb - 1):
SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str)
process_engines.append(engine_process)
lqueue = mQueue(1)
logger_queues.append(lqueue)
cprocess = TweetProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)
tweet_processes.append(cprocess)
def interupt_handler(signum, frame):
utils.get_logger().debug("shutdown asked " + repr(signum) + " " + repr(inspect.getframeinfo(frame, 9)))
stop_args.update({'message': 'interupt', 'signum':signum, 'frameinfo':inspect.getframeinfo(frame, 9)})
stop_event.set()
signal.signal(signal.SIGINT , interupt_handler)
signal.signal(signal.SIGHUP , interupt_handler)
signal.signal(signal.SIGALRM, interupt_handler)
signal.signal(signal.SIGTERM, interupt_handler)
log_thread = threading.Thread(target=process_log, name="loggingThread", args=(logger_queues, stop_event,))
log_thread.daemon = True
log_thread.start()
sprocess.start()
for cprocess in tweet_processes:
cprocess.start()
add_process_event("pid", {'main':os.getpid(), 'source':(sprocess.name, sprocess.pid), 'consumers':dict([(p.name, p.pid) for p in tweet_processes])}, session_maker)
if options.duration >= 0:
end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration)
while not stop_event.is_set():
if options.duration >= 0 and datetime.datetime.utcnow() >= end_ts:
stop_args.update({'message': 'duration', 'duration' : options.duration, 'end_ts' : end_ts})
stop_event.set()
break
if sprocess.is_alive():
time.sleep(1)
else:
stop_args.update({'message': 'Source process killed'})
stop_event.set()
break
utils.get_logger().debug("Joining Source Process")
try:
sprocess.join(10)
except:
utils.get_logger().debug("Pb joining Source Process - terminating")
sprocess.terminate()
for i, cprocess in enumerate(tweet_processes):
utils.get_logger().debug("Joining consumer process Nb %d" % (i + 1))
try:
cprocess.join(3)
except:
utils.get_logger().debug("Pb joining consumer process Nb %d - terminating" % (i + 1))
cprocess.terminate()
utils.get_logger().debug("Close queues")
try:
queue.close()
for lqueue in logger_queues:
lqueue.close()
except exception as e:
utils.get_logger().error("error when closing queues %s", repr(e))
#do nothing
if options.process_nb > 1:
utils.get_logger().debug("Processing leftovers")
session = session_maker()
try:
process_leftovers(session, access_token, utils.get_logger())
session.commit()
finally:
session.rollback()
session.close()
for pengine in process_engines:
pengine.dispose()
return stop_args
def main(options, args):
global conn_str
conn_str = options.conn_str.strip()
if not re.match("^\w+://.+", conn_str):
conn_str = 'sqlite:///' + options.conn_str
if conn_str.startswith("sqlite") and options.new:
filepath = conn_str[conn_str.find(":///") + 4:]
if os.path.exists(filepath):
i = 1
basename, extension = os.path.splitext(filepath)
new_path = '%s.%d%s' % (basename, i, extension)
while i < 1000000 and os.path.exists(new_path):
i += 1
new_path = '%s.%d%s' % (basename, i, extension)
if i >= 1000000:
raise Exception("Unable to find new filename for " + filepath)
else:
shutil.move(filepath, new_path)
Session, engine, metadata = get_sessionmaker(conn_str)
if options.new:
check_metadata = sqlalchemy.schema.MetaData(bind=engine, reflect=True)
if len(check_metadata.sorted_tables) > 0:
message = "Database %s not empty exiting" % conn_str
utils.get_logger().error(message)
sys.exit(message)
metadata.create_all(engine)
session = Session()
try:
models.add_model_version(session)
finally:
session.close()
stop_args = {}
try:
add_process_event(type="start", args={'options':options.__dict__, 'args': args, 'command_line': sys.argv}, session_maker=Session)
stop_args = do_run(options, Session)
except Exception as e:
utils.get_logger().exception("Error in main thread")
outfile = StringIO.StringIO()
try:
traceback.print_exc(file=outfile)
stop_args = {'error': repr(e), 'message': getattr(e, 'message', ''), 'stacktrace':outfile.getvalue()}
finally:
outfile.close()
raise
finally:
add_process_event(type="shutdown", args=stop_args, session_maker=Session)
utils.get_logger().debug("Done. Exiting.")
if __name__ == '__main__':
(options, args) = get_options()
loggers = set_logging(options)
utils.get_logger().debug("OPTIONS : " + repr(options))
if options.daemon:
import daemon
import lockfile
hdlr_preserve = []
for logger in loggers:
hdlr_preserve.extend([h.stream for h in logger.handlers])
context = daemon.DaemonContext(working_directory=os.getcwd(), files_preserve=hdlr_preserve)
with context:
main(options, args)
else:
main(options, args)