script/stream/recorder_tweetstream.py
changeset 693 2ef837069108
parent 528 7fb5a7b0d35c
child 737 8f6a4d6dfe14
equal deleted inserted replaced
692:51072e5e6ea9 693:2ef837069108
    21 import sys
    21 import sys
    22 import threading
    22 import threading
    23 import time
    23 import time
    24 import traceback
    24 import traceback
    25 import tweepy.auth
    25 import tweepy.auth
    26 import tweetstream
    26 import iri_tweet.stream as tweetstream
    27 import urllib2
    27 import urllib2
    28 socket._fileobject.default_bufsize = 0
    28 socket._fileobject.default_bufsize = 0
    29 
    29 
    30 
    30 
    31 
    31 
    33 columns_tweet = [u'user', u'favorited', u'contributors', u'truncated', u'text', u'created_at', u'retweeted', u'in_reply_to_status_id_str', u'coordinates', u'in_reply_to_user_id_str', u'entities', u'in_reply_to_status_id', u'place', u'in_reply_to_user_id', u'id', u'in_reply_to_screen_name', u'retweet_count', u'geo', u'id_str', u'source']
    33 columns_tweet = [u'user', u'favorited', u'contributors', u'truncated', u'text', u'created_at', u'retweeted', u'in_reply_to_status_id_str', u'coordinates', u'in_reply_to_user_id_str', u'entities', u'in_reply_to_status_id', u'place', u'in_reply_to_user_id', u'id', u'in_reply_to_screen_name', u'retweet_count', u'geo', u'id_str', u'source']
    34 #columns_user = [u'id', u'verified', u'profile_sidebar_fill_color', u'profile_text_color', u'followers_count', u'protected', u'location', u'profile_background_color', u'utc_offset', u'statuses_count', u'description', u'friends_count', u'profile_link_color', u'profile_image_url', u'notifications', u'geo_enabled', u'profile_background_image_url', u'screen_name', u'profile_background_tile', u'favourites_count', u'name', u'url', u'created_at', u'time_zone', u'profile_sidebar_border_color', u'following']
    34 #columns_user = [u'id', u'verified', u'profile_sidebar_fill_color', u'profile_text_color', u'followers_count', u'protected', u'location', u'profile_background_color', u'utc_offset', u'statuses_count', u'description', u'friends_count', u'profile_link_color', u'profile_image_url', u'notifications', u'geo_enabled', u'profile_background_image_url', u'screen_name', u'profile_background_tile', u'favourites_count', u'name', u'url', u'created_at', u'time_zone', u'profile_sidebar_border_color', u'following']
    35 columns_user = [u'follow_request_sent', u'profile_use_background_image', u'id', u'verified', u'profile_sidebar_fill_color', u'profile_text_color', u'followers_count', u'protected', u'location', u'profile_background_color', u'id_str', u'utc_offset', u'statuses_count', u'description', u'friends_count', u'profile_link_color', u'profile_image_url', u'notifications', u'show_all_inline_media', u'geo_enabled', u'profile_background_image_url', u'name', u'lang', u'following', u'profile_background_tile', u'favourites_count', u'screen_name', u'url', u'created_at', u'contributors_enabled', u'time_zone', u'profile_sidebar_border_color', u'is_translator', u'listed_count']
    35 columns_user = [u'follow_request_sent', u'profile_use_background_image', u'id', u'verified', u'profile_sidebar_fill_color', u'profile_text_color', u'followers_count', u'protected', u'location', u'profile_background_color', u'id_str', u'utc_offset', u'statuses_count', u'description', u'friends_count', u'profile_link_color', u'profile_image_url', u'notifications', u'show_all_inline_media', u'geo_enabled', u'profile_background_image_url', u'name', u'lang', u'following', u'profile_background_tile', u'favourites_count', u'screen_name', u'url', u'created_at', u'contributors_enabled', u'time_zone', u'profile_sidebar_border_color', u'is_translator', u'listed_count']
    36 #just put it in a sqlite3 table
    36 #just put it in a sqlite3 table
    37 
    37 
       
    38 DEFAULT_TIMEOUT = 5
    38 
    39 
    39 def set_logging(options):
    40 def set_logging(options):
    40     loggers = []
    41     loggers = []
    41     
    42     
    42     loggers.append(utils.set_logging(options, logging.getLogger('iri.tweet')))
    43     loggers.append(utils.set_logging(options, logging.getLogger('iri.tweet')))
   122     def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
   123     def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
   123         self.track = options.track
   124         self.track = options.track
   124         self.token_filename = options.token_filename
   125         self.token_filename = options.token_filename
   125         self.catchup = options.catchup
   126         self.catchup = options.catchup
   126         self.timeout = options.timeout
   127         self.timeout = options.timeout
       
   128         self.stream = None
   127         super(SourceProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid)
   129         super(SourceProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid)
   128 
   130                     
   129     def do_run(self):
   131     def __source_stream_iter(self):
   130         
   132         
   131         #import pydevd
       
   132         #pydevd.settrace(suspend=True)
       
   133 
       
   134         self.logger = set_logging_process(self.options, self.logger_queue)
   133         self.logger = set_logging_process(self.options, self.logger_queue)
       
   134         self.logger.debug("SourceProcess : run ")
       
   135         
   135         self.auth = get_auth(self.options, self.access_token) 
   136         self.auth = get_auth(self.options, self.access_token) 
   136         
   137         self.logger.debug("SourceProcess : auth set ")
   137         self.logger.debug("SourceProcess : run ")
   138         
   138         track_list = self.track # or raw_input('Keywords to track (comma seperated): ').strip()
   139         track_list = self.track # or raw_input('Keywords to track (comma seperated): ').strip()
   139         self.logger.debug("SourceProcess : track list " + track_list)
   140         self.logger.debug("SourceProcess : track list " + track_list)
   140         
   141         
   141         track_list = [k.strip() for k in track_list.split(',')]
   142         track_list = [k.strip() for k in track_list.split(',')]
   142 
   143 
   143         self.logger.debug("SourceProcess : before connecting to stream " + repr(track_list))                        
   144         self.logger.debug("SourceProcess : before connecting to stream " + repr(track_list))                        
   144         stream = tweetstream.FilterStream(self.auth, track=track_list, raw=True, url=self.options.url, catchup=self.catchup, timeout=self.timeout)
   145         self.stream = tweetstream.FilterStream(self.auth, track=track_list, raw=True, url=self.options.url, catchup=self.catchup, timeout=self.timeout, chunk_size=1)
   145         self.logger.debug("SourceProcess : after connecting to stream")
   146         self.logger.debug("SourceProcess : after connecting to stream")
   146         stream.muststop = lambda: self.stop_event.is_set()
   147         self.stream.muststop = lambda: self.stop_event.is_set()        
       
   148         
       
   149         stream_wrapper = tweetstream.SafeStreamWrapper(self.stream, logger=self.logger)
   147         
   150         
   148         session = self.session_maker()
   151         session = self.session_maker()
   149         
   152         
   150         try:
   153         try:
   151             for tweet in stream:
   154             for tweet in stream_wrapper:
   152                 if not self.parent_is_alive():
   155                 if not self.parent_is_alive():
       
   156                     self.stop_event.set()
       
   157                     stop_thread.join(5)
   153                     sys.exit()
   158                     sys.exit()
   154                 self.logger.debug("SourceProcess : tweet " + repr(tweet))
   159                 self.logger.debug("SourceProcess : tweet " + repr(tweet))
   155                 source = TweetSource(original_json=tweet)
   160                 source = TweetSource(original_json=tweet)
   156                 self.logger.debug("SourceProcess : source created")
   161                 self.logger.debug("SourceProcess : source created")
   157                 add_retries = 0
   162                 add_retries = 0
   163                         break
   168                         break
   164                     except OperationalError as e:
   169                     except OperationalError as e:
   165                         session.rollback()
   170                         session.rollback()
   166                         self.logger.debug("SourceProcess : Operational Error %s nb %d" % (repr(e), add_retries))
   171                         self.logger.debug("SourceProcess : Operational Error %s nb %d" % (repr(e), add_retries))
   167                         if add_retries == 10:
   172                         if add_retries == 10:
   168                             raise e
   173                             raise
   169                      
   174                      
   170                 source_id = source.id
   175                 source_id = source.id
   171                 self.logger.debug("SourceProcess : before queue + source id " + repr(source_id))
   176                 self.logger.debug("SourceProcess : before queue + source id " + repr(source_id))
   172                 self.logger.info("SourceProcess : Tweet count: %d - current rate : %.2f - running : %s" % (stream.count, stream.rate, int(time.time() - stream.starttime)))
   177                 self.logger.info("SourceProcess : Tweet count: %d - current rate : %.2f - running : %s" % (self.stream.count, self.stream.rate, int(time.time() - self.stream.starttime)))
   173                 session.commit()
   178                 session.commit()
   174                 self.queue.put((source_id, tweet), False)
   179                 self.queue.put((source_id, tweet), False)
   175 
   180 
   176         except Exception as e:
   181         except Exception as e:
   177             self.logger.error("SourceProcess : Error when processing tweet " + repr(e))
   182             self.logger.error("SourceProcess : Error when processing tweet " + repr(e))
       
   183             raise
   178         finally:
   184         finally:
   179             session.rollback()
   185             session.rollback()
   180             stream.close()
       
   181             session.close()
   186             session.close()
   182             self.queue.close()
   187             with self.logger_queue.mutex:
   183             self.stop_event.set()
   188                 self.logger_queue.clear()
       
   189                 self.logger_queue.close()
       
   190             with self.queue.mutex:
       
   191                 self.queue.clear()
       
   192                 self.queue.close()
       
   193             self.stream.close()
       
   194             self.stream = None
       
   195             if not self.stop_event.is_set():
       
   196                 self.stop_event.set()
       
   197 
       
   198 
       
   199     def do_run(self):
       
   200         
       
   201         import pydevd
       
   202         pydevd.settrace(suspend=False)
       
   203         
       
   204         source_stream_iter_thread = threading.Thread(target=self.__source_stream_iter , name="SourceStreamIterThread")
       
   205         
       
   206         source_stream_iter_thread.start()
       
   207         
       
   208         while not self.stop_event.is_set():        
       
   209             self.stop_event.wait(DEFAULT_TIMEOUT)
       
   210             if self.stop_event.is_set() and self.stream:
       
   211                 self.stream.close()
       
   212             elif not self.stop_event.is_set() and not source_stream_iter_thread.is_alive:
       
   213                 with self.stop_event.mutex:
       
   214                     self.stop_event.set()
       
   215 
       
   216         source_stream_iter_thread.join(30)
   184 
   217 
   185 
   218 
   186 def process_tweet(tweet, source_id, session, access_token, twitter_query_user, logger):
   219 def process_tweet(tweet, source_id, session, access_token, twitter_query_user, logger):
   187     try:
   220     try:
       
   221         if not tweet.strip():
       
   222             return
   188         tweet_obj = anyjson.deserialize(tweet)
   223         tweet_obj = anyjson.deserialize(tweet)
   189         if 'text' not in tweet_obj:
   224         if 'text' not in tweet_obj:
   190             tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET'])
   225             tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET'])
   191             session.add(tweet_log)
   226             session.add(tweet_log)
   192             return
   227             return
   195             screen_name = tweet_obj['user']['screen_name']
   230             screen_name = tweet_obj['user']['screen_name']
   196         logger.info(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text']))
   231         logger.info(u"Process_tweet from %s : %s" % (screen_name, tweet_obj['text']))
   197         logger.debug(u"Process_tweet :" + repr(tweet))
   232         logger.debug(u"Process_tweet :" + repr(tweet))
   198         processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, access_token, None, twitter_query_user)
   233         processor = utils.TwitterProcessor(tweet_obj, tweet, source_id, session, access_token, None, twitter_query_user)
   199         processor.process()
   234         processor.process()
       
   235     except ValueError as e:
       
   236         message = u"Value Error %s processing tweet %s" % (repr(e), tweet)
       
   237         output = StringIO.StringIO()
       
   238         try:
       
   239             traceback.print_exc(file=output)
       
   240             error_stack = output.getvalue()
       
   241         finally:
       
   242             output.close()
       
   243         tweet_log = TweetLog(tweet_source_id=source_id, status=TweetLog.TWEET_STATUS['NOT_TWEET'], error=message, error_stack=error_stack)
       
   244         session.add(tweet_log)
       
   245         session.commit()        
   200     except Exception as e:
   246     except Exception as e:
   201         message = u"Error %s processing tweet %s" % (repr(e), tweet)
   247         message = u"Error %s processing tweet %s" % (repr(e), tweet)
   202         logger.exception(message)
   248         logger.exception(message)
   203         output = StringIO.StringIO()
   249         output = StringIO.StringIO()
   204         try:
   250         try:
   235                 process_tweet(tweet_txt, source_id, session, self.access_token, self.twitter_query_user, self.logger)
   281                 process_tweet(tweet_txt, source_id, session, self.access_token, self.twitter_query_user, self.logger)
   236                 session.commit()
   282                 session.commit()
   237         finally:
   283         finally:
   238             session.rollback()
   284             session.rollback()
   239             self.stop_event.set()
   285             self.stop_event.set()
       
   286             with self.logger_queue.mutex:
       
   287                 self.logger_queue.clear()
       
   288             with self.queue.mutex:
       
   289                 self.queue.clear()
   240             session.close()
   290             session.close()
   241 
   291 
   242 
   292 
   243 def get_sessionmaker(conn_str):
   293 def get_sessionmaker(conn_str):
   244     engine, metadata, Session = models.setup_database(conn_str, echo=False, create_all=False, autocommit=False)
   294     engine, metadata, Session = models.setup_database(conn_str, echo=False, create_all=False, autocommit=False)
   351     process_engines = []
   401     process_engines = []
   352     logger_queues = []
   402     logger_queues = []
   353     
   403     
   354     SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str)
   404     SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str)
   355     process_engines.append(engine_process)
   405     process_engines.append(engine_process)
   356     lqueue = mQueue(1)
   406     lqueue = mQueue(50)
   357     logger_queues.append(lqueue)
   407     logger_queues.append(lqueue)
   358     pid = os.getpid()
   408     pid = os.getpid()
   359     sprocess = SourceProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)    
   409     sprocess = SourceProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)    
   360     
   410     
   361     tweet_processes = []
   411     tweet_processes = []
   362     
   412     
   363     for i in range(options.process_nb - 1):
   413     for i in range(options.process_nb - 1):
   364         SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str)
   414         SessionProcess, engine_process, metadata_process = get_sessionmaker(conn_str)
   365         process_engines.append(engine_process)
   415         process_engines.append(engine_process)
   366         lqueue = mQueue(1)
   416         lqueue = mQueue(50)
   367         logger_queues.append(lqueue)
   417         logger_queues.append(lqueue)
   368         cprocess = TweetProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)
   418         cprocess = TweetProcess(SessionProcess, queue, options, access_token, stop_event, lqueue, pid)
   369         tweet_processes.append(cprocess)
   419         tweet_processes.append(cprocess)
   370 
   420 
   371     def interupt_handler(signum, frame):
   421     def interupt_handler(signum, frame):