script/stream/recorder_tweetstream.py
changeset 739 350ffcb7ae4d
parent 738 2497c7f38e0a
equal deleted inserted replaced
738:2497c7f38e0a 739:350ffcb7ae4d
    58     if options.username and options.password:
    58     if options.username and options.password:
    59         auth = tweepy.auth.BasicAuthHandler(options.username, options.password)        
    59         auth = tweepy.auth.BasicAuthHandler(options.username, options.password)        
    60     else:
    60     else:
    61         consumer_key = models.CONSUMER_KEY
    61         consumer_key = models.CONSUMER_KEY
    62         consumer_secret = models.CONSUMER_SECRET
    62         consumer_secret = models.CONSUMER_SECRET
    63         auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=False)
    63         auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=True)
    64         auth.set_access_token(*access_token)
    64         auth.set_access_token(*access_token)
    65     return auth
    65     return auth
    66 
    66 
    67 
    67 
    68 def add_process_event(type, args, session_maker):
    68 def add_process_event(type, args, session_maker):
   140         self.logger.debug("SourceProcess : track list " + track_list)
   140         self.logger.debug("SourceProcess : track list " + track_list)
   141         
   141         
   142         track_list = [k.strip() for k in track_list.split(',')]
   142         track_list = [k.strip() for k in track_list.split(',')]
   143 
   143 
   144         self.logger.debug("SourceProcess : before connecting to stream " + repr(track_list))                        
   144         self.logger.debug("SourceProcess : before connecting to stream " + repr(track_list))                        
   145         self.stream = tweetstream.FilterStream(self.auth, track=track_list, raw=True, url=self.options.url, catchup=self.catchup, timeout=self.timeout, chunk_size=1)
   145         self.stream = tweetstream.FilterStream(self.auth, track=track_list, raw=True, url=self.options.url, catchup=self.catchup, timeout=self.timeout, chunk_size=1, logger=self.logger)
   146         self.logger.debug("SourceProcess : after connecting to stream")
   146         self.logger.debug("SourceProcess : after connecting to stream")
   147         self.stream.muststop = lambda: self.stop_event.is_set()        
   147         self.stream.muststop = lambda: self.stop_event.is_set()        
   148         
   148         
   149         stream_wrapper = tweetstream.SafeStreamWrapper(self.stream, logger=self.logger)
   149         stream_wrapper = tweetstream.SafeStreamWrapper(self.stream, logger=self.logger)
   150         
   150         
   200         source_stream_iter_thread = threading.Thread(target=self.__source_stream_iter , name="SourceStreamIterThread")
   200         source_stream_iter_thread = threading.Thread(target=self.__source_stream_iter , name="SourceStreamIterThread")
   201         
   201         
   202         source_stream_iter_thread.start()
   202         source_stream_iter_thread.start()
   203         
   203         
   204         while not self.stop_event.is_set():
   204         while not self.stop_event.is_set():
   205             self.logger.info("SourceProcess : In while after start")
   205             self.logger.debug("SourceProcess : In while after start")
   206             self.stop_event.wait(DEFAULT_TIMEOUT)
   206             self.stop_event.wait(DEFAULT_TIMEOUT)
   207             if self.stop_event.is_set() and self.stream:
   207             if self.stop_event.is_set() and self.stream:
   208                 self.stream.close()
   208                 self.stream.close()
   209             elif not self.stop_event.is_set() and not source_stream_iter_thread.is_alive:
   209             elif not self.stop_event.is_set() and not source_stream_iter_thread.is_alive:
   210                 self.stop_event.set()
   210                 self.stop_event.set()
   277                     continue
   277                     continue
   278                 process_tweet(tweet_txt, source_id, session, self.access_token, self.twitter_query_user, self.logger)
   278                 process_tweet(tweet_txt, source_id, session, self.access_token, self.twitter_query_user, self.logger)
   279                 session.commit()
   279                 session.commit()
   280         finally:
   280         finally:
   281             session.rollback()
   281             session.rollback()
   282             self.stop_event.set()
       
   283             session.close()
   282             session.close()
   284 
   283 
   285 
   284 
   286 def get_sessionmaker(conn_str):
   285 def get_sessionmaker(conn_str):
   287     engine, metadata, Session = models.setup_database(conn_str, echo=False, create_all=False, autocommit=False)
   286     engine, metadata, Session = models.setup_database(conn_str, echo=False, create_all=False, autocommit=False)
   409         lqueue = mQueue(50)
   408         lqueue = mQueue(50)
   410         logger_queues.append(lqueue)
   409         logger_queues.append(lqueue)
   411         cprocess = TweetProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)
   410         cprocess = TweetProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)
   412         tweet_processes.append(cprocess)
   411         tweet_processes.append(cprocess)
   413 
   412 
       
   413     log_thread = threading.Thread(target=process_log, name="loggingThread", args=(logger_queues, stop_event,))
       
   414     log_thread.daemon = True
       
   415 
       
   416     log_thread.start()
       
   417 
       
   418     sprocess.start()
       
   419     for cprocess in tweet_processes:
       
   420         cprocess.start()
       
   421 
       
   422     add_process_event("pid", {'main':os.getpid(), 'source':(sprocess.name, sprocess.pid), 'consumers':dict([(p.name, p.pid) for p in tweet_processes])}, session_maker)
       
   423 
       
   424     if options.duration >= 0:
       
   425         end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration)    
       
   426 
   414     def interupt_handler(signum, frame):
   427     def interupt_handler(signum, frame):
   415         utils.get_logger().debug("shutdown asked " + repr(signum) + "  " + repr(inspect.getframeinfo(frame, 9)))
   428         utils.get_logger().debug("shutdown asked " + repr(signum) + "  " + repr(inspect.getframeinfo(frame, 9)))
   416         stop_args.update({'message': 'interupt', 'signum':signum, 'frameinfo':inspect.getframeinfo(frame, 9)})
   429         stop_args.update({'message': 'interupt', 'signum':signum, 'frameinfo':inspect.getframeinfo(frame, 9)})
   417         stop_event.set()
   430         stop_event.set()
   418         
   431         
   419     signal.signal(signal.SIGINT , interupt_handler)
   432     signal.signal(signal.SIGINT , interupt_handler)
   420     signal.signal(signal.SIGHUP , interupt_handler)
   433     signal.signal(signal.SIGHUP , interupt_handler)
   421     signal.signal(signal.SIGALRM, interupt_handler)
   434     signal.signal(signal.SIGALRM, interupt_handler)
   422     signal.signal(signal.SIGTERM, interupt_handler)
   435     signal.signal(signal.SIGTERM, interupt_handler)
   423 
       
   424     log_thread = threading.Thread(target=process_log, name="loggingThread", args=(logger_queues, stop_event,))
       
   425     log_thread.daemon = True
       
   426 
       
   427     log_thread.start()
       
   428 
       
   429     sprocess.start()
       
   430     for cprocess in tweet_processes:
       
   431         cprocess.start()
       
   432 
       
   433     add_process_event("pid", {'main':os.getpid(), 'source':(sprocess.name, sprocess.pid), 'consumers':dict([(p.name, p.pid) for p in tweet_processes])}, session_maker)
       
   434 
       
   435     if options.duration >= 0:
       
   436         end_ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=options.duration)    
       
   437     
   436     
   438 
   437 
   439     while not stop_event.is_set():
   438     while not stop_event.is_set():
   440         if options.duration >= 0 and  datetime.datetime.utcnow() >= end_ts:
   439         if options.duration >= 0 and  datetime.datetime.utcnow() >= end_ts:
   441             stop_args.update({'message': 'duration', 'duration' : options.duration, 'end_ts' : end_ts})
   440             stop_args.update({'message': 'duration', 'duration' : options.duration, 'end_ts' : end_ts})
   442             stop_event.set()
   441             stop_event.set()
   443             break
   442             break
   444         if sprocess.is_alive():            
   443         if sprocess.is_alive():
       
   444             utils.get_logger().debug("Source process alive")
   445             time.sleep(1)
   445             time.sleep(1)
   446         else:
   446         else:
   447             stop_args.update({'message': 'Source process killed'})
   447             stop_args.update({'message': 'Source process killed'})
   448             stop_event.set()
   448             stop_event.set()
   449             break
   449             break