# script/stream/recorder_tweetstream.py
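# Records tweets from the Twitter streaming (filter) API into a SQLite database:
# one producer process (SourceProcess) stores each raw tweet as a TweetSource row
# and puts it on a queue, and one or more consumer processes (TweetProcess) parse
# the queued tweets. Typical invocation (hypothetical keywords):
#     python recorder_tweetstream.py -f tweets.db -T enmi,iri -d 3600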

from getpass import getpass
from iri_tweet import models, utils
from iri_tweet.models import TweetSource, TweetLog
from multiprocessing import Queue, JoinableQueue, Process, Event, get_logger
from optparse import OptionParser
from sqlalchemy.orm import sessionmaker
import StringIO
import logging
import anyjson
import datetime
import os
import shutil
import signal
import socket
import sys
import time
import traceback
import tweepy.auth
import tweetstream
from iri_tweet.utils import logger
from sqlalchemy.exc import OperationalError
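# Work around response buffering: unbuffered socket file objects make streamed
# tweets available as soon as they arrive instead of sitting in a read buffer.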
socket._fileobject.default_bufsize = 0



#columns_tweet = [u'favorited', u'truncated', u'text', u'created_at', u'source', u'in_reply_to_status_id', u'in_reply_to_screen_name', u'in_reply_to_user_id', u'geo', u'id', u'user']
columns_tweet = [u'user', u'favorited', u'contributors', u'truncated', u'text', u'created_at', u'retweeted', u'in_reply_to_status_id_str', u'coordinates', u'in_reply_to_user_id_str', u'entities', u'in_reply_to_status_id', u'place', u'in_reply_to_user_id', u'id', u'in_reply_to_screen_name', u'retweet_count', u'geo', u'id_str', u'source']
#columns_user = [u'id', u'verified', u'profile_sidebar_fill_color', u'profile_text_color', u'followers_count', u'protected', u'location', u'profile_background_color', u'utc_offset', u'statuses_count', u'description', u'friends_count', u'profile_link_color', u'profile_image_url', u'notifications', u'geo_enabled', u'profile_background_image_url', u'screen_name', u'profile_background_tile', u'favourites_count', u'name', u'url', u'created_at', u'time_zone', u'profile_sidebar_border_color', u'following']
columns_user = [u'follow_request_sent', u'profile_use_background_image', u'id', u'verified', u'profile_sidebar_fill_color', u'profile_text_color', u'followers_count', u'protected', u'location', u'profile_background_color', u'id_str', u'utc_offset', u'statuses_count', u'description', u'friends_count', u'profile_link_color', u'profile_image_url', u'notifications', u'show_all_inline_media', u'geo_enabled', u'profile_background_image_url', u'name', u'lang', u'following', u'profile_background_tile', u'favourites_count', u'screen_name', u'url', u'created_at', u'contributors_enabled', u'time_zone', u'profile_sidebar_border_color', u'is_translator', u'listed_count']
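# Field lists of the tweet and user objects found in the streaming JSON (kept for
# reference; not used directly in this script).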
# just put it in a sqlite3 table


class ReconnectingTweetStream(tweetstream.FilterStream):
    """TweetStream class that automatically tries to reconnect if the
    connecting goes down. Reconnecting, and waiting for reconnecting, is
    blocking.

    :param username: See :TweetStream:

    :param password: See :TweetStream:

    :keyword url: See :TweetStream:

    :keyword reconnects: Number of reconnects before a ConnectionError is
        raised. Default is 3

    :error_cb: Optional callable that will be called just before trying to
        reconnect. The callback will be called with a single argument, the
        exception that caused the reconnect attempt. Default is None

    :retry_wait: Time to wait before reconnecting in seconds. Default is 5

    """

    def __init__(self, auth, keywords, reconnects=3, error_cb=None, retry_wait=5, as_text=False, **kwargs):
        self.max_reconnects = reconnects
        self.retry_wait = retry_wait
        self._reconnects = 0
        self._error_cb = error_cb
        super(ReconnectingTweetStream, self).__init__(auth=auth, track=keywords, as_text=as_text, **kwargs)

    def next(self):
        while True:
            try:
                return super(ReconnectingTweetStream, self).next()
            except tweetstream.ConnectionError as e:
                get_logger().debug("connection error : " + str(e))
                self._reconnects += 1
                if self._reconnects > self.max_reconnects:
                    raise tweetstream.ConnectionError("Too many retries")

                # Note: error_cb is not called on the last error since we
                # raise a ConnectionError instead
                if callable(self._error_cb):
                    self._error_cb(e)

                time.sleep(self.retry_wait)
        # Don't listen to auth error, since we can't reasonably reconnect
        # when we get one.

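# Minimal usage sketch (hypothetical keywords; auth is built as in __main__ below):
#     stream = ReconnectingTweetStream(auth, ["enmi", "iri"], reconnects=5, as_text=True)
#     for raw_tweet in stream:
#         ...  # raw_tweet is the tweet JSON, as text because of as_text=True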


class SourceProcess(Process):
    
    def __init__(self, session_maker, queue, auth, track, debug, reconnects, token_filename, stop_event):
        self.session_maker = session_maker
        self.queue = queue
        self.auth = auth
        self.track = track
        self.debug = debug
        self.reconnects = reconnects
        self.token_filename = token_filename
        self.stop_event = stop_event
        super(SourceProcess, self).__init__()
    
    def run(self):
        
        get_logger().debug("SourceProcess : run")
        track_list = self.track # or raw_input('Keywords to track (comma separated): ').strip()
        track_list = [k.strip() for k in track_list.split(',')]

        get_logger().debug("SourceProcess : before connecting to stream " + repr(track_list))                        
        stream = ReconnectingTweetStream(self.auth, track_list, reconnects=self.reconnects, as_text=True)
        get_logger().debug("SourceProcess : after connecting to stream")
        stream.muststop = lambda: self.stop_event.is_set()
        
        session = self.session_maker()
        
        try:
            for tweet in stream:
                get_logger().debug("tweet " + repr(tweet))
                source = TweetSource(original_json=tweet)
                get_logger().debug("source created")
                add_retries = 0
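                # retry the insert a few times: with SQLite and concurrent consumer
                # processes the database may be temporarily locked (OperationalError)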
                while add_retries < 10:
                    try:
                        add_retries += 1
                        session.add(source)
                        session.flush()
                        break
                    except OperationalError as e:
                        session.rollback()
                        get_logger().debug("Operational Error %s nb %d" % (repr(e), add_retries))
                        if add_retries==10:
                            raise e
                     
                source_id = source.id
                get_logger().debug("before queue + source id " + repr(source_id))
                self.queue.put((source_id, tweet), False)
                #process_tweet(tweet, source_id, self.session, self.debug, self.token_filename)
                get_logger().info("Tweet count: %d - current rate : %.2f - running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime)))
                session.commit()
#                if duration >= 0 and  datetime.datetime.utcnow() >= end_ts:
#                    print "Stop recording after %d seconds." % (duration)
#                    break
        except Exception as e:
            get_logger().error("Error when processing tweet " + repr(e))
        finally:
            session.rollback()
            stream.close()
            session.close()
            self.queue.close()
            self.stop_event.set()


def process_tweet(tweet, source_id, session, token_filename):
    try:
        tweet_obj = anyjson.deserialize(tweet)
        screen_name = ""
        if 'user' in tweet_obj and 'screen_name' in tweet_obj['user']:
            screen_name = tweet_obj['user']['screen_name']
        get_logger().info(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text']))
        get_logger().debug(u"Process_tweet :" + repr(tweet))
        processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, token_filename)
        processor.process()
    except Exception as e:
        message = u"Error %s processing tweet %s" % (repr(e), tweet)
        get_logger().error(message)
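        # capture the full traceback so it can be stored alongside the error message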
        output = StringIO.StringIO()
        traceback.print_exc(file=output)
        error_stack = output.getvalue()
        output.close()
        session.rollback()
        tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['ERROR'], error=message, error_stack=error_stack)
        session.add(tweet_log)
        session.commit()

    
        
class TweetProcess(Process):
    
    def __init__(self, session_maker, queue, debug, token_filename, stop_event):
        self.session_maker = session_maker
        self.queue = queue
        self.debug = debug
        self.stop_event = stop_event
        self.token_filename = token_filename
        super(TweetProcess, self).__init__()


    def run(self):
        
        session = self.session_maker()
        try:
            while not self.stop_event.is_set():
                try:
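                    # block for at most 10 seconds so the stop_event is re-checked regularly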
                    source_id, tweet_txt = self.queue.get(True, 10)
                    get_logger().debug("Processing source id " + repr(source_id))
                except Exception as e:
                    get_logger().debug('Process tweet exception in loop : ' + repr(e))
                    continue
                process_tweet(tweet_txt, source_id, session, self.token_filename)
                session.commit()
        except:
            raise
        finally:
            session.rollback()
            self.stop_event.set()
            session.close()
            
def process_leftovers(session, token_filename):
    # get tweet sources that do not match any log entry, i.e. tweets that were
    # fetched but never processed:
    # select * from tweet_tweet_source ts left join tweet_tweet_log tl on ts.id = tl.tweet_source_id where tl.id isnull;
    sources = session.query(TweetSource).outerjoin(TweetLog).filter(TweetLog.id == None)

    for src in sources:
        tweet_txt = src.original_json
        process_tweet(tweet_txt, src.id, session, token_filename)

        
def get_options():
    parser = OptionParser()
    parser.add_option("-f", "--file", dest="filename",
                      help="write tweet to FILE", metavar="FILE", default="enmi2010_twitter.db")
    parser.add_option("-u", "--user", dest="username",
                      help="Twitter user", metavar="USER", default=None)
    parser.add_option("-w", "--password", dest="password",
                      help="Twitter password", metavar="PASSWORD", default=None)
    parser.add_option("-T", "--track", dest="track",
                      help="Twitter track", metavar="TRACK")
    parser.add_option("-n", "--new", dest="new", action="store_true",
                      help="new database", default=False)
    parser.add_option("-r", "--reconnects", dest="reconnects",
                      help="Reconnects", metavar="RECONNECTS", default=10, type='int')
    parser.add_option("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token",
                      help="Token file name")
    parser.add_option("-d", "--duration", dest="duration",
                      help="Duration of recording in seconds", metavar="DURATION", default= -1, type='int')
    parser.add_option("-N", "--consumer", dest="consumer_nb",
                      help="number of consumer", metavar="CONSUMER", default=1, type='int')



    utils.set_logging_options(parser)

    return parser.parse_args()
    

if __name__ == '__main__':
    
    (options, args) = get_options()
    
    utils.set_logging(options, get_logger())
        
    if options.debug:
        print "OPTIONS : "
        print repr(options)
    
    if options.new and os.path.exists(options.filename):
        i = 1
        basename, extension = os.path.splitext(options.filename)
        new_path = '%s.%d%s' % (basename, i, extension)
        while i < 1000000 and os.path.exists(new_path):
            i += 1
            new_path = '%s.%d%s' % (basename, i, extension)
        if i >= 1000000:
            raise Exception("Unable to find new filename for " + options.filename)
        else:
            shutil.move(options.filename, new_path)

    
    queue = JoinableQueue()
    stop_event = Event()

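    # use basic auth when a username/password pair is given, otherwise fall back to
    # the stored OAuth token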
    if options.username and options.password:
        auth = tweepy.auth.BasicAuthHandler(options.username, options.password)        
    else:
        consumer_key = models.CONSUMER_KEY
        consumer_secret = models.CONSUMER_SECRET
        auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=False)
        auth.set_access_token(*(utils.get_oauth_token(options.token_filename)))


    engine, metadata = models.setup_database('sqlite:///' + options.filename, echo=(options.debug >= 2))
    Session = sessionmaker(bind=engine)
    
    session = Session()
    process_leftovers(session, options.token_filename)
    session.commit()
    session.close()
         
    sprocess = SourceProcess(Session, queue, auth, options.track, options.debug, options.reconnects, options.token_filename, stop_event)    
    
    tweet_processes = []
    
    for i in range(options.consumer_nb):
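        # give each consumer its own engine and session factory, presumably so that
        # SQLite connections are never shared across processes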
        engine, metadata = models.setup_database('sqlite:///' + options.filename, echo=(options.debug >= 2))
        Session = sessionmaker(bind=engine)
        cprocess = TweetProcess(Session, queue, options.debug, options.token_filename, stop_event)
        tweet_processes.append(cprocess)

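    # Ctrl-C (SIGINT) only sets the shared stop event; the producer and consumers
    # notice it and shut down on their own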
    def interrupt_handler(signum, frame):
        stop_event.set()
        
    signal.signal(signal.SIGINT, interrupt_handler)

    sprocess.start()
    for cprocess in tweet_processes:
        cprocess.start()

    if options.duration >= 0:
        end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration)

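    # supervise: stop when the requested duration has elapsed or the producer dies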
    while not stop_event.is_set():
        if options.duration >= 0 and  datetime.datetime.utcnow() >= end_ts:
            stop_event.set()
            break
        if sprocess.is_alive():
            time.sleep(0.1)
        else:
            break
    
    get_logger().debug("Joining Source Process")
    sprocess.join()
    get_logger().debug("Joining Queue")
    #queue.join()
    for i,cprocess in enumerate(tweet_processes):
        get_logger().debug("Joining consumer process Nb %d" % (i+1))
        cprocess.join()
    
    get_logger().debug("Processing leftovers")
    session = Session()
    process_leftovers(session, options.token_filename)
    session.commit()
    session.close()

    get_logger().debug("Done. Exiting.")