script/stream/recorder_tweetstream.py
author Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
Wed, 27 Jul 2011 00:04:55 +0200
changeset 242 cdd7d3c0549c
parent 207 621fa6caec0c
child 243 9213a63fa34a
permissions -rw-r--r--
Starting 'parallel_twitter' branch

from getpass import getpass
from iri_tweet import models, utils
from iri_tweet.models import TweetSource, TweetLog
from multiprocessing import Queue, JoinableQueue, Process, Event
from optparse import OptionParser
from sqlalchemy.orm import sessionmaker
from sqlite3 import *
import StringIO
import anyjson
import datetime
import logging
import os
import shutil
import signal
import socket
import sys
import time
import traceback
import tweepy.auth
import tweetstream
# Set the default buffer size of socket file objects (socket.makefile) to 0,
# i.e. unbuffered. socket._fileobject is a private, Python 2-only class.
# NOTE(review): presumably done so lines of the streaming HTTP response reach
# the reader as soon as they arrive instead of waiting for a buffer to fill
# — TODO confirm.
socket._fileobject.default_bufsize = 0



# Earlier, shorter tweet field list, kept for reference:
#columns_tweet = [u'favorited', u'truncated', u'text', u'created_at', u'source', u'in_reply_to_status_id', u'in_reply_to_screen_name', u'in_reply_to_user_id', u'geo', u'id', u'user']

# Top-level field names of a tweet JSON object.
# NOTE(review): not referenced by the code in this file.
columns_tweet = [
    u'user', u'favorited', u'contributors', u'truncated', u'text',
    u'created_at', u'retweeted', u'in_reply_to_status_id_str',
    u'coordinates', u'in_reply_to_user_id_str', u'entities',
    u'in_reply_to_status_id', u'place', u'in_reply_to_user_id', u'id',
    u'in_reply_to_screen_name', u'retweet_count', u'geo', u'id_str',
    u'source',
]

# Earlier, shorter user field list, kept for reference:
#columns_user = [u'id', u'verified', u'profile_sidebar_fill_color', u'profile_text_color', u'followers_count', u'protected', u'location', u'profile_background_color', u'utc_offset', u'statuses_count', u'description', u'friends_count', u'profile_link_color', u'profile_image_url', u'notifications', u'geo_enabled', u'profile_background_image_url', u'screen_name', u'profile_background_tile', u'favourites_count', u'name', u'url', u'created_at', u'time_zone', u'profile_sidebar_border_color', u'following']

# Field names of the user object embedded in a tweet.
# NOTE(review): not referenced by the code in this file.
columns_user = [
    u'follow_request_sent', u'profile_use_background_image', u'id',
    u'verified', u'profile_sidebar_fill_color', u'profile_text_color',
    u'followers_count', u'protected', u'location',
    u'profile_background_color', u'id_str', u'utc_offset',
    u'statuses_count', u'description', u'friends_count',
    u'profile_link_color', u'profile_image_url', u'notifications',
    u'show_all_inline_media', u'geo_enabled',
    u'profile_background_image_url', u'name', u'lang', u'following',
    u'profile_background_tile', u'favourites_count', u'screen_name',
    u'url', u'created_at', u'contributors_enabled', u'time_zone',
    u'profile_sidebar_border_color', u'is_translator', u'listed_count',
]
# just put it in a sqlite3 table


class ReconnectingTweetStream(tweetstream.FilterStream):
    """FilterStream that tries to reconnect when the connection goes down.

    Reconnecting, and waiting between attempts, is blocking: ``next()`` only
    returns once an item has been read or the retry budget is exhausted.

    :param auth: authentication handler forwarded to ``FilterStream`` (this
        script passes a tweepy ``BasicAuthHandler`` or ``OAuthHandler``).

    :param keywords: keywords to follow, forwarded to ``FilterStream`` as its
        ``track`` argument.

    :keyword reconnects: number of reconnection attempts allowed before
        ``tweetstream.ConnectionError("Too many retries")`` is raised. The
        counter is cumulative over the lifetime of the stream; it is never
        reset after a successful read. Default is 3.

    :keyword error_cb: optional callable invoked just before each
        reconnection attempt, with the ``ConnectionError`` that caused it as
        its single argument. It is not called for the error that exhausts
        the budget. Default is None.

    :keyword retry_wait: seconds to sleep before reconnecting. Default is 5.

    :keyword as_text: forwarded to ``FilterStream``. This script passes True
        and treats every item it yields as a JSON string. Default is False.

    Any other keyword arguments are forwarded to ``FilterStream`` unchanged.
    """

    def __init__(self, auth, keywords, reconnects=3, error_cb=None, retry_wait=5, as_text=False, **kwargs):
        self.max_reconnects = reconnects
        self.retry_wait = retry_wait
        self._reconnects = 0
        self._error_cb = error_cb
        super(ReconnectingTweetStream, self).__init__(auth=auth, track=keywords, as_text=as_text, **kwargs)

    def next(self):
        """Return the next item, retrying on ``tweetstream.ConnectionError``.

        Only connection errors are retried. Any other exception, including
        authentication errors, propagates immediately, because reconnecting
        cannot reasonably fix it.
        """
        while True:
            try:
                return super(ReconnectingTweetStream, self).next()
            except tweetstream.ConnectionError as e:
                logging.debug("connection error :" + str(e))
                self._reconnects += 1
                if self._reconnects > self.max_reconnects:
                    raise tweetstream.ConnectionError("Too many retries")

                # error_cb is not called on the last error: a
                # ConnectionError is raised above instead.
                if callable(self._error_cb):
                    self._error_cb(e)

                time.sleep(self.retry_wait)



class SourceProcess(Process):
    """Producer process: reads the Twitter filter stream and feeds the consumers.

    Every raw tweet is stored as a ``TweetSource`` row. Once that row is
    committed, the pair ``(source_id, raw_tweet)`` is put on ``queue`` for
    the ``TweetProcess`` consumers.
    """

    def __init__(self, session_maker, queue, auth, track, debug, reconnects, token_filename, stop_event):
        """
        :param session_maker: callable returning a new SQLAlchemy session.
        :param queue: queue shared with the consumers; receives
            ``(source_id, raw_tweet)`` tuples.
        :param auth: authentication handler passed to the stream.
        :param track: comma separated keywords to follow.
        :param debug: debug level (stored; not used by this class).
        :param reconnects: reconnection budget given to the stream.
        :param token_filename: OAuth token file (stored; not used by this class).
        :param stop_event: event that, once set, asks the stream to stop.
        """
        super(SourceProcess, self).__init__()
        self.session_maker = session_maker
        self.queue = queue
        self.auth = auth
        self.track = track
        self.debug = debug
        self.reconnects = reconnects
        self.token_filename = token_filename
        self.stop_event = stop_event

    def run(self):
        """Record tweets until the stream ends, fails, or stop_event is set.

        Errors are logged and the current transaction is rolled back; the
        process then ends normally.
        """
        # Keywords are used verbatim: surrounding whitespace is not stripped.
        track_list = self.track.split(',')

        stream = ReconnectingTweetStream(self.auth, track_list, reconnects=self.reconnects, as_text=True)
        # NOTE(review): assumes tweetstream polls muststop() to end iteration.
        stream.muststop = lambda: self.stop_event.is_set()

        session = self.session_maker()

        try:
            for tweet in stream:
                source = TweetSource(original_json=tweet)
                session.add(source)
                session.flush()
                # flush() has assigned the primary key. Read it before commit():
                # with the default expire_on_commit, reading it afterwards
                # would reload the row.
                source_id = source.id
                # Commit before enqueueing, so a consumer never receives the id
                # of a row that is still uncommitted (and, with SQLite,
                # holding the database write lock).
                session.commit()
                # Use the queue given to the constructor, not a module-level
                # global that only exists in the parent's __main__.
                self.queue.put((source_id, tweet), False)
                logging.info("Tweet count: %d - current rate : %.2f - running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime)))
        except Exception:
            # Previously a bare except discarded the error without a trace.
            logging.exception("SourceProcess: error while recording the stream")
            session.rollback()
        finally:
            stream.close()
            session.close()
            
        
class TweetProcess(Process):
    """Consumer process: turns raw tweets taken from the queue into model rows.

    Each ``(source_id, raw_tweet)`` item is processed with
    ``utils.TwitterProcessor`` and committed on its own.
    """

    def __init__(self, session_maker, queue, debug, token_filename, stop_event):
        """
        :param session_maker: callable returning a new SQLAlchemy session.
        :param queue: JoinableQueue of ``(source_id, raw_tweet)`` tuples.
        :param debug: debug level (stored; not used by this class).
        :param token_filename: OAuth token file passed to the processor.
        :param stop_event: once set, the process exits as soon as the
            queue is found empty.
        """
        super(TweetProcess, self).__init__()
        self.session_maker = session_maker
        self.queue = queue
        self.debug = debug
        self.stop_event = stop_event
        self.token_filename = token_filename

    def run(self):
        """Process queued tweets until stop_event is set and the queue is drained.

        ``task_done()`` is called for every item taken, even when processing
        fails, so a ``join()`` on the queue cannot hang on a failed item.
        Unexpected errors roll back the session, are logged, and are re-raised.
        """
        try:
            from Queue import Empty  # Python 2
        except ImportError:
            from queue import Empty  # Python 3

        session = self.session_maker()
        try:
            while True:
                try:
                    source_id, tweet_txt = self.queue.get(True, 30)
                except Empty:
                    # Exit only on an empty queue, so tweets already enqueued
                    # when the stop is requested are still processed.
                    if self.stop_event.is_set():
                        break
                    continue
                try:
                    self.process_tweet(tweet_txt, source_id, session)
                    session.commit()
                finally:
                    self.queue.task_done()
        except Exception:
            logging.exception("TweetProcess: error while processing tweets")
            session.rollback()
            raise
        finally:
            session.close()

    def process_tweet(self, tweet, source_id, session):
        """Parse one raw tweet and hand it to ``utils.TwitterProcessor``.

        Errors are not propagated. Any pending changes for this tweet are
        rolled back, the error is logged, and a ``TweetLog`` row with status
        ERROR is added to ``session``. The caller commits.

        :param tweet: raw JSON text of the tweet, as read from the stream.
        :param source_id: id of the ``TweetSource`` row holding ``tweet``.
        :param session: SQLAlchemy session used for every write.
        """
        try:
            tweet_obj = anyjson.deserialize(tweet)
            screen_name = ""
            if 'user' in tweet_obj and 'screen_name' in tweet_obj['user']:
                screen_name = tweet_obj['user']['screen_name']
            # NOTE(review): an object without a 'text' key raises KeyError
            # here and is recorded as an error below.
            logging.info(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text']))
            logging.debug(u"Process_tweet :" + repr(tweet))
            processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, self.token_filename)
            processor.process()
        except Exception as e:
            # "%s", not "%e": %e is a float conversion and raised TypeError.
            message = u"Error %s processing tweet %s" % (unicode(e), tweet)
            # format_exc() captures the real traceback; the previous
            # print_exception(..., None, ...) call passed no traceback at all.
            error_stack = traceback.format_exc()
            logging.error(message)
            # Discard whatever this tweet left half-written (e.g. after a
            # failed flush) so the session is usable for the error record.
            session.rollback()
            tweet_log = TweetLog(status=TweetLog.TWEET_STATUS['ERROR'], error=message, error_stack=error_stack)
            session.add(tweet_log)



#def main_source(username, password, track, session, debug, reconnects, token_filename, duration):

    #username = username or raw_input('Twitter username: ')
    #password = password or getpass('Twitter password: ')

#    track_list = track or raw_input('Keywords to track (comma seperated): ').strip()
#    track_list = [k for k in track_list.split(',')]
    
#    if username and password:
#        auth = tweepy.auth.BasicAuthHandler(username, password)        
#    else:
#        consumer_key = models.CONSUMER_KEY
#        consumer_secret = models.CONSUMER_SECRET
#        auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=False)
#        auth.set_access_token(*(utils.get_oauth_token(token_filename)))
    
#    if duration >= 0:
#        end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=duration)
    
#    stream = ReconnectingTweetStream(auth, track_list, reconnects=reconnects, as_text=True)
#    try:
#        for tweet in stream:
#            source = TweetSource(original_json=tweet)
#            session.add(source)
#            session.flush()            
#            source_id = source.id
#            process_tweet(tweet, source_id, session, debug, token_filename)
#            logging.info("Tweet count: %d - current rate : %.2f - running : %s"%(stream.count, stream.rate, int(time.time()-stream.starttime)))
#            session.commit()
#            if duration >= 0 and  datetime.datetime.utcnow() >= end_ts:
#                print "Stop recording after %d seconds." % (duration)
#                break
#    finally:
#        stream.close()
        
def get_options():
    """Declare the command-line options and parse ``sys.argv``.

    Logging options are contributed by ``utils.set_logging_options``.

    :returns: the ``(options, args)`` pair from ``OptionParser.parse_args()``.
    """
    # (flags, add_option keyword arguments), in the order they are registered.
    option_specs = (
        (("-f", "--file"),
         dict(dest="filename", help="write tweet to FILE", metavar="FILE", default="enmi2010_twitter.db")),
        (("-u", "--user"),
         dict(dest="username", help="Twitter user", metavar="USER", default=None)),
        (("-w", "--password"),
         dict(dest="password", help="Twitter password", metavar="PASSWORD", default=None)),
        (("-T", "--track"),
         dict(dest="track", help="Twitter track", metavar="TRACK")),
        (("-n", "--new"),
         dict(dest="new", action="store_true", help="new database", default=False)),
        (("-r", "--reconnects"),
         dict(dest="reconnects", help="Reconnects", metavar="RECONNECTS", default=10, type='int')),
        (("-t",),
         dict(dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token", help="Token file name")),
        (("-d", "--duration"),
         dict(dest="duration", help="Duration of recording in seconds", metavar="DURATION", default=-1, type='int')),
        (("-N", "--consumer"),
         dict(dest="consumer_nb", help="number of consumer", metavar="CONSUMER", default=1, type='int')),
    )

    parser = OptionParser()
    for flags, settings in option_specs:
        parser.add_option(*flags, **settings)

    utils.set_logging_options(parser)
    return parser.parse_args()
    

if __name__ == '__main__':

    # Entry point: one SourceProcess records the stream, options.consumer_nb
    # TweetProcess workers process what it enqueues, and this process
    # supervises until SIGINT, the requested duration, or the source exits.

    (options, args) = get_options()

    utils.set_logging(options)

    if options.debug:
        print("OPTIONS : ")
        print(repr(options))

    # The source process splits this option on ','. Fail fast with a clear
    # message instead of starting processes that cannot record anything.
    if not options.track:
        sys.exit("No keywords to track: use -T/--track (comma separated)")

    if options.new and os.path.exists(options.filename):
        # Move the existing database aside to the first free "<base>.<n><ext>".
        # os.path.splitext keeps the leading dot in <ext>, so no "." is added
        # before it. (The old '$s.%d.%s' format raised TypeError.)
        basename, extension = os.path.splitext(options.filename)
        i = 1
        new_path = '%s.%d%s' % (basename, i, extension)
        while i < 1000000 and os.path.exists(new_path):
            i += 1
            new_path = '%s.%d%s' % (basename, i, extension)
        if i >= 1000000:
            raise Exception("Unable to find new filename for " + options.filename)
        shutil.move(options.filename, new_path)

    engine, metadata = models.setup_database('sqlite:///' + options.filename, echo=(options.debug >= 2))
    Session = sessionmaker(bind=engine)
    queue = JoinableQueue()
    stop_event = Event()

    if options.username and options.password:
        auth = tweepy.auth.BasicAuthHandler(options.username, options.password)
    else:
        consumer_key = models.CONSUMER_KEY
        consumer_secret = models.CONSUMER_SECRET
        auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=False)
        auth.set_access_token(*(utils.get_oauth_token(options.token_filename)))

    sprocess = SourceProcess(Session, queue, auth, options.track, options.debug, options.reconnects, options.token_filename, stop_event)

    tweet_processes = [TweetProcess(Session, queue, options.debug, options.token_filename, stop_event)
                       for _ in range(options.consumer_nb)]

    def interupt_handler(signum, frame):
        # Ctrl-C only requests a graceful stop; every process watches stop_event.
        stop_event.set()

    # Installed before start(), so on POSIX (fork) the children inherit it too.
    signal.signal(signal.SIGINT, interupt_handler)

    sprocess.start()
    for cprocess in tweet_processes:
        cprocess.start()

    end_ts = None
    if options.duration >= 0:
        end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration)

    # Supervise until SIGINT, the end of the requested duration, or the
    # source process exiting on its own.
    while not stop_event.is_set() and sprocess.is_alive():
        if end_ts is not None and datetime.datetime.utcnow() >= end_ts:
            break
        time.sleep(0.1)

    # Always broadcast the stop. Previously, when the source process died on
    # its own, the event was never set and the consumers looped forever.
    stop_event.set()

    sprocess.join()
    # Join the consumers rather than the queue: queue.join() blocks forever
    # if a consumer exits before calling task_done() for every item.
    for cprocess in tweet_processes:
        cprocess.join()