script/stream/recorder_tweetstream.py
changeset 243 9213a63fa34a
parent 242 cdd7d3c0549c
child 254 2209e66bb50b
equal deleted inserted replaced
242:cdd7d3c0549c 243:9213a63fa34a
     1 from getpass import getpass
     1 from getpass import getpass
     2 from iri_tweet import models, utils
     2 from iri_tweet import models, utils
     3 from iri_tweet.models import TweetSource, TweetLog
     3 from iri_tweet.models import TweetSource, TweetLog
     4 from multiprocessing import Queue, JoinableQueue, Process, Event
     4 from multiprocessing import Queue, JoinableQueue, Process, Event, get_logger
     5 from optparse import OptionParser
     5 from optparse import OptionParser
     6 from sqlalchemy.orm import sessionmaker
     6 from sqlalchemy.orm import sessionmaker
     7 from sqlite3 import *
       
     8 import StringIO
     7 import StringIO
       
     8 import logging
     9 import anyjson
     9 import anyjson
    10 import datetime
    10 import datetime
    11 import logging
       
    12 import os
    11 import os
    13 import shutil
    12 import shutil
    14 import signal
    13 import signal
    15 import socket
    14 import socket
    16 import sys
    15 import sys
    17 import time
    16 import time
    18 import traceback
    17 import traceback
    19 import tweepy.auth
    18 import tweepy.auth
    20 import tweetstream
    19 import tweetstream
       
    20 from iri_tweet.utils import logger
       
    21 from sqlalchemy.exc import OperationalError
    21 socket._fileobject.default_bufsize = 0
    22 socket._fileobject.default_bufsize = 0
    22 
    23 
    23 
    24 
    24 
    25 
    25 #columns_tweet = [u'favorited', u'truncated', u'text', u'created_at', u'source', u'in_reply_to_status_id', u'in_reply_to_screen_name', u'in_reply_to_user_id', u'geo', u'id', u'user']
    26 #columns_tweet = [u'favorited', u'truncated', u'text', u'created_at', u'source', u'in_reply_to_status_id', u'in_reply_to_screen_name', u'in_reply_to_user_id', u'geo', u'id', u'user']
    80 
    81 
    81 
    82 
    82 class SourceProcess(Process):
    83 class SourceProcess(Process):
    83     
    84     
    84     def __init__(self, session_maker, queue, auth, track, debug, reconnects, token_filename, stop_event):
    85     def __init__(self, session_maker, queue, auth, track, debug, reconnects, token_filename, stop_event):
    85         super(SourceProcess, self).__init__()
       
    86         self.session_maker = session_maker
    86         self.session_maker = session_maker
    87         self.queue = queue
    87         self.queue = queue
    88         self.auth = auth
    88         self.auth = auth
    89         self.track = track
    89         self.track = track
    90         self.debug = debug
    90         self.debug = debug
    91         self.reconnects = reconnects
    91         self.reconnects = reconnects
    92         self.token_filename = token_filename
    92         self.token_filename = token_filename
    93         self.stop_event = stop_event
    93         self.stop_event = stop_event
       
    94         super(SourceProcess, self).__init__()
    94 #        self.stop_event = 
    95 #        self.stop_event = 
    95     
    96     
    96     def run(self):
    97     def run(self):
    97         
    98         
       
    99         get_logger().debug("SourceProcess : run")
    98         track_list = self.track # or raw_input('Keywords to track (comma seperated): ').strip()
   100         track_list = self.track # or raw_input('Keywords to track (comma seperated): ').strip()
    99         track_list = [k for k in track_list.split(',')]
   101         track_list = [k for k in track_list.split(',')]
   100                         
   102 
       
   103         get_logger().debug("SourceProcess : before connecting to stream " + repr(track_list))                        
   101         stream = ReconnectingTweetStream(self.auth, track_list, reconnects=self.reconnects, as_text=True)
   104         stream = ReconnectingTweetStream(self.auth, track_list, reconnects=self.reconnects, as_text=True)
       
   105         get_logger().debug("SourceProcess : after connecting to stream")
   102         stream.muststop = lambda: self.stop_event.is_set()
   106         stream.muststop = lambda: self.stop_event.is_set()
   103         
   107         
   104         session = self.session_maker()
   108         session = self.session_maker()
   105         
   109         
   106         try:
   110         try:
   107             for tweet in stream:
   111             for tweet in stream:
       
   112                 get_logger().debug("tweet " + repr(tweet))
   108                 source = TweetSource(original_json=tweet)
   113                 source = TweetSource(original_json=tweet)
   109                 session.add(source)
   114                 get_logger().debug("source created")
   110                 session.flush()
   115                 add_retries = 0
       
   116                 while add_retries < 10:
       
   117                     try:
       
   118                         add_retries += 1
       
   119                         session.add(source)
       
   120                         session.flush()
       
   121                         break
       
   122                     except OperationalError as e:
       
   123                         session.rollback()
       
   124                         get_logger().debug("Operational Error %s nb %d" % (repr(e), add_retries))
       
   125                         if add_retries==10:
       
   126                             raise e
       
   127                      
   111                 source_id = source.id
   128                 source_id = source.id
   112                 queue.put((source_id, tweet), False)
   129                 get_logger().debug("before queue + source id " + repr(source_id))
       
   130                 self.queue.put((source_id, tweet), False)
   113                 #process_tweet(tweet, source_id, self.session, self.debug, self.token_filename)
   131                 #process_tweet(tweet, source_id, self.session, self.debug, self.token_filename)
   114                 logging.info("Tweet count: %d - current rate : %.2f - running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime)))
   132                 get_logger().info("Tweet count: %d - current rate : %.2f - running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime)))
   115                 session.commit()
   133                 session.commit()
   116 #                if duration >= 0 and  datetime.datetime.utcnow() >= end_ts:
   134 #                if duration >= 0 and  datetime.datetime.utcnow() >= end_ts:
   117 #                    print "Stop recording after %d seconds." % (duration)
   135 #                    print "Stop recording after %d seconds." % (duration)
   118 #                    break
   136 #                    break
   119         except:
   137         except Exception as e:
       
   138             get_logger().error("Error when processing tweet " + repr(e))
       
   139         finally:
   120             session.rollback()
   140             session.rollback()
   121         finally:
       
   122             stream.close()
   141             stream.close()
   123             session.close()
   142             session.close()
   124             
   143             self.queue.close()
       
   144             self.stop_event.set()
       
   145 
       
   146 
       
   147 def process_tweet(tweet, source_id, session, token_filename):
       
   148     try:
       
   149         tweet_obj = anyjson.deserialize(tweet)
       
   150         screen_name = ""
       
   151         if 'user' in tweet_obj and 'screen_name' in tweet_obj['user']:
       
   152             screen_name = tweet_obj['user']['screen_name']
       
   153         get_logger().info(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text']))
       
   154         get_logger().debug(u"Process_tweet :" + repr(tweet))
       
   155         processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, token_filename)
       
   156         processor.process()
       
   157     except Exception as e:
       
   158         message = u"Error %s processing tweet %s" % (repr(e), tweet)
       
   159         get_logger().error(message)
       
   160         output = StringIO.StringIO()
       
   161         traceback.print_exception(Exception, e, None, None, output)
       
   162         error_stack = output.getvalue()
       
   163         output.close()
       
   164         session.rollback()
       
   165         tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['ERROR'], error=message, error_stack=error_stack)
       
   166         session.add(tweet_log)
       
   167         session.commit()
       
   168 
       
   169     
   125         
   170         
   126 class TweetProcess(Process):
   171 class TweetProcess(Process):
   127     
   172     
   128     def __init__(self, session_maker, queue, debug, token_filename, stop_event):
   173     def __init__(self, session_maker, queue, debug, token_filename, stop_event):
   129         super(TweetProcess, self).__init__()
       
   130         self.session_maker = session_maker
   174         self.session_maker = session_maker
   131         self.queue = queue
   175         self.queue = queue
   132         self.debug = debug
   176         self.debug = debug
   133         self.stop_event = stop_event
   177         self.stop_event = stop_event
   134         self.token_filename = token_filename
   178         self.token_filename = token_filename
       
   179         super(TweetProcess, self).__init__()
       
   180 
   135 
   181 
   136     def run(self):
   182     def run(self):
   137         
   183         
   138         session = self.session_maker()
   184         session = self.session_maker()
   139         try:
   185         try:
   140             while not self.stop_event.is_set():
   186             while not self.stop_event.is_set():
   141                 try:
   187                 try:
   142                     source_id, tweet_txt = queue.get(True, 30)
   188                     source_id, tweet_txt = queue.get(True, 10)
   143                 except:
   189                     get_logger().debug("Processing source id " + repr(source_id))
       
   190                 except Exception as e:
       
   191                     get_logger().debug('Process tweet exception in loop : ' + repr(e))
   144                     continue
   192                     continue
   145                 process_tweet(tweet_txt, source_id, session)
   193                 process_tweet(tweet_txt, source_id, session, self.token_filename)
   146                 session.commit()
   194                 session.commit()
   147                 self.queue.task_done()
       
   148         except:
   195         except:
   149             session.rollback()
       
   150             raise
   196             raise
   151         finally:
   197         finally:
       
   198             session.rollback()
       
   199             self.stop_event.set()
   152             session.close()
   200             session.close()
   153             
   201             
   154         
   202 def process_leftovers(session, token_filename):
   155     def process_tweet(tweet, source_id, session):
   203     
   156         
   204     sources = session.query(TweetSource).outerjoin(TweetLog).filter(TweetLog.id == None)
   157         try:
   205     
   158             tweet_obj = anyjson.deserialize(tweet)
   206     for src in sources:
   159             screen_name = ""
   207         tweet_txt = src.original_json
   160             if 'user' in tweet_obj and 'screen_name' in tweet_obj['user']:
   208         process_tweet(tweet_txt, src.id, session, token_filename)
   161                 screen_name = tweet_obj['user']['screen_name']
   209 
   162             logging.info(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text']))
   210         
   163             logging.debug(u"Process_tweet :" + repr(tweet))
   211     
   164             processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, self.token_filename)
   212     #get tweet source that do not match any message
   165             processor.process()
   213     #select * from tweet_tweet_source ts left join tweet_tweet_log tl on ts.id = tl.tweet_source_id where tl.id isnull;
   166         except Exception, e:
   214 
   167             message = u"Error %e processing tweet %s" % (unicode(e), tweet)
       
   168             logging.error(message)
       
   169             output = StringIO.StringIO()
       
   170             traceback.print_exception(Exception, e, None, None, output)
       
   171             tweet_log = TweetLog(status=TweetLog.TWEET_STATUS['ERROR'], error=message, error_stack=output.getvalue())
       
   172             output.close()
       
   173 
       
   174 
       
   175 
       
   176 #def main_source(username, password, track, session, debug, reconnects, token_filename, duration):
       
   177 
       
   178     #username = username or raw_input('Twitter username: ')
       
   179     #password = password or getpass('Twitter password: ')
       
   180 
       
   181 #    track_list = track or raw_input('Keywords to track (comma seperated): ').strip()
       
   182 #    track_list = [k for k in track_list.split(',')]
       
   183     
       
   184 #    if username and password:
       
   185 #        auth = tweepy.auth.BasicAuthHandler(username, password)        
       
   186 #    else:
       
   187 #        consumer_key = models.CONSUMER_KEY
       
   188 #        consumer_secret = models.CONSUMER_SECRET
       
   189 #        auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=False)
       
   190 #        auth.set_access_token(*(utils.get_oauth_token(token_filename)))
       
   191     
       
   192 #    if duration >= 0:
       
   193 #        end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=duration)
       
   194     
       
   195 #    stream = ReconnectingTweetStream(auth, track_list, reconnects=reconnects, as_text=True)
       
   196 #    try:
       
   197 #        for tweet in stream:
       
   198 #            source = TweetSource(original_json=tweet)
       
   199 #            session.add(source)
       
   200 #            session.flush()            
       
   201 #            source_id = source.id
       
   202 #            process_tweet(tweet, source_id, session, debug, token_filename)
       
   203 #            logging.info("Tweet count: %d - current rate : %.2f - running : %s"%(stream.count, stream.rate, int(time.time()-stream.starttime)))
       
   204 #            session.commit()
       
   205 #            if duration >= 0 and  datetime.datetime.utcnow() >= end_ts:
       
   206 #                print "Stop recording after %d seconds." % (duration)
       
   207 #                break
       
   208 #    finally:
       
   209 #        stream.close()
       
   210         
   215         
   211 def get_options():
   216 def get_options():
   212     parser = OptionParser()
   217     parser = OptionParser()
   213     parser.add_option("-f", "--file", dest="filename",
   218     parser.add_option("-f", "--file", dest="filename",
   214                       help="write tweet to FILE", metavar="FILE", default="enmi2010_twitter.db")
   219                       help="write tweet to FILE", metavar="FILE", default="enmi2010_twitter.db")
   238 
   243 
   239 if __name__ == '__main__':
   244 if __name__ == '__main__':
   240     
   245     
   241     (options, args) = get_options()
   246     (options, args) = get_options()
   242     
   247     
   243     utils.set_logging(options)
   248     utils.set_logging(options, get_logger())
   244         
   249         
   245     if options.debug:
   250     if options.debug:
   246         print "OPTIONS : "
   251         print "OPTIONS : "
   247         print repr(options)
   252         print repr(options)
   248     
   253     
   249     if options.new and os.path.exists(options.filename):
   254     if options.new and os.path.exists(options.filename):
   250         i = 1
   255         i = 1
   251         basename, extension = os.path.splitext(options.filename)
   256         basename, extension = os.path.splitext(options.filename)
   252         new_path = '$s.%d.%s' % (basename, i, extension)
   257         new_path = '%s.%d%s' % (basename, i, extension)
   253         while i < 1000000 and os.path.exists(new_path):
   258         while i < 1000000 and os.path.exists(new_path):
   254             i += 1
   259             i += 1
   255             new_path = '$s.%d.%s' % (basename, i, extension)
   260             new_path = '%s.%d%s' % (basename, i, extension)
   256         if i >= 1000000:
   261         if i >= 1000000:
   257             raise Exception("Unable to find new filename for " + options.filename)
   262             raise Exception("Unable to find new filename for " + options.filename)
   258         else:
   263         else:
   259             shutil.move(options.filename, new_path)
   264             shutil.move(options.filename, new_path)
   260 
   265 
   261     
   266     
   262     engine, metadata = models.setup_database('sqlite:///' + options.filename, echo=(options.debug >= 2))
       
   263     Session = sessionmaker(bind=engine)
       
   264     queue = JoinableQueue()
   267     queue = JoinableQueue()
   265     stop_event = Event()
   268     stop_event = Event()
   266 
   269 
   267     if options.username and options.password:
   270     if options.username and options.password:
   268         auth = tweepy.auth.BasicAuthHandler(options.username, options.password)        
   271         auth = tweepy.auth.BasicAuthHandler(options.username, options.password)        
   270         consumer_key = models.CONSUMER_KEY
   273         consumer_key = models.CONSUMER_KEY
   271         consumer_secret = models.CONSUMER_SECRET
   274         consumer_secret = models.CONSUMER_SECRET
   272         auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=False)
   275         auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=False)
   273         auth.set_access_token(*(utils.get_oauth_token(options.token_filename)))
   276         auth.set_access_token(*(utils.get_oauth_token(options.token_filename)))
   274 
   277 
   275      
   278 
       
   279     engine, metadata = models.setup_database('sqlite:///' + options.filename, echo=(options.debug >= 2))
       
   280     Session = sessionmaker(bind=engine)
       
   281     
       
   282     session = Session()
       
   283     process_leftovers(session, options.token_filename)
       
   284     session.commit()
       
   285     session.close()
       
   286          
   276     sprocess = SourceProcess(Session, queue, auth, options.track, options.debug, options.reconnects, options.token_filename, stop_event)    
   287     sprocess = SourceProcess(Session, queue, auth, options.track, options.debug, options.reconnects, options.token_filename, stop_event)    
   277     
   288     
   278     tweet_processes = []
   289     tweet_processes = []
   279     
   290     
   280     for i in range(options.consumer_nb):
   291     for i in range(options.consumer_nb):
       
   292         engine, metadata = models.setup_database('sqlite:///' + options.filename, echo=(options.debug >= 2))
       
   293         Session = sessionmaker(bind=engine)
   281         cprocess = TweetProcess(Session, queue, options.debug, options.token_filename, stop_event)
   294         cprocess = TweetProcess(Session, queue, options.debug, options.token_filename, stop_event)
   282         tweet_processes.append(cprocess)
   295         tweet_processes.append(cprocess)
   283 
   296 
   284     def interupt_handler(signum, frame):
   297     def interupt_handler(signum, frame):
   285         stop_event.set()
   298         stop_event.set()
   300         if sprocess.is_alive():
   313         if sprocess.is_alive():
   301             time.sleep(0.1)
   314             time.sleep(0.1)
   302         else:
   315         else:
   303             break
   316             break
   304     
   317     
       
   318     get_logger().debug("Joining Source Process")
   305     sprocess.join()
   319     sprocess.join()
   306     queue.join()
   320     get_logger().debug("Joining Queue")
   307     for cprocess in tweet_processes:
   321     #queue.join()
       
   322     for i,cprocess in enumerate(tweet_processes):
       
   323         get_logger().debug("Joining consumer process Nb %d" % (i+1))
   308         cprocess.join()
   324         cprocess.join()
       
   325     
       
   326     get_logger().debug("Processing leftovers")
       
   327     session = Session()
       
   328     process_leftovers(session, options.token_filename)
       
   329     session.commit()
       
   330     session.close()
       
   331 
       
   332     get_logger().debug("Done. Exiting.")
       
   333