script/stream/recorder_tweetstream.py
changeset 528 7fb5a7b0d35c
parent 464 b9243ade95e2
child 693 2ef837069108
--- a/script/stream/recorder_tweetstream.py	Mon Feb 20 01:35:15 2012 +0100
+++ b/script/stream/recorder_tweetstream.py	Mon Feb 20 18:52:19 2012 +0100
@@ -64,55 +64,6 @@
     return auth
 
 
-class ReconnectingTweetStream(tweetstream.FilterStream):
-    """TweetStream class that automatically tries to reconnect if the
-    connecting goes down. Reconnecting, and waiting for reconnecting, is
-    blocking.
-
-    :param username: See :TweetStream:
-
-    :param password: See :TweetStream:
-
-    :keyword url: See :TweetStream:
-
-    :keyword reconnects: Number of reconnects before a ConnectionError is
-        raised. Default is 3
-
-    :error_cb: Optional callable that will be called just before trying to
-        reconnect. The callback will be called with a single argument, the
-        exception that caused the reconnect attempt. Default is None
-
-    :retry_wait: Time to wait before reconnecting in seconds. Default is 5
-
-    """
-
-    def __init__(self, auth, keywords, reconnects=3, error_cb=None, retry_wait=5, as_text=False, **kwargs):
-        self.max_reconnects = reconnects
-        self.retry_wait = retry_wait
-        self._reconnects = 0
-        self._error_cb = error_cb
-        super(ReconnectingTweetStream, self).__init__(auth=auth, track=keywords, as_text=as_text, **kwargs)
-
-    def next(self):
-        while True:
-            try:
-                utils.get_logger().debug("return super.next")
-                return super(ReconnectingTweetStream, self).next()
-            except tweetstream.ConnectionError, e:
-                utils.get_logger().debug("connection error :" + str(e))
-                self._reconnects += 1
-                if self._reconnects > self.max_reconnects:
-                    raise tweetstream.ConnectionError("Too many retries")
-
-                # Note: error_cb is not called on the last error since we
-                # raise a ConnectionError instead
-                if  callable(self._error_cb):
-                    self._error_cb(e)
-
-                time.sleep(self.retry_wait)
-        # Don't listen to auth error, since we can't reasonably reconnect
-        # when we get one.
-
 def add_process_event(type, args, session_maker):
     session = session_maker()
     try:
@@ -170,24 +121,27 @@
     
     def __init__(self, session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid):
         self.track = options.track
-        self.reconnects = options.reconnects
         self.token_filename = options.token_filename
+        self.catchup = options.catchup
+        self.timeout = options.timeout
         super(SourceProcess, self).__init__(session_maker, queue, options, access_token, stop_event, logger_queue, parent_pid)
 
     def do_run(self):
         
         #import pydevd
-        #pydevd.settrace(suspend=False)
+        #pydevd.settrace(suspend=True)
 
         self.logger = set_logging_process(self.options, self.logger_queue)
         self.auth = get_auth(self.options, self.access_token) 
         
-        self.logger.debug("SourceProcess : run")
+        self.logger.debug("SourceProcess : run ")
         track_list = self.track # or raw_input('Keywords to track (comma seperated): ').strip()
-        track_list = [k for k in track_list.split(',')]
+        self.logger.debug("SourceProcess : track list " + track_list)
+        
+        track_list = [k.strip() for k in track_list.split(',')]
 
         self.logger.debug("SourceProcess : before connecting to stream " + repr(track_list))                        
-        stream = ReconnectingTweetStream(self.auth, track_list, reconnects=self.reconnects, as_text=True, url=self.options.url)
+        stream = tweetstream.FilterStream(self.auth, track=track_list, raw=True, url=self.options.url, catchup=self.catchup, timeout=self.timeout)
         self.logger.debug("SourceProcess : after connecting to stream")
         stream.muststop = lambda: self.stop_event.is_set()
         
@@ -292,13 +246,13 @@
     return Session, engine, metadata
 
             
-def process_leftovers(session, access_token, logger):
+def process_leftovers(session, access_token, twitter_query_user, logger):
     
     sources = session.query(TweetSource).outerjoin(TweetLog).filter(TweetLog.id == None)
     
     for src in sources:
         tweet_txt = src.original_json
-        process_tweet(tweet_txt, src.id, session, access_token, logger)
+        process_tweet(tweet_txt, src.id, session, access_token, twitter_query_user, logger)
         session.commit()
 
         
@@ -336,8 +290,6 @@
                       help="new database", default=False)
     parser.add_option("-D", "--daemon", dest="daemon", action="store_true",
                       help="launch daemon", default=False)
-    parser.add_option("-r", "--reconnects", dest="reconnects",
-                      help="Reconnects", metavar="RECONNECTS", default=10, type='int')
     parser.add_option("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token",
                       help="Token file name")
     parser.add_option("-d", "--duration", dest="duration",
@@ -348,6 +300,11 @@
                       help="The twitter url to connect to.", metavar="URL", default=tweetstream.FilterStream.url)
     parser.add_option("--query-user", dest="twitter_query_user", action="store_true",
                       help="Query twitter for users", default=False, metavar="QUERY_USER")
+    parser.add_option("--catchup", dest="catchup",
+                      help="catchup count for tweets", default=None, metavar="CATCHUP", type='int')
+    parser.add_option("--timeout", dest="timeout",
+                      help="timeout for connecting in seconds", default=60, metavar="TIMEOUT", type='int')
+    
 
 
 
@@ -366,7 +323,7 @@
     
     session = session_maker()
     try:
-        process_leftovers(session, access_token, utils.get_logger())
+        process_leftovers(session, access_token, options.twitter_query_user, utils.get_logger())
         session.commit()
     finally:
         session.rollback()
@@ -477,7 +434,7 @@
         utils.get_logger().debug("Processing leftovers")
         session = session_maker()
         try:
-            process_leftovers(session, access_token, utils.get_logger())
+            process_leftovers(session, access_token, options.twitter_query_user, utils.get_logger())
             session.commit()
         finally:
             session.rollback()