script/lib/tweetstream/tweetstream/__init__.py
changeset 15 5d552b6a0e55
parent 12 4daf47fcf792
child 16 6d391ad4fd6a
--- a/script/lib/tweetstream/tweetstream/__init__.py	Tue Jan 18 18:25:18 2011 +0100
+++ b/script/lib/tweetstream/tweetstream/__init__.py	Thu Jan 20 10:44:04 2011 +0100
@@ -6,11 +6,14 @@
 __homepage__ = "http://bitbucket.org/runeh/tweetstream/"
 __docformat__ = "restructuredtext"
 
+import anyjson
+import logging
+import socket
+import time
 import urllib
 import urllib2
-import socket
-import time
-import anyjson
+import urlparse
+socket._fileobject.default_bufsize = 0
 
 
 """
@@ -57,10 +60,9 @@
 class TweetStream(object):
     """A network connection to Twitters streaming API
 
-    :param username: Twitter username for the account accessing the API.
-    :param password: Twitter password for the account accessing the API.
+    :param auth: Authentication handler (basic or OAuth); its ``apply_auth`` method is called before each connection.
 
-    :keyword url: URL to connect to. This can be either an endopoint name,
+    :keyword url: URL to connect to. This can be either an endpoint name,
      such as "sample", or a full URL. By default, the public "sample" url
      is used. All known endpoints are defined in the :URLS: attribute
 
@@ -102,12 +104,11 @@
         :attr: `USER_AGENT`.
 """
 
-    def __init__(self, username, password, url="sample"):
+    def __init__(self, auth, url="sample"):
         self._conn = None
         self._rate_ts = None
         self._rate_cnt = 0
-        self._username = username
-        self._password = password
+        self._auth = auth
 
         self.rate_period = 10 # in seconds
         self.connected = False
@@ -130,16 +131,30 @@
     def _init_conn(self):
         """Open the connection to the twitter server"""
         headers = {'User-Agent': self.user_agent}
-        req = urllib2.Request(self.url, self._get_post_data(), headers)
+        params_str = self._get_post_data()
+        if params_str is not None:
+            method = "POST"
+            params = dict([(key, ",".join(value)) for key, value in urlparse.parse_qs(params_str).items()])
+        else:
+            method = "GET"
+            params = None
+        
+        if self._auth:
+            self._auth.apply_auth(self.url, method, headers, params)
+        
+        req = urllib2.Request(self.url, urllib.urlencode(params) , headers)
 
-        password_mgr = urllib2.HTTPPasswordMgrWithDefaultRealm()
-        password_mgr.add_password(None, self.url, self._username,
-                                  self._password)
-        handler = urllib2.HTTPBasicAuthHandler(password_mgr)
-        opener = urllib2.build_opener(handler)
+        # Superseded by self._auth.apply_auth() above; old basic-auth opener setup kept for reference:
+        #password_mgr = urllib2.HTTPPasswordMgrWithDefaultRealm()
+        #password_mgr.add_password(None, self.url, self._username,
+        #                          self._password)
+        #handler = urllib2.HTTPBasicAuthHandler(password_mgr)
+        #handler = urllib2.HTTPHandler()
+        #opener = urllib2.build_opener(handler)
 
         try:
-            self._conn = opener.open(req)
+            #self._conn = opener.open(req)
+            self._conn = urllib2.urlopen(req)
         except urllib2.HTTPError, exception:
             if exception.code == 401:
                 raise AuthenticationError("Access denied")
@@ -149,7 +164,7 @@
                 raise
         except urllib2.URLError, exception:
             raise ConnectionError(exception.reason)
-
+        logging.debug("TweetStream._init_conn : connected : %s, params : %s, headers: %s " % (self.url, repr(params), repr(headers)))
         self.connected = True
         if not self.starttime:
             self.starttime = time.time()
@@ -166,6 +181,7 @@
         """Return the next available tweet. This call is blocking!"""
         while True:
             try:
+                data=None
                 if not self.connected:
                     self._init_conn()
 
@@ -176,6 +192,8 @@
                     self._rate_ts = time.time()
 
                 data = self._conn.readline()
+                logging.debug("TweetStream.next : data : %s" % repr(data))
+                
                 if data == "": # something is wrong
                     self.close()
                     raise ConnectionError("Got entry of length 0. Disconnected")
@@ -189,7 +207,7 @@
 
             except ValueError, e:
                 self.close()
-                raise ConnectionError("Got invalid data from twitter", details=data)
+                raise ConnectionError("Got invalid data from twitter " + str(e), details=data)
 
             except socket.error, e:
                 self.close()
@@ -210,9 +228,7 @@
     connecting goes down. Reconnecting, and waiting for reconnecting, is
     blocking.
 
-    :param username: See :TweetStream:
-
-    :param password: See :TweetStream:
+    :param auth: See :TweetStream:
 
     :keyword url: See :TweetStream:
 
@@ -227,13 +243,13 @@
 
     """
 
-    def __init__(self, username, password, url="sample",
+    def __init__(self, auth, url="sample",
                  reconnects=3, error_cb=None, retry_wait=5):
         self.max_reconnects = reconnects
         self.retry_wait = retry_wait
         self._reconnects = 0
         self._error_cb = error_cb
-        TweetStream.__init__(self, username, password, url=url)
+        TweetStream.__init__(self, auth, url=url)
 
     def next(self):
         while True:
@@ -256,9 +272,7 @@
 class FollowStream(TweetStream):
     """Stream class for getting tweets from followers.
 
-        :param user: See TweetStream
-
-        :param password: See TweetStream
+        :param auth: See TweetStream
 
         :param followees: Iterable containing user IDs to follow.
           ***Note:*** the user id in question is the numeric ID twitter uses,
@@ -268,9 +282,9 @@
           value is the "follow" endpoint.
     """
 
-    def __init__(self, user, password, followees, url="follow", **kwargs):
+    def __init__(self, auth, followees, url="follow", **kwargs):
         self.followees = followees
-        TweetStream.__init__(self, user, password, url=url, **kwargs)
+        TweetStream.__init__(self, auth, url=url, **kwargs)
 
     def _get_post_data(self):
         return urllib.urlencode({"follow": ",".join(map(str, self.followees))})
@@ -279,9 +293,7 @@
 class TrackStream(TweetStream):
     """Stream class for getting tweets relevant to keywords.
 
-        :param user: See TweetStream
-
-        :param password: See TweetStream
+        :param auth: See TweetStream
 
         :param keywords: Iterable containing keywords to look for
 
@@ -289,9 +301,9 @@
           value is the "track" endpoint.
     """
 
-    def __init__(self, user, password, keywords, url="track", **kwargs):
+    def __init__(self, auth, keywords, url="track", **kwargs):
         self.keywords = keywords
-        TweetStream.__init__(self, user, password, url=url, **kwargs)
+        TweetStream.__init__(self, auth, url=url, **kwargs)
 
     def _get_post_data(self):
         return urllib.urlencode({"track": ",".join(self.keywords)})