script/stream/recorder_tweetstream.py
changeset 253 e9335ee3cf71
parent 243 9213a63fa34a
child 254 2209e66bb50b
equal deleted inserted replaced
252:2ebf22c65168 253:e9335ee3cf71
     1 from getpass import getpass
     1 from getpass import getpass
     2 from iri_tweet import models, utils
     2 from iri_tweet import models, utils
       
     3 from iri_tweet.models import TweetSource, TweetLog
       
     4 from multiprocessing import Queue, JoinableQueue, Process, Event, get_logger
     3 from optparse import OptionParser
     5 from optparse import OptionParser
     4 from sqlalchemy.orm import sessionmaker
     6 from sqlalchemy.orm import sessionmaker
     5 from sqlite3 import *
     7 import StringIO
       
     8 import logging
       
     9 import anyjson
     6 import datetime
    10 import datetime
     7 import logging
       
     8 import os
    11 import os
       
    12 import shutil
       
    13 import signal
     9 import socket
    14 import socket
    10 import sys
    15 import sys
    11 import time
    16 import time
       
    17 import traceback
       
    18 import tweepy.auth
    12 import tweetstream
    19 import tweetstream
    13 import tweepy.auth
    20 from iri_tweet.utils import logger
       
    21 from sqlalchemy.exc import OperationalError
    14 socket._fileobject.default_bufsize = 0
    22 socket._fileobject.default_bufsize = 0
    15 
    23 
    16 
    24 
    17 
    25 
    18 #columns_tweet = [u'favorited', u'truncated', u'text', u'created_at', u'source', u'in_reply_to_status_id', u'in_reply_to_screen_name', u'in_reply_to_user_id', u'geo', u'id', u'user']
    26 #columns_tweet = [u'favorited', u'truncated', u'text', u'created_at', u'source', u'in_reply_to_status_id', u'in_reply_to_screen_name', u'in_reply_to_user_id', u'geo', u'id', u'user']
    42 
    50 
    43     :retry_wait: Time to wait before reconnecting in seconds. Default is 5
    51     :retry_wait: Time to wait before reconnecting in seconds. Default is 5
    44 
    52 
    45     """
    53     """
    46 
    54 
    def __init__(self, auth, keywords, reconnects=3, error_cb=None, retry_wait=5, as_text=False, **kwargs):
        """Set up the reconnection state, then initialise the parent stream.

        :auth: authentication handler forwarded to the parent stream.
        :keywords: handed to the parent stream as its ``track`` argument.
        :reconnects: stored as ``max_reconnects``.
        :error_cb: stored as ``_error_cb``.
        :retry_wait: time to wait before reconnecting, in seconds.
        :as_text: forwarded unchanged to the parent stream.
        """
        # Reconnection bookkeeping kept on the instance.
        self._reconnects = 0
        self._error_cb = error_cb
        self.retry_wait = retry_wait
        self.max_reconnects = reconnects

        # Everything else is delegated to the parent constructor.
        parent = super(ReconnectingTweetStream, self)
        parent.__init__(auth=auth, track=keywords, as_text=as_text, **kwargs)
    53 
    61 
    54     def next(self):
    62     def next(self):
    55         while True:
    63         while True:
    56             try:
    64             try:
    57                 return super(ReconnectingTweetStream, self).next()
    65                 return super(ReconnectingTweetStream, self).next()
    70         # Don't listen to auth error, since we can't reasonably reconnect
    78         # Don't listen to auth error, since we can't reasonably reconnect
    71         # when we get one.
    79         # when we get one.
    72 
    80 
    73 
    81 
    74 
    82 
    75 def process_tweet(tweet, session, debug, token_filename):
    83 class SourceProcess(Process):
    76     screen_name = ""
    84     
    77     if 'user' in tweet and 'screen_name' in tweet['user']:
    85     def __init__(self, session_maker, queue, auth, track, debug, reconnects, token_filename, stop_event):
    78         screen_name = tweet['user']['screen_name']
    86         self.session_maker = session_maker
    79     logging.info("Process_tweet from %s : %s" % (screen_name, tweet['text']))
    87         self.queue = queue
    80     logging.debug("Process_tweet :" + repr(tweet))
    88         self.auth = auth
    81     processor = utils.TwitterProcessor(tweet, None, session, token_filename)
    89         self.track = track
    82     processor.process()
    90         self.debug = debug
    83 
    91         self.reconnects = reconnects
    84 def main(username, password, track, session, debug, reconnects, token_filename, duration):
    92         self.token_filename = token_filename
    85 
    93         self.stop_event = stop_event
    86     #username = username or raw_input('Twitter username: ')
    94         super(SourceProcess, self).__init__()
    87     #password = password or getpass('Twitter password: ')
    95 #        self.stop_event = 
    88 
    96     
    89     track_list = track or raw_input('Keywords to track (comma seperated): ').strip()
    97     def run(self):
    90     track_list = [k for k in track_list.split(',')]
    98         
    91     
    99         get_logger().debug("SourceProcess : run")
    92     if username and password:
   100         track_list = self.track # or raw_input('Keywords to track (comma seperated): ').strip()
    93         auth = tweepy.auth.BasicAuthHandler(username, password)        
   101         track_list = [k for k in track_list.split(',')]
    94     else:
   102 
    95         consumer_key = models.CONSUMER_KEY
   103         get_logger().debug("SourceProcess : before connecting to stream " + repr(track_list))                        
    96         consumer_secret = models.CONSUMER_SECRET
   104         stream = ReconnectingTweetStream(self.auth, track_list, reconnects=self.reconnects, as_text=True)
    97         auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=False)
   105         get_logger().debug("SourceProcess : after connecting to stream")
    98         auth.set_access_token(*(utils.get_oauth_token(token_filename)))
   106         stream.muststop = lambda: self.stop_event.is_set()
    99     
   107         
   100     if duration >= 0:
   108         session = self.session_maker()
   101         end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=duration)
   109         
   102     
   110         try:
   103     stream = ReconnectingTweetStream(auth, track_list, reconnects=reconnects)
   111             for tweet in stream:
       
   112                 get_logger().debug("tweet " + repr(tweet))
       
   113                 source = TweetSource(original_json=tweet)
       
   114                 get_logger().debug("source created")
       
   115                 add_retries = 0
       
   116                 while add_retries < 10:
       
   117                     try:
       
   118                         add_retries += 1
       
   119                         session.add(source)
       
   120                         session.flush()
       
   121                         break
       
   122                     except OperationalError as e:
       
   123                         session.rollback()
       
   124                         get_logger().debug("Operational Error %s nb %d" % (repr(e), add_retries))
       
   125                         if add_retries==10:
       
   126                             raise e
       
   127                      
       
   128                 source_id = source.id
       
   129                 get_logger().debug("before queue + source id " + repr(source_id))
       
   130                 self.queue.put((source_id, tweet), False)
       
   131                 #process_tweet(tweet, source_id, self.session, self.debug, self.token_filename)
       
   132                 get_logger().info("Tweet count: %d - current rate : %.2f - running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime)))
       
   133                 session.commit()
       
   134 #                if duration >= 0 and  datetime.datetime.utcnow() >= end_ts:
       
   135 #                    print "Stop recording after %d seconds." % (duration)
       
   136 #                    break
       
   137         except Exception as e:
       
   138             get_logger().error("Error when processing tweet " + repr(e))
       
   139         finally:
       
   140             session.rollback()
       
   141             stream.close()
       
   142             session.close()
       
   143             self.queue.close()
       
   144             self.stop_event.set()
       
   145 
       
   146 
       
   147 def process_tweet(tweet, source_id, session, token_filename):
   104     try:
   148     try:
   105         for tweet in stream:            
   149         tweet_obj = anyjson.deserialize(tweet)
   106             if duration >= 0 and  datetime.datetime.utcnow() >= end_ts:
   150         screen_name = ""
   107                 print "Stop recording after %d seconds." % (duration)
   151         if 'user' in tweet_obj and 'screen_name' in tweet_obj['user']:
   108                 break
   152             screen_name = tweet_obj['user']['screen_name']
   109             process_tweet(tweet, session, debug, token_filename)
   153         get_logger().info(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text']))
   110             logging.info("Tweet count: %d - current rate : %.2f - running : %s"%(stream.count, stream.rate, int(time.time()-stream.starttime)))
   154         get_logger().debug(u"Process_tweet :" + repr(tweet))
   111             session.commit()
   155         processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, token_filename)
   112     finally:
   156         processor.process()
   113         stream.close()
   157     except Exception as e:
       
   158         message = u"Error %s processing tweet %s" % (repr(e), tweet)
       
   159         get_logger().error(message)
       
   160         output = StringIO.StringIO()
       
   161         traceback.print_exception(Exception, e, None, None, output)
       
   162         error_stack = output.getvalue()
       
   163         output.close()
       
   164         session.rollback()
       
   165         tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['ERROR'], error=message, error_stack=error_stack)
       
   166         session.add(tweet_log)
       
   167         session.commit()
       
   168 
       
   169     
       
   170         
       
   171 class TweetProcess(Process):
       
   172     
       
   173     def __init__(self, session_maker, queue, debug, token_filename, stop_event):
       
   174         self.session_maker = session_maker
       
   175         self.queue = queue
       
   176         self.debug = debug
       
   177         self.stop_event = stop_event
       
   178         self.token_filename = token_filename
       
   179         super(TweetProcess, self).__init__()
       
   180 
       
   181 
       
   182     def run(self):
       
   183         
       
   184         session = self.session_maker()
       
   185         try:
       
   186             while not self.stop_event.is_set():
       
   187                 try:
       
   188                     source_id, tweet_txt = queue.get(True, 10)
       
   189                     get_logger().debug("Processing source id " + repr(source_id))
       
   190                 except Exception as e:
       
   191                     get_logger().debug('Process tweet exception in loop : ' + repr(e))
       
   192                     continue
       
   193                 process_tweet(tweet_txt, source_id, session, self.token_filename)
       
   194                 session.commit()
       
   195         except:
       
   196             raise
       
   197         finally:
       
   198             session.rollback()
       
   199             self.stop_event.set()
       
   200             session.close()
       
   201             
       
   202 def process_leftovers(session, token_filename):
       
   203     
       
   204     sources = session.query(TweetSource).outerjoin(TweetLog).filter(TweetLog.id == None)
       
   205     
       
   206     for src in sources:
       
   207         tweet_txt = src.original_json
       
   208         process_tweet(tweet_txt, src.id, session, token_filename)
       
   209 
       
   210         
       
   211     
       
   212     #get tweet source that do not match any message
       
   213     #select * from tweet_tweet_source ts left join tweet_tweet_log tl on ts.id = tl.tweet_source_id where tl.id isnull;
       
   214 
   114         
   215         
   115 def get_options():
   216 def get_options():
   116     parser = OptionParser()
   217     parser = OptionParser()
   117     parser.add_option("-f", "--file", dest="filename",
   218     parser.add_option("-f", "--file", dest="filename",
   118                       help="write tweet to FILE", metavar="FILE", default="enmi2010_twitter.db")
   219                       help="write tweet to FILE", metavar="FILE", default="enmi2010_twitter.db")
   128                       help="Reconnects", metavar="RECONNECTS", default=10, type='int')
   229                       help="Reconnects", metavar="RECONNECTS", default=10, type='int')
   129     parser.add_option("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token",
   230     parser.add_option("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token",
   130                       help="Token file name")
   231                       help="Token file name")
   131     parser.add_option("-d", "--duration", dest="duration",
   232     parser.add_option("-d", "--duration", dest="duration",
   132                       help="Duration of recording in seconds", metavar="DURATION", default= -1, type='int')
   233                       help="Duration of recording in seconds", metavar="DURATION", default= -1, type='int')
       
   234     parser.add_option("-N", "--consumer", dest="consumer_nb",
       
   235                       help="number of consumer", metavar="CONSUMER", default=1, type='int')
       
   236 
   133 
   237 
   134 
   238 
   135     utils.set_logging_options(parser)
   239     utils.set_logging_options(parser)
   136 
   240 
   137     return parser.parse_args()
   241     return parser.parse_args()
   138     
   242     
   139 
   243 
   140 if __name__ == '__main__':
   244 if __name__ == '__main__':
   141     
   245     
   142 
       
   143     (options, args) = get_options()
   246     (options, args) = get_options()
   144     
   247     
   145     utils.set_logging(options)
   248     utils.set_logging(options, get_logger())
   146         
   249         
   147     if options.debug:
   250     if options.debug:
   148         print "OPTIONS : "
   251         print "OPTIONS : "
   149         print repr(options)
   252         print repr(options)
   150     
   253     
   151     if options.new and os.path.exists(options.filename):
   254     if options.new and os.path.exists(options.filename):
   152         os.remove(options.filename)
   255         i = 1
   153     
   256         basename, extension = os.path.splitext(options.filename)
   154     engine, metadata = models.setup_database('sqlite:///' + options.filename, echo=(options.debug>=2))
   257         new_path = '%s.%d%s' % (basename, i, extension)
       
   258         while i < 1000000 and os.path.exists(new_path):
       
   259             i += 1
       
   260             new_path = '%s.%d%s' % (basename, i, extension)
       
   261         if i >= 1000000:
       
   262             raise Exception("Unable to find new filename for " + options.filename)
       
   263         else:
       
   264             shutil.move(options.filename, new_path)
       
   265 
       
   266     
       
   267     queue = JoinableQueue()
       
   268     stop_event = Event()
       
   269 
       
   270     if options.username and options.password:
       
   271         auth = tweepy.auth.BasicAuthHandler(options.username, options.password)        
       
   272     else:
       
   273         consumer_key = models.CONSUMER_KEY
       
   274         consumer_secret = models.CONSUMER_SECRET
       
   275         auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=False)
       
   276         auth.set_access_token(*(utils.get_oauth_token(options.token_filename)))
       
   277 
       
   278 
       
   279     engine, metadata = models.setup_database('sqlite:///' + options.filename, echo=(options.debug >= 2))
   155     Session = sessionmaker(bind=engine)
   280     Session = sessionmaker(bind=engine)
       
   281     
   156     session = Session()
   282     session = Session()
   157 
   283     process_leftovers(session, options.token_filename)
   158     try:
   284     session.commit()
   159         try:
   285     session.close()
   160             main(options.username, options.password, options.track, session, options.debug, options.reconnects, options.token_filename, options.duration)
   286          
   161         except KeyboardInterrupt:
   287     sprocess = SourceProcess(Session, queue, auth, options.track, options.debug, options.reconnects, options.token_filename, stop_event)    
   162             print '\nGoodbye!'
   288     
   163         session.commit()
   289     tweet_processes = []
   164     finally:
   290     
   165         session.close()
   291     for i in range(options.consumer_nb):
       
   292         engine, metadata = models.setup_database('sqlite:///' + options.filename, echo=(options.debug >= 2))
       
   293         Session = sessionmaker(bind=engine)
       
   294         cprocess = TweetProcess(Session, queue, options.debug, options.token_filename, stop_event)
       
   295         tweet_processes.append(cprocess)
       
   296 
       
   297     def interupt_handler(signum, frame):
       
   298         stop_event.set()
       
   299         
       
   300     signal.signal(signal.SIGINT, interupt_handler)
       
   301 
       
   302     sprocess.start()
       
   303     for cprocess in tweet_processes:
       
   304         cprocess.start()
       
   305 
       
   306     if options.duration >= 0:
       
   307         end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration)
       
   308 
       
   309     while not stop_event.is_set():
       
   310         if options.duration >= 0 and  datetime.datetime.utcnow() >= end_ts:
       
   311             stop_event.set()
       
   312             break
       
   313         if sprocess.is_alive():
       
   314             time.sleep(0.1)
       
   315         else:
       
   316             break
       
   317     
       
   318     get_logger().debug("Joining Source Process")
       
   319     sprocess.join()
       
   320     get_logger().debug("Joining Queue")
       
   321     #queue.join()
       
   322     for i,cprocess in enumerate(tweet_processes):
       
   323         get_logger().debug("Joining consumer process Nb %d" % (i+1))
       
   324         cprocess.join()
       
   325     
       
   326     get_logger().debug("Processing leftovers")
       
   327     session = Session()
       
   328     process_leftovers(session, options.token_filename)
       
   329     session.commit()
       
   330     session.close()
       
   331 
       
   332     get_logger().debug("Done. Exiting.")
       
   333