diff -r 80e5b9543cac -r 7fb5a7b0d35c script/stream/recorder_tweetstream.py --- a/script/stream/recorder_tweetstream.py Mon Feb 20 01:35:15 2012 +0100 +++ b/script/stream/recorder_tweetstream.py Mon Feb 20 18:52:19 2012 +0100 @@ -64,55 +64,6 @@ return auth -class ReconnectingTweetStream(tweetstream.FilterStream): - """TweetStream class that automatically tries to reconnect if the - connecting goes down. Reconnecting, and waiting for reconnecting, is - blocking. - - :param username: See :TweetStream: - - :param password: See :TweetStream: - - :keyword url: See :TweetStream: - - :keyword reconnects: Number of reconnects before a ConnectionError is - raised. Default is 3 - - :error_cb: Optional callable that will be called just before trying to - reconnect. The callback will be called with a single argument, the - exception that caused the reconnect attempt. Default is None - - :retry_wait: Time to wait before reconnecting in seconds. Default is 5 - - """ - - def __init__(self, auth, keywords, reconnects=3, error_cb=None, retry_wait=5, as_text=False, **kwargs): - self.max_reconnects = reconnects - self.retry_wait = retry_wait - self._reconnects = 0 - self._error_cb = error_cb - super(ReconnectingTweetStream, self).__init__(auth=auth, track=keywords, as_text=as_text, **kwargs) - - def next(self): - while True: - try: - utils.get_logger().debug("return super.next") - return super(ReconnectingTweetStream, self).next() - except tweetstream.ConnectionError, e: - utils.get_logger().debug("connection error :" + str(e)) - self._reconnects += 1 - if self._reconnects > self.max_reconnects: - raise tweetstream.ConnectionError("Too many retries") - - # Note: error_cb is not called on the last error since we - # raise a ConnectionError instead - if callable(self._error_cb): - self._error_cb(e) - - time.sleep(self.retry_wait) - # Don't listen to auth error, since we can't reasonably reconnect - # when we get one. - def add_process_event(type, args, session_maker): session = session_maker() try: @@ -170,24 +121,27 @@ def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid): self.track = options.track - self.reconnects = options.reconnects self.token_filename = options.token_filename + self.catchup = options.catchup + self.timeout = options.timeout super(SourceProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid) def do_run(self): #import pydevd - #pydevd.settrace(suspend=False) + #pydevd.settrace(suspend=True) self.logger = set_logging_process(self.options, self.logger_queue) self.auth = get_auth(self.options, self.access_token) - self.logger.debug("SourceProcess : run") + self.logger.debug("SourceProcess : run ") track_list = self.track # or raw_input('Keywords to track (comma seperated): ').strip() - track_list = [k for k in track_list.split(',')] + self.logger.debug("SourceProcess : track list " + track_list) + + track_list = [k.strip() for k in track_list.split(',')] self.logger.debug("SourceProcess : before connecting to stream " + repr(track_list)) - stream = ReconnectingTweetStream(self.auth, track_list, reconnects=self.reconnects, as_text=True, url=self.options.url) + stream = tweetstream.FilterStream(self.auth, track=track_list, raw=True, url=self.options.url, catchup=self.catchup, timeout=self.timeout) self.logger.debug("SourceProcess : after connecting to stream") stream.muststop = lambda: self.stop_event.is_set() @@ -292,13 +246,13 @@ return Session, engine, metadata -def process_leftovers(session, access_token, logger): +def process_leftovers(session, access_token, twitter_query_user, logger): sources = session.query(TweetSource).outerjoin(TweetLog).filter(TweetLog.id == None) for src in sources: tweet_txt = src.original_json - process_tweet(tweet_txt, src.id, session, access_token, logger) + process_tweet(tweet_txt, src.id, session, access_token, twitter_query_user, logger) session.commit() @@ -336,8 +290,6 @@ help="new database", default=False) parser.add_option("-D", "--daemon", dest="daemon", action="store_true", help="launch daemon", default=False) - parser.add_option("-r", "--reconnects", dest="reconnects", - help="Reconnects", metavar="RECONNECTS", default=10, type='int') parser.add_option("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token", help="Token file name") parser.add_option("-d", "--duration", dest="duration", @@ -348,6 +300,11 @@ help="The twitter url to connect to.", metavar="URL", default=tweetstream.FilterStream.url) parser.add_option("--query-user", dest="twitter_query_user", action="store_true", help="Query twitter for users", default=False, metavar="QUERY_USER") + parser.add_option("--catchup", dest="catchup", + help="catchup count for tweets", default=None, metavar="CATCHUP", type='int') + parser.add_option("--timeout", dest="timeout", + help="timeout for connecting in seconds", default=60, metavar="TIMEOUT", type='int') + @@ -366,7 +323,7 @@ session = session_maker() try: - process_leftovers(session, access_token, utils.get_logger()) + process_leftovers(session, access_token, options.twitter_query_user, utils.get_logger()) session.commit() finally: session.rollback() @@ -477,7 +434,7 @@ utils.get_logger().debug("Processing leftovers") session = session_maker() try: - process_leftovers(session, access_token, utils.get_logger()) + process_leftovers(session, access_token, options.twitter_query_user, utils.get_logger()) session.commit() finally: session.rollback()