script/stream/recorder_tweetstream.py
changeset 242 cdd7d3c0549c
parent 207 621fa6caec0c
child 243 9213a63fa34a
equal deleted inserted replaced
240:ee6305b4a7dc 242:cdd7d3c0549c
     1 from getpass import getpass
     1 from getpass import getpass
     2 from iri_tweet import models, utils
     2 from iri_tweet import models, utils
       
     3 from iri_tweet.models import TweetSource, TweetLog
       
     4 from multiprocessing import Queue, JoinableQueue, Process, Event
     3 from optparse import OptionParser
     5 from optparse import OptionParser
     4 from sqlalchemy.orm import sessionmaker
     6 from sqlalchemy.orm import sessionmaker
     5 from sqlite3 import *
     7 from sqlite3 import *
       
     8 import StringIO
       
     9 import anyjson
     6 import datetime
    10 import datetime
     7 import logging
    11 import logging
     8 import os
    12 import os
       
    13 import shutil
       
    14 import signal
     9 import socket
    15 import socket
    10 import sys
    16 import sys
    11 import time
    17 import time
       
    18 import traceback
       
    19 import tweepy.auth
    12 import tweetstream
    20 import tweetstream
    13 import tweepy.auth
       
    14 socket._fileobject.default_bufsize = 0
    21 socket._fileobject.default_bufsize = 0
    15 
    22 
    16 
    23 
    17 
    24 
    18 #columns_tweet = [u'favorited', u'truncated', u'text', u'created_at', u'source', u'in_reply_to_status_id', u'in_reply_to_screen_name', u'in_reply_to_user_id', u'geo', u'id', u'user']
    25 #columns_tweet = [u'favorited', u'truncated', u'text', u'created_at', u'source', u'in_reply_to_status_id', u'in_reply_to_screen_name', u'in_reply_to_user_id', u'geo', u'id', u'user']
    42 
    49 
    43     :retry_wait: Time to wait before reconnecting in seconds. Default is 5
    50     :retry_wait: Time to wait before reconnecting in seconds. Default is 5
    44 
    51 
    45     """
    52     """
    46 
    53 
    47     def __init__(self, auth, keywords, reconnects=3, error_cb=None, retry_wait=5, **kwargs):
    54     def __init__(self, auth, keywords, reconnects=3, error_cb=None, retry_wait=5, as_text=False, **kwargs):
    48         self.max_reconnects = reconnects
    55         self.max_reconnects = reconnects
    49         self.retry_wait = retry_wait
    56         self.retry_wait = retry_wait
    50         self._reconnects = 0
    57         self._reconnects = 0
    51         self._error_cb = error_cb
    58         self._error_cb = error_cb
    52         super(ReconnectingTweetStream, self).__init__(auth=auth, track=keywords, **kwargs)
    59         super(ReconnectingTweetStream, self).__init__(auth=auth, track=keywords, as_text=as_text, **kwargs)
    53 
    60 
    54     def next(self):
    61     def next(self):
    55         while True:
    62         while True:
    56             try:
    63             try:
    57                 return super(ReconnectingTweetStream, self).next()
    64                 return super(ReconnectingTweetStream, self).next()
    70         # Don't listen to auth error, since we can't reasonably reconnect
    77         # Don't listen to auth error, since we can't reasonably reconnect
    71         # when we get one.
    78         # when we get one.
    72 
    79 
    73 
    80 
    74 
    81 
    75 def process_tweet(tweet, session, debug, token_filename):
    class SourceProcess(Process):
        """Producer process: archive raw tweets and hand them to the consumers.

        For every tweet read from the streaming API (as raw JSON text, since the
        stream is opened with ``as_text=True``) a ``TweetSource`` row is committed,
        then ``(source_id, raw_json)`` is put on ``queue`` for the consumers.

        Stops when the stream ends or fails, or when ``stop_event`` is set; the
        event is exposed to the stream as its ``muststop`` hook (presumably polled
        by tweetstream between tweets -- confirm).
        """

        def __init__(self, session_maker, queue, auth, track, debug, reconnects, token_filename, stop_event):
            """
            :param session_maker: SQLAlchemy session factory, called once in run().
            :param queue: queue shared with the consumer processes.
            :param auth: tweepy auth handler given to the stream.
            :param track: comma-separated keywords to track.
            :param debug: debug level (stored, not used by this class).
            :param reconnects: maximum number of stream reconnections.
            :param token_filename: OAuth token file (stored, not used by this class).
            :param stop_event: multiprocessing Event requesting shutdown.
            """
            super(SourceProcess, self).__init__()
            self.session_maker = session_maker
            self.queue = queue
            self.auth = auth
            self.track = track
            self.debug = debug
            self.reconnects = reconnects
            self.token_filename = token_filename
            self.stop_event = stop_event

        def run(self):
            # Keywords arrive as one comma-separated string; ignore blank entries
            # and stray whitespace around the commas.
            track_list = [k.strip() for k in self.track.split(',') if k.strip()]

            session = self.session_maker()
            stream = None
            try:
                stream = ReconnectingTweetStream(self.auth, track_list, reconnects=self.reconnects, as_text=True)
                stream.muststop = self.stop_event.is_set
                for tweet in stream:
                    source = TweetSource(original_json=tweet)
                    session.add(source)
                    # flush() assigns the primary key; read it now so commit()
                    # does not force a reload of the expired instance.
                    session.flush()
                    source_id = source.id
                    # Commit *before* enqueueing: a consumer must never receive an
                    # id whose TweetSource row is not (or never will be) persisted.
                    session.commit()
                    self.queue.put((source_id, tweet), False)
                    logging.info("Tweet count: %d - current rate : %.2f - running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime)))
            except Exception:
                # Record the failure instead of silently discarding it; the process
                # then ends and the parent sees it is no longer alive.
                logging.exception("Error while recording tweets")
                session.rollback()
            finally:
                if stream is not None:
                    stream.close()
                session.close()
   124             
       
   125         
       
   class TweetProcess(Process):
       """Consumer process: turn archived raw tweets into model objects.

       Takes ``(source_id, raw_json)`` items from ``queue``, runs each through
       ``utils.TwitterProcessor`` and commits once per tweet. Processing errors
       are stored as ``TweetLog`` rows instead of stopping the process.
       """

       def __init__(self, session_maker, queue, debug, token_filename, stop_event):
           super(TweetProcess, self).__init__()
           self.session_maker = session_maker
           self.queue = queue
           self.debug = debug
           self.stop_event = stop_event
           self.token_filename = token_filename

       def run(self):
           session = self.session_maker()
           try:
               # Keep going until asked to stop AND nothing is left: leaving with
               # items still queued would abandon them unacknowledged, and
               # JoinableQueue.join() would then never return.
               while not (self.stop_event.is_set() and self.queue.empty()):
                   try:
                       source_id, tweet_txt = self.queue.get(True, 30)
                   except Exception:
                       # Queue.Empty when the 30 s wait times out; the blocking
                       # wait may also be interrupted when SIGINT reaches this
                       # process. Re-check the exit condition and wait again.
                       continue
                   try:
                       self.process_tweet(tweet_txt, source_id, session)
                       session.commit()
                   finally:
                       # Acknowledge every item taken off the queue, even a failed
                       # one, so queue.join() can complete.
                       self.queue.task_done()
           except BaseException:
               session.rollback()
               raise
           finally:
               session.close()

       def process_tweet(self, tweet, source_id, session):
           """Parse one raw tweet and store it through ``utils.TwitterProcessor``.

           :param tweet: raw JSON text of the tweet (as archived in TweetSource).
           :param source_id: id of the TweetSource row holding ``tweet``.
           :param session: SQLAlchemy session; the caller commits.

           Any exception is logged and recorded as a ``TweetLog`` row added to
           ``session`` instead of being propagated.
           """
           try:
               tweet_obj = anyjson.deserialize(tweet)
               screen_name = ""
               if 'user' in tweet_obj and 'screen_name' in tweet_obj['user']:
                   screen_name = tweet_obj['user']['screen_name']
               logging.info(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text']))
               logging.debug(u"Process_tweet :" + repr(tweet))
               processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, self.token_filename)
               processor.process()
           except Exception as e:
               try:
                   message = u"Error %s processing tweet %s" % (unicode(e), tweet)
               except UnicodeError:
                   # Non-ASCII byte strings cannot be coerced into a unicode
                   # message implicitly; fall back to their ASCII-safe repr.
                   message = u"Error %r processing tweet %r" % (e, tweet)
               logging.error(message)
               output = StringIO.StringIO()
               try:
                   traceback.print_exc(file=output)
                   error_stack = output.getvalue()
               finally:
                   output.close()
               # Discard this tweet's partial, uncommitted work (and reset the
               # session if a flush failed) before recording the error.
               session.rollback()
               # TODO(review): the log entry probably should reference source_id;
               # confirm TweetLog's column name before adding it.
               tweet_log = TweetLog(status=TweetLog.TWEET_STATUS['ERROR'], error=message, error_stack=error_stack)
               session.add(tweet_log)
       
   173 
       
   174 
       
   175 
       
   176 #def main_source(username, password, track, session, debug, reconnects, token_filename, duration):
    85 
   177 
    86     #username = username or raw_input('Twitter username: ')
   178     #username = username or raw_input('Twitter username: ')
    87     #password = password or getpass('Twitter password: ')
   179     #password = password or getpass('Twitter password: ')
    88 
   180 
    89     track_list = track or raw_input('Keywords to track (comma seperated): ').strip()
   181 #    track_list = track or raw_input('Keywords to track (comma seperated): ').strip()
    90     track_list = [k for k in track_list.split(',')]
   182 #    track_list = [k for k in track_list.split(',')]
    91     
   183     
    92     if username and password:
   184 #    if username and password:
    93         auth = tweepy.auth.BasicAuthHandler(username, password)        
   185 #        auth = tweepy.auth.BasicAuthHandler(username, password)        
    94     else:
   186 #    else:
    95         consumer_key = models.CONSUMER_KEY
   187 #        consumer_key = models.CONSUMER_KEY
    96         consumer_secret = models.CONSUMER_SECRET
   188 #        consumer_secret = models.CONSUMER_SECRET
    97         auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=False)
   189 #        auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=False)
    98         auth.set_access_token(*(utils.get_oauth_token(token_filename)))
   190 #        auth.set_access_token(*(utils.get_oauth_token(token_filename)))
    99     
   191     
   100     if duration >= 0:
   192 #    if duration >= 0:
   101         end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=duration)
   193 #        end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=duration)
   102     
   194     
   103     stream = ReconnectingTweetStream(auth, track_list, reconnects=reconnects)
   195 #    stream = ReconnectingTweetStream(auth, track_list, reconnects=reconnects, as_text=True)
   104     try:
   196 #    try:
   105         for tweet in stream:            
   197 #        for tweet in stream:
   106             if duration >= 0 and  datetime.datetime.utcnow() >= end_ts:
   198 #            source = TweetSource(original_json=tweet)
   107                 print "Stop recording after %d seconds." % (duration)
   199 #            session.add(source)
   108                 break
   200 #            session.flush()            
   109             process_tweet(tweet, session, debug, token_filename)
   201 #            source_id = source.id
   110             logging.info("Tweet count: %d - current rate : %.2f - running : %s"%(stream.count, stream.rate, int(time.time()-stream.starttime)))
   202 #            process_tweet(tweet, source_id, session, debug, token_filename)
   111             session.commit()
   203 #            logging.info("Tweet count: %d - current rate : %.2f - running : %s"%(stream.count, stream.rate, int(time.time()-stream.starttime)))
   112     finally:
   204 #            session.commit()
   113         stream.close()
   205 #            if duration >= 0 and  datetime.datetime.utcnow() >= end_ts:
       
   206 #                print "Stop recording after %d seconds." % (duration)
       
   207 #                break
       
   208 #    finally:
       
   209 #        stream.close()
   114         
   210         
   115 def get_options():
   211 def get_options():
   116     parser = OptionParser()
   212     parser = OptionParser()
   117     parser.add_option("-f", "--file", dest="filename",
   213     parser.add_option("-f", "--file", dest="filename",
   118                       help="write tweet to FILE", metavar="FILE", default="enmi2010_twitter.db")
   214                       help="write tweet to FILE", metavar="FILE", default="enmi2010_twitter.db")
   128                       help="Reconnects", metavar="RECONNECTS", default=10, type='int')
   224                       help="Reconnects", metavar="RECONNECTS", default=10, type='int')
   129     parser.add_option("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token",
   225     parser.add_option("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token",
   130                       help="Token file name")
   226                       help="Token file name")
   131     parser.add_option("-d", "--duration", dest="duration",
   227     parser.add_option("-d", "--duration", dest="duration",
   132                       help="Duration of recording in seconds", metavar="DURATION", default= -1, type='int')
   228                       help="Duration of recording in seconds", metavar="DURATION", default= -1, type='int')
       
   229     parser.add_option("-N", "--consumer", dest="consumer_nb",
       
   230                       help="number of consumer", metavar="CONSUMER", default=1, type='int')
       
   231 
   133 
   232 
   134 
   233 
   135     utils.set_logging_options(parser)
   234     utils.set_logging_options(parser)
   136 
   235 
   137     return parser.parse_args()
   236     return parser.parse_args()
   138     
   237     
   139 
   238 
   if __name__ == '__main__':
       # Script entry point: parse options, optionally move an existing database
       # aside, then run one SourceProcess (producer) and options.consumer_nb
       # TweetProcess consumers until the duration elapses, SIGINT arrives, or
       # the source process ends.

       (options, args) = get_options()

       utils.set_logging(options)

       if options.debug:
           print("OPTIONS : ")
           print(repr(options))

       if options.new and os.path.exists(options.filename):
           # Keep the previous database: move it to the first free
           # "<basename>.<n><ext>" (e.g. foo.db -> foo.1.db). os.path.splitext
           # keeps the leading dot in the extension.
           basename, extension = os.path.splitext(options.filename)
           i = 1
           new_path = '%s.%d%s' % (basename, i, extension)
           while os.path.exists(new_path):
               i += 1
               if i >= 1000000:
                   raise Exception("Unable to find new filename for " + options.filename)
               new_path = '%s.%d%s' % (basename, i, extension)
           shutil.move(options.filename, new_path)

       engine, metadata = models.setup_database('sqlite:///' + options.filename, echo=(options.debug >= 2))
       Session = sessionmaker(bind=engine)
       queue = JoinableQueue()
       # stop_event stops the source. The consumers get their own event, set only
       # once the queue has been drained, so queued tweets are not abandoned and
       # the source can flush its queue buffer before exiting.
       stop_event = Event()
       consumer_stop_event = Event()

       if options.username and options.password:
           auth = tweepy.auth.BasicAuthHandler(options.username, options.password)
       else:
           consumer_key = models.CONSUMER_KEY
           consumer_secret = models.CONSUMER_SECRET
           auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=False)
           auth.set_access_token(*(utils.get_oauth_token(options.token_filename)))

       sprocess = SourceProcess(Session, queue, auth, options.track, options.debug, options.reconnects, options.token_filename, stop_event)

       tweet_processes = [
           TweetProcess(Session, queue, options.debug, options.token_filename, consumer_stop_event)
           for _ in range(options.consumer_nb)
       ]

       def interrupt_handler(signum, frame):
           stop_event.set()

       # Installed before start(), so forked children (POSIX) inherit it as well.
       signal.signal(signal.SIGINT, interrupt_handler)

       # Don't let the child processes share pooled DB connections opened here
       # (e.g. by setup_database); each child opens its own on demand.
       engine.dispose()

       sprocess.start()
       for cprocess in tweet_processes:
           cprocess.start()

       if options.duration >= 0:
           end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration)

       while not stop_event.is_set():
           if options.duration >= 0 and datetime.datetime.utcnow() >= end_ts:
               break
           if not sprocess.is_alive():
               break
           time.sleep(0.1)

       # Always signal the source, including when it ended on its own; otherwise
       # nothing would ever tell the shutdown sequence below to proceed.
       stop_event.set()
       sprocess.join()

       # Let the consumers drain what is left, but don't wait forever if they
       # have all died.
       while not queue.empty() and any(p.is_alive() for p in tweet_processes):
           time.sleep(0.1)
       consumer_stop_event.set()
       for cprocess in tweet_processes:
           cprocess.join()