--- a/script/lib/tweetstream/tweetstream/__init__.py Tue Jan 18 18:25:18 2011 +0100
+++ b/script/lib/tweetstream/tweetstream/__init__.py Thu Jan 20 10:44:04 2011 +0100
@@ -6,11 +6,14 @@
__homepage__ = "http://bitbucket.org/runeh/tweetstream/"
__docformat__ = "restructuredtext"
+import anyjson
+import logging
+import socket
+import time
import urllib
import urllib2
-import socket
-import time
-import anyjson
+import urlparse
+socket._fileobject.default_bufsize = 0
"""
@@ -57,10 +60,9 @@
class TweetStream(object):
"""A network connection to Twitters streaming API
- :param username: Twitter username for the account accessing the API.
- :param password: Twitter password for the account accessing the API.
+ :param auth: Twitter authentication (user name/password - oauth)
- :keyword url: URL to connect to. This can be either an endopoint name,
+ :keyword url: URL to connect to. This can be either an endpoint name,
such as "sample", or a full URL. By default, the public "sample" url
is used. All known endpoints are defined in the :URLS: attribute
@@ -102,12 +104,11 @@
:attr: `USER_AGENT`.
"""
- def __init__(self, username, password, url="sample"):
+ def __init__(self, auth, url="sample"):
self._conn = None
self._rate_ts = None
self._rate_cnt = 0
- self._username = username
- self._password = password
+ self._auth = auth
self.rate_period = 10 # in seconds
self.connected = False
@@ -130,16 +131,30 @@
def _init_conn(self):
"""Open the connection to the twitter server"""
headers = {'User-Agent': self.user_agent}
- req = urllib2.Request(self.url, self._get_post_data(), headers)
+ params_str = self._get_post_data()
+ if params_str is not None:
+ method = "POST"
+ params = dict([(key, ",".join(value)) for key, value in urlparse.parse_qs(params_str).items()])
+ else:
+ method = "GET"
+ params = None
+
+ if self._auth:
+ self._auth.apply_auth(self.url, method, headers, params)
+
+ req = urllib2.Request(self.url, urllib.urlencode(params) , headers)
- password_mgr = urllib2.HTTPPasswordMgrWithDefaultRealm()
- password_mgr.add_password(None, self.url, self._username,
- self._password)
- handler = urllib2.HTTPBasicAuthHandler(password_mgr)
- opener = urllib2.build_opener(handler)
+ #replace with auth
+ #password_mgr = urllib2.HTTPPasswordMgrWithDefaultRealm()
+ #password_mgr.add_password(None, self.url, self._username,
+ # self._password)
+ #handler = urllib2.HTTPBasicAuthHandler(password_mgr)
+ #handler = urllib2.HTTPHandler()
+ #opener = urllib2.build_opener(handler)
try:
- self._conn = opener.open(req)
+ #self._conn = opener.open(req)
+ self._conn = urllib2.urlopen(req)
except urllib2.HTTPError, exception:
if exception.code == 401:
raise AuthenticationError("Access denied")
@@ -149,7 +164,7 @@
raise
except urllib2.URLError, exception:
raise ConnectionError(exception.reason)
-
+ logging.debug("TweetStream._init_conn : connected : %s, params : %s, headers: %s " % (self.url, repr(params), repr(headers)))
self.connected = True
if not self.starttime:
self.starttime = time.time()
@@ -166,6 +181,7 @@
"""Return the next available tweet. This call is blocking!"""
while True:
try:
+ data=None
if not self.connected:
self._init_conn()
@@ -176,6 +192,8 @@
self._rate_ts = time.time()
data = self._conn.readline()
+ logging.debug("TweetStream.next : data : %s" % repr(data))
+
if data == "": # something is wrong
self.close()
raise ConnectionError("Got entry of length 0. Disconnected")
@@ -189,7 +207,7 @@
except ValueError, e:
self.close()
- raise ConnectionError("Got invalid data from twitter", details=data)
+ raise ConnectionError("Got invalid data from twitter " + str(e), details=data)
except socket.error, e:
self.close()
@@ -210,9 +228,7 @@
connecting goes down. Reconnecting, and waiting for reconnecting, is
blocking.
- :param username: See :TweetStream:
-
- :param password: See :TweetStream:
+ :param auth: See :TweetStream:
:keyword url: See :TweetStream:
@@ -227,13 +243,13 @@
"""
- def __init__(self, username, password, url="sample",
+ def __init__(self, auth, url="sample",
reconnects=3, error_cb=None, retry_wait=5):
self.max_reconnects = reconnects
self.retry_wait = retry_wait
self._reconnects = 0
self._error_cb = error_cb
- TweetStream.__init__(self, username, password, url=url)
+ TweetStream.__init__(self, auth, url=url)
def next(self):
while True:
@@ -256,9 +272,7 @@
class FollowStream(TweetStream):
"""Stream class for getting tweets from followers.
- :param user: See TweetStream
-
- :param password: See TweetStream
+ :param auth: See TweetStream
:param followees: Iterable containing user IDs to follow.
***Note:*** the user id in question is the numeric ID twitter uses,
@@ -268,9 +282,9 @@
value is the "follow" endpoint.
"""
- def __init__(self, user, password, followees, url="follow", **kwargs):
+ def __init__(self, auth, followees, url="follow", **kwargs):
self.followees = followees
- TweetStream.__init__(self, user, password, url=url, **kwargs)
+ TweetStream.__init__(self, auth, url=url, **kwargs)
def _get_post_data(self):
return urllib.urlencode({"follow": ",".join(map(str, self.followees))})
@@ -279,9 +293,7 @@
class TrackStream(TweetStream):
"""Stream class for getting tweets relevant to keywords.
- :param user: See TweetStream
-
- :param password: See TweetStream
+ :param auth: See TweetStream
:param keywords: Iterable containing keywords to look for
@@ -289,9 +301,9 @@
value is the "track" endpoint.
"""
- def __init__(self, user, password, keywords, url="track", **kwargs):
+ def __init__(self, auth, keywords, url="track", **kwargs):
self.keywords = keywords
- TweetStream.__init__(self, user, password, url=url, **kwargs)
+ TweetStream.__init__(self, auth, url=url, **kwargs)
def _get_post_data(self):
return urllib.urlencode({"track": ",".join(self.keywords)})