--- a/script/stream/recorder_tweetstream.py Mon Feb 20 01:35:15 2012 +0100
+++ b/script/stream/recorder_tweetstream.py Mon Feb 20 18:52:19 2012 +0100
@@ -64,55 +64,6 @@
return auth
-class ReconnectingTweetStream(tweetstream.FilterStream):
- """TweetStream class that automatically tries to reconnect if the
- connecting goes down. Reconnecting, and waiting for reconnecting, is
- blocking.
-
- :param username: See :TweetStream:
-
- :param password: See :TweetStream:
-
- :keyword url: See :TweetStream:
-
- :keyword reconnects: Number of reconnects before a ConnectionError is
- raised. Default is 3
-
- :error_cb: Optional callable that will be called just before trying to
- reconnect. The callback will be called with a single argument, the
- exception that caused the reconnect attempt. Default is None
-
- :retry_wait: Time to wait before reconnecting in seconds. Default is 5
-
- """
-
- def __init__(self, auth, keywords, reconnects=3, error_cb=None, retry_wait=5, as_text=False, **kwargs):
- self.max_reconnects = reconnects
- self.retry_wait = retry_wait
- self._reconnects = 0
- self._error_cb = error_cb
- super(ReconnectingTweetStream, self).__init__(auth=auth, track=keywords, as_text=as_text, **kwargs)
-
- def next(self):
- while True:
- try:
- utils.get_logger().debug("return super.next")
- return super(ReconnectingTweetStream, self).next()
- except tweetstream.ConnectionError, e:
- utils.get_logger().debug("connection error :" + str(e))
- self._reconnects += 1
- if self._reconnects > self.max_reconnects:
- raise tweetstream.ConnectionError("Too many retries")
-
- # Note: error_cb is not called on the last error since we
- # raise a ConnectionError instead
- if callable(self._error_cb):
- self._error_cb(e)
-
- time.sleep(self.retry_wait)
- # Don't listen to auth error, since we can't reasonably reconnect
- # when we get one.
-
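+# NOTE: the blocking ReconnectingTweetStream wrapper is removed; SourceProcess.do_run
+# now builds tweetstream.FilterStream directly (see the new --catchup/--timeout
+# options below), so automatic reconnection is no longer attempted in this module.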
def add_process_event(type, args, session_maker):
session = session_maker()
try:
@@ -170,24 +121,27 @@
def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
self.track = options.track
- self.reconnects = options.reconnects
self.token_filename = options.token_filename
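+ # Stream tuning options added in place of the old reconnect count; both are
+ # forwarded to tweetstream.FilterStream in do_run below.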
+ self.catchup = options.catchup
+ self.timeout = options.timeout
super(SourceProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid)
def do_run(self):
#import pydevd
- #pydevd.settrace(suspend=False)
+ #pydevd.settrace(suspend=True)
self.logger = set_logging_process(self.options, self.logger_queue)
self.auth = get_auth(self.options, self.access_token)
- self.logger.debug("SourceProcess : run")
+ self.logger.debug("SourceProcess : run ")
track_list = self.track # or raw_input('Keywords to track (comma seperated): ').strip()
- track_list = [k for k in track_list.split(',')]
+ self.logger.debug("SourceProcess : track list " + track_list)
+
+ track_list = [k.strip() for k in track_list.split(',')]
self.logger.debug("SourceProcess : before connecting to stream " + repr(track_list))
- stream = ReconnectingTweetStream(self.auth, track_list, reconnects=self.reconnects, as_text=True, url=self.options.url)
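+ # The stream is now built directly instead of through the ReconnectingTweetStream
+ # wrapper: raw=True stands in for the wrapper's as_text=True, and catchup/timeout
+ # come from the new command line options stored in __init__.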
+ stream = tweetstream.FilterStream(self.auth, track=track_list, raw=True, url=self.options.url, catchup=self.catchup, timeout=self.timeout)
self.logger.debug("SourceProcess : after connecting to stream")
stream.muststop = lambda: self.stop_event.is_set()
@@ -292,13 +246,13 @@
return Session, engine, metadata
-def process_leftovers(session, access_token, logger):
+def process_leftovers(session, access_token, twitter_query_user, logger):
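+ # twitter_query_user (options.twitter_query_user, set by --query-user) is passed
+ # straight through to process_tweet.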
sources = session.query(TweetSource).outerjoin(TweetLog).filter(TweetLog.id == None)
for src in sources:
tweet_txt = src.original_json
- process_tweet(tweet_txt, src.id, session, access_token, logger)
+ process_tweet(tweet_txt, src.id, session, access_token, twitter_query_user, logger)
session.commit()
@@ -336,8 +290,6 @@
help="new database", default=False)
parser.add_option("-D", "--daemon", dest="daemon", action="store_true",
help="launch daemon", default=False)
- parser.add_option("-r", "--reconnects", dest="reconnects",
- help="Reconnects", metavar="RECONNECTS", default=10, type='int')
parser.add_option("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token",
help="Token file name")
parser.add_option("-d", "--duration", dest="duration",
@@ -348,6 +300,11 @@
help="The twitter url to connect to.", metavar="URL", default=tweetstream.FilterStream.url)
parser.add_option("--query-user", dest="twitter_query_user", action="store_true",
help="Query twitter for users", default=False, metavar="QUERY_USER")
+ parser.add_option("--catchup", dest="catchup",
+ help="catchup count for tweets", default=None, metavar="CATCHUP", type='int')
+ parser.add_option("--timeout", dest="timeout",
+ help="timeout for connecting in seconds", default=60, metavar="TIMEOUT", type='int')
+
@@ -366,7 +323,7 @@
session = session_maker()
try:
- process_leftovers(session, access_token, utils.get_logger())
+ process_leftovers(session, access_token, options.twitter_query_user, utils.get_logger())
session.commit()
finally:
session.rollback()
@@ -477,7 +434,7 @@
utils.get_logger().debug("Processing leftovers")
session = session_maker()
try:
- process_leftovers(session, access_token, utils.get_logger())
+ process_leftovers(session, access_token, options.twitter_query_user, utils.get_logger())
session.commit()
finally:
session.rollback()