diff -r 4daf47fcf792 -r 5d552b6a0e55 script/lib/tweetstream/tweetstream/__init__.py --- a/script/lib/tweetstream/tweetstream/__init__.py Tue Jan 18 18:25:18 2011 +0100 +++ b/script/lib/tweetstream/tweetstream/__init__.py Thu Jan 20 10:44:04 2011 +0100 @@ -6,11 +6,14 @@ __homepage__ = "http://bitbucket.org/runeh/tweetstream/" __docformat__ = "restructuredtext" +import anyjson +import logging +import socket +import time import urllib import urllib2 -import socket -import time -import anyjson +import urlparse +socket._fileobject.default_bufsize = 0 """ @@ -57,10 +60,9 @@ class TweetStream(object): """A network connection to Twitters streaming API - :param username: Twitter username for the account accessing the API. - :param password: Twitter password for the account accessing the API. + :param auth: Twitter authentication (user name/password - oauth) - :keyword url: URL to connect to. This can be either an endopoint name, + :keyword url: URL to connect to. This can be either an endpoint name, such as "sample", or a full URL. By default, the public "sample" url is used. All known endpoints are defined in the :URLS: attribute @@ -102,12 +104,11 @@ :attr: `USER_AGENT`. """ - def __init__(self, username, password, url="sample"): + def __init__(self, auth, url="sample"): self._conn = None self._rate_ts = None self._rate_cnt = 0 - self._username = username - self._password = password + self._auth = auth self.rate_period = 10 # in seconds self.connected = False @@ -130,16 +131,30 @@ def _init_conn(self): """Open the connection to the twitter server""" headers = {'User-Agent': self.user_agent} - req = urllib2.Request(self.url, self._get_post_data(), headers) + params_str = self._get_post_data() + if params_str is not None: + method = "POST" + params = dict([(key, ",".join(value)) for key, value in urlparse.parse_qs(params_str).items()]) + else: + method = "GET" + params = None + + if self._auth: + self._auth.apply_auth(self.url, method, headers, params) + + req = urllib2.Request(self.url, urllib.urlencode(params) , headers) - password_mgr = urllib2.HTTPPasswordMgrWithDefaultRealm() - password_mgr.add_password(None, self.url, self._username, - self._password) - handler = urllib2.HTTPBasicAuthHandler(password_mgr) - opener = urllib2.build_opener(handler) + #replace with auth + #password_mgr = urllib2.HTTPPasswordMgrWithDefaultRealm() + #password_mgr.add_password(None, self.url, self._username, + # self._password) + #handler = urllib2.HTTPBasicAuthHandler(password_mgr) + #handler = urllib2.HTTPHandler() + #opener = urllib2.build_opener(handler) try: - self._conn = opener.open(req) + #self._conn = opener.open(req) + self._conn = urllib2.urlopen(req) except urllib2.HTTPError, exception: if exception.code == 401: raise AuthenticationError("Access denied") @@ -149,7 +164,7 @@ raise except urllib2.URLError, exception: raise ConnectionError(exception.reason) - + logging.debug("TweetStream._init_conn : connected : %s, params : %s, headers: %s " % (self.url, repr(params), repr(headers))) self.connected = True if not self.starttime: self.starttime = time.time() @@ -166,6 +181,7 @@ """Return the next available tweet. This call is blocking!""" while True: try: + data=None if not self.connected: self._init_conn() @@ -176,6 +192,8 @@ self._rate_ts = time.time() data = self._conn.readline() + logging.debug("TweetStream.next : data : %s" % repr(data)) + if data == "": # something is wrong self.close() raise ConnectionError("Got entry of length 0. Disconnected") @@ -189,7 +207,7 @@ except ValueError, e: self.close() - raise ConnectionError("Got invalid data from twitter", details=data) + raise ConnectionError("Got invalid data from twitter " + str(e), details=data) except socket.error, e: self.close() @@ -210,9 +228,7 @@ connecting goes down. Reconnecting, and waiting for reconnecting, is blocking. - :param username: See :TweetStream: - - :param password: See :TweetStream: + :param auth: See :TweetStream: :keyword url: See :TweetStream: @@ -227,13 +243,13 @@ """ - def __init__(self, username, password, url="sample", + def __init__(self, auth, url="sample", reconnects=3, error_cb=None, retry_wait=5): self.max_reconnects = reconnects self.retry_wait = retry_wait self._reconnects = 0 self._error_cb = error_cb - TweetStream.__init__(self, username, password, url=url) + TweetStream.__init__(self, auth, url=url) def next(self): while True: @@ -256,9 +272,7 @@ class FollowStream(TweetStream): """Stream class for getting tweets from followers. - :param user: See TweetStream - - :param password: See TweetStream + :param auth: See TweetStream :param followees: Iterable containing user IDs to follow. ***Note:*** the user id in question is the numeric ID twitter uses, @@ -268,9 +282,9 @@ value is the "follow" endpoint. """ - def __init__(self, user, password, followees, url="follow", **kwargs): + def __init__(self, auth, followees, url="follow", **kwargs): self.followees = followees - TweetStream.__init__(self, user, password, url=url, **kwargs) + TweetStream.__init__(self, auth, url=url, **kwargs) def _get_post_data(self): return urllib.urlencode({"follow": ",".join(map(str, self.followees))}) @@ -279,9 +293,7 @@ class TrackStream(TweetStream): """Stream class for getting tweets relevant to keywords. - :param user: See TweetStream - - :param password: See TweetStream + :param auth: See TweetStream :param keywords: Iterable containing keywords to look for @@ -289,9 +301,9 @@ value is the "track" endpoint. """ - def __init__(self, user, password, keywords, url="track", **kwargs): + def __init__(self, auth, keywords, url="track", **kwargs): self.keywords = keywords - TweetStream.__init__(self, user, password, url=url, **kwargs) + TweetStream.__init__(self, auth, url=url, **kwargs) def _get_post_data(self): return urllib.urlencode({"track": ",".join(self.keywords)})