| author | Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com> |
| Fri, 01 Jul 2011 18:59:13 +0200 | |
| changeset 207 | 621fa6caec0c |
| parent 206 | 6d642d650470 (current diff) |
| parent 13 | 79b6e132e3d7 (diff) |
| child 208 | a1f49175effc |
--- a/script/lib/tweetstream/CHANGELOG Fri Jul 01 10:06:36 2011 +0200 +++ b/script/lib/tweetstream/CHANGELOG Fri Jul 01 18:59:13 2011 +0200 @@ -41,4 +41,14 @@ - Removed a spurious print statement left over from debugging - Introduced common base class for all tweetstream exceptions - - Make sure we raise a sensible error on 404. Include url in desc of that error \ No newline at end of file + - Make sure we raise a sensible error on 404. Include url in desc of that error + +0.3.6 + + - Added LocationStream class for filtering on location bounding boxes. + +1.0.0 + + - Changed API to match latest twitter endpoints. This adds SampleStream and + FilterStream and deprecates TweetStream, FollowStream, LocationStream, + TrackStream and ReconnectingTweetStream.
--- a/script/lib/tweetstream/README Fri Jul 01 10:06:36 2011 +0200 +++ b/script/lib/tweetstream/README Fri Jul 01 18:59:13 2011 +0200 @@ -7,19 +7,19 @@ Introduction ------------ -tweetstream provides a class, TweetStream, that can be used to get -tweets from Twitter's streaming API. An instance of the class can be used as -an iterator. In addition to fetching tweets, the object keeps track of -the number of tweets collected and the rate at which tweets are received. +tweetstream provides two classes, SampleStream and FollowStream, that can be +used to get tweets from Twitter's streaming API. An instance of one of the +classes can be used as an iterator. In addition to fetching tweets, the +object keeps track of the number of tweets collected and the rate at which +tweets are received. -Subclasses are available for accessing the "track" and "follow" streams -as well. - -There's also a ReconnectingTweetStream class that handles automatic -reconnecting. +SampleStream delivers a sample of all tweets. FilterStream delivers +tweets that match one or more criteria. Note that it's not possible +to get all tweets without access to the "firehose" stream, which +is not currently avaliable to the public. Twitter's documentation about the streaming API can be found here: -http://apiwiki.twitter.com/Streaming-API-Documentation . +http://dev.twitter.com/pages/streaming_api_methods . **Note** that the API is blocking. If for some reason data is not immediatly available, calls will block until enough data is available to yield a tweet. @@ -27,9 +27,9 @@ Examples -------- -Printing all incomming tweets: +Printing incoming tweets: ->>> stream = tweetstream.TweetStream("username", "password") +>>> stream = tweetstream.SampleStream("username", "password") >>> for tweet in stream: ... 
print tweet @@ -37,7 +37,7 @@ The stream object can also be used as a context, as in this example that prints the author for each tweet as well as the tweet count and rate: ->>> with tweetstream.TweetStream("username", "password") as stream +>>> with tweetstream.SampleStream("username", "password") as stream ... for tweet in stream: ... print "Got tweet from %-16s\t( tweet %d, rate %.1f tweets/sec)" % ( ... tweet["user"]["screen_name"], stream.count, stream.rate ) @@ -53,36 +53,35 @@ ... except tweetstream.ConnectionError, e: ... print "Disconnected from twitter. Reason:", e.reason -To get tweets that relate to specific terms, use the TrackStream: +To get tweets that match specific criteria, use the FilterStream. FilterStreams +take three keyword arguments: "locations", "follow" and "track". + +Locations are a list of bounding boxes in which geotagged tweets should originate. +The argument should be an iterable of longitude/latitude pairs. + +Track specifies keywords to track. The argument should be an iterable of +strings. + +Follow returns statuses that reference given users. Argument should be an iterable +of twitter user IDs. The IDs are userid ints, not the screen names. >>> words = ["opera", "firefox", "safari"] ->>> with tweetstream.TrackStream("username", "password", words) as stream +>>> people = [123,124,125] +>>> locations = ["-122.75,36.8", "-121.75,37.8"] +>>> with tweetstream.FilterStream("username", "password", track=words, +... follow=people, locations=locations) as stream ... for tweet in stream: ... print "Got interesting tweet:", tweet -To get only tweets from a set of users, use the FollowStream. The following -would get tweets for user 1, 42 and 8675309 ->>> users = [1, 42, 8675309] ->>> with tweetstream.FollowStream("username", "password", users) as stream -... for tweet in stream: -... 
print "Got tweet from:", tweet["user"]["screen_name"] - - -Simple tweet fetcher that sends tweets to an AMQP message server using carrot: +Deprecated classes +------------------ ->>> from carrot.messaging import Publisher ->>> from carrot.connection import AMQPConnection ->>> from tweetstream import TweetStream ->>> amqpconn = AMQPConnection(hostname="localhost", port=5672, -... userid="test", password="test", -... vhost="test") ->>> publisher = Publisher(connection=amqpconn, -... exchange="tweets", routing_key="stream") ->>> with TweetStream("username", "password") as stream: -... for tweet in stream: -... publisher.send(tweet) ->>> publisher.close() +tweetstream used to contain the classes TweetStream, FollowStream, TrackStream +LocationStream and ReconnectingTweetStream. These were deprecated when twitter +changed its API end points. The same functionality is now available in +SampleStream and FilterStream. The deprecated methods will emit a warning when +used, but will remain functional for a while longer. Changelog @@ -98,6 +97,12 @@ requests, please report them in the project site issue tracker. Patches are also very welcome. +Contributors +------------ + +- Rune Halvorsen +- Christopher Schierkolk + License -------
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/script/lib/tweetstream/conftest.py Fri Jul 01 18:59:13 2011 +0200 @@ -0,0 +1,10 @@ +# content of conftest.py + +import pytest +def pytest_addoption(parser): + parser.addoption("--runslow", action="store_true", + help="run slow tests") + +def pytest_runtest_setup(item): + if 'slow' in item.keywords and not item.config.getvalue("runslow"): + pytest.skip("need --runslow option to run") \ No newline at end of file
--- a/script/lib/tweetstream/setup.py Fri Jul 01 10:06:36 2011 +0200 +++ b/script/lib/tweetstream/setup.py Fri Jul 01 18:59:13 2011 +0200 @@ -3,7 +3,7 @@ author = "Rune Halvorsen" email = "runefh@gmail.com" -version = "0.3.5" +version = "1.0.0" homepage = "http://bitbucket.org/runeh/tweetstream/" setup(name='tweetstream',
--- a/script/lib/tweetstream/tests/test_tweetstream.py Fri Jul 01 10:06:36 2011 +0200 +++ b/script/lib/tweetstream/tests/test_tweetstream.py Fri Jul 01 18:59:13 2011 +0200 @@ -3,53 +3,65 @@ import time from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer -from nose.tools import assert_raises -from tweetstream import TweetStream, FollowStream, TrackStream -from tweetstream import ConnectionError, AuthenticationError -from tweetstream import auth +from tweetstream import TweetStream, FollowStream, TrackStream, LocationStream +from tweetstream import ConnectionError, AuthenticationError, SampleStream +from tweepy.auth import BasicAuthHandler + +import pytest +from pytest import raises +slow = pytest.mark.slow from servercontext import test_server single_tweet = r"""{"in_reply_to_status_id":null,"in_reply_to_user_id":null,"favorited":false,"created_at":"Tue Jun 16 10:40:14 +0000 2009","in_reply_to_screen_name":null,"text":"record industry just keeps on amazing me: http:\/\/is.gd\/13lFo - $150k per song you've SHARED, not that somebody has actually DOWNLOADED.","user":{"notifications":null,"profile_background_tile":false,"followers_count":206,"time_zone":"Copenhagen","utc_offset":3600,"friends_count":191,"profile_background_color":"ffffff","profile_image_url":"http:\/\/s3.amazonaws.com\/twitter_production\/profile_images\/250715794\/profile_normal.png","description":"Digital product developer, currently at Opera Software. 
My tweets are my opinions, not those of my employer.","verified_profile":false,"protected":false,"favourites_count":0,"profile_text_color":"3C3940","screen_name":"eiriksnilsen","name":"Eirik Stridsklev N.","following":null,"created_at":"Tue May 06 12:24:12 +0000 2008","profile_background_image_url":"http:\/\/s3.amazonaws.com\/twitter_production\/profile_background_images\/10531192\/160x600opera15.gif","profile_link_color":"0099B9","profile_sidebar_fill_color":"95E8EC","url":"http:\/\/www.stridsklev-nilsen.no\/eirik","id":14672543,"statuses_count":506,"profile_sidebar_border_color":"5ED4DC","location":"Oslo, Norway"},"id":2190767504,"truncated":false,"source":"<a href=\"http:\/\/widgets.opera.com\/widget\/7206\">Twitter Opera widget<\/a>"}""" -def test_bad_auth(): +def parameterized(funcarglist): + def wrapper(function): + function.funcarglist = funcarglist + return function + return wrapper + +def pytest_generate_tests(metafunc): + for funcargs in getattr(metafunc.function, 'funcarglist', ()): + metafunc.addcall(funcargs=funcargs) + + +streamtypes = [ + dict(cls=TweetStream, args=[], kwargs=dict()), + dict(cls=SampleStream, args=[], kwargs=dict()), + dict(cls=FollowStream, args=[[1, 2, 3]], kwargs=dict()), + dict(cls=TrackStream, args=["opera"], kwargs=dict()), + dict(cls=LocationStream, args=["123,4321"], kwargs=dict()) +] + + +@parameterized(streamtypes) +def test_bad_auth(cls, args, kwargs): """Test that the proper exception is raised when the user could not be authenticated""" def auth_denied(request): request.send_error(401) - with test_server(handler=auth_denied, methods=("post", "get"), - port="random") as server: - stream = TweetStream(auth.BasicAuthHandler("foo", "bar"), url=server.baseurl) - assert_raises(AuthenticationError, stream.next) - - stream = FollowStream(auth.BasicAuthHandler("foo", "bar"), [1, 2, 3], url=server.baseurl) - assert_raises(AuthenticationError, stream.next) - - stream = TrackStream(auth.BasicAuthHandler("foo", "bar"), ["opera"], 
url=server.baseurl) - assert_raises(AuthenticationError, stream.next) + with test_server(handler=auth_denied, methods=("post", "get"), port="random") as server: + auth = BasicAuthHandler("user", "passwd") + stream = cls(auth, *args, url=server.baseurl) -def test_404_url(): +@parameterized(streamtypes) +def test_404_url(cls, args, kwargs): """Test that the proper exception is raised when the stream URL can't be found""" def not_found(request): request.send_error(404) - with test_server(handler=not_found, methods=("post", "get"), - port="random") as server: - stream = TweetStream(auth.BasicAuthHandler("foo", "bar"), url=server.baseurl) - assert_raises(ConnectionError, stream.next) - - stream = FollowStream(auth.BasicAuthHandler("foo", "bar"), [1, 2, 3], url=server.baseurl) - assert_raises(ConnectionError, stream.next) - - stream = TrackStream(auth.BasicAuthHandler("foo", "bar"), ["opera"], url=server.baseurl) - assert_raises(ConnectionError, stream.next) + with test_server(handler=not_found, methods=("post", "get"), port="random") as server: + auth = BasicAuthHandler("user", "passwd") + stream = cls(auth, *args, url=server.baseurl) -def test_bad_content(): +@parameterized(streamtypes) +def test_bad_content(cls, args, kwargs): """Test error handling if we are given invalid data""" def bad_content(request): for n in xrange(10): @@ -59,19 +71,17 @@ yield "[1,2, I need no stinking close brace" yield "[1,2,3]" - def do_test(klass, *args): - with test_server(handler=bad_content, methods=("post", "get"), - port="random") as server: - stream = klass(auth.BasicAuthHandler("foo", "bar"), *args, url=server.baseurl) + + with raises(ConnectionError): + with test_server(handler=bad_content, methods=("post", "get"), port="random") as server: + auth = BasicAuthHandler("user", "passwd") + stream = cls(auth, *args, url=server.baseurl) for tweet in stream: pass - assert_raises(ConnectionError, do_test, TweetStream) - assert_raises(ConnectionError, do_test, FollowStream, [1, 2, 3]) - 
assert_raises(ConnectionError, do_test, TrackStream, ["opera"]) - -def test_closed_connection(): +@parameterized(streamtypes) +def test_closed_connection(cls, args, kwargs): """Test error handling if server unexpectedly closes connection""" cnt = 1000 def bad_content(request): @@ -80,31 +90,24 @@ # strcuture, only checking that it's parsable yield "[1,2,3]" - def do_test(klass, *args): - with test_server(handler=bad_content, methods=("post", "get"), - port="random") as server: - stream = klass(auth.BasicAuthHandler("foo", "bar"), *args, url=server.baseurl) + with raises(ConnectionError): + with test_server(handler=bad_content, methods=("post", "get"), port="random") as server: + auth = BasicAuthHandler("foo", "bar") + stream = cls(auth, *args, url=server.baseurl) for tweet in stream: pass - assert_raises(ConnectionError, do_test, TweetStream) - assert_raises(ConnectionError, do_test, FollowStream, [1, 2, 3]) - assert_raises(ConnectionError, do_test, TrackStream, ["opera"]) + +@parameterized(streamtypes) +def test_bad_host(cls, args, kwargs): + """Test behaviour if we can't connect to the host""" + with raises(ConnectionError): + stream = cls("username", "passwd", *args, url="http://wedfwecfghhreewerewads.foo") + stream.next() -def test_bad_host(): - """Test behaviour if we can't connect to the host""" - stream = TweetStream(auth.BasicAuthHandler("foo", "bar"), url="http://bad.egewdvsdswefdsf.com/") - assert_raises(ConnectionError, stream.next) - - stream = FollowStream(auth.BasicAuthHandler("foo", "bar"), [1, 2, 3], url="http://zegwefdsf.com/") - assert_raises(ConnectionError, stream.next) - - stream = TrackStream(auth.BasicAuthHandler("foo", "bar"), ["foo"], url="http://aswefdsews.com/") - assert_raises(ConnectionError, stream.next) - - -def smoke_test_receive_tweets(): +@parameterized(streamtypes) +def smoke_test_receive_tweets(cls, args, kwargs): """Receive 100k tweets and disconnect (slow)""" total = 100000 @@ -112,20 +115,16 @@ while True: yield single_tweet + 
"\n" - def do_test(klass, *args): - with test_server(handler=tweetsource, - methods=("post", "get"), port="random") as server: - stream = klass(auth.BasicAuthHandler("foo", "bar"), *args, url=server.baseurl) - for tweet in stream: - if stream.count == total: - break - - do_test(TweetStream) - do_test(FollowStream, [1, 2, 3]) - do_test(TrackStream, ["foo", "bar"]) + with test_server(handler=tweetsource, methods=("post", "get"), port="random") as server: + auth = BasicAuthHandler("foo", "bar") + stream = cls(auth, *args, url=server.baseurl) + for tweet in stream: + if stream.count == total: + break -def test_keepalive(): +@parameterized(streamtypes) +def test_keepalive(cls, args, kwargs): """Make sure we behave sanely when there are keepalive newlines in the data recevived from twitter""" def tweetsource(request): @@ -143,25 +142,22 @@ yield single_tweet+"\n" yield "\n" - def do_test(klass, *args): - with test_server(handler=tweetsource, methods=("post", "get"), - port="random") as server: - stream = klass(auth.BasicAuthHandler("foo", "bar"), *args, url=server.baseurl) - try: - for tweet in stream: - pass - except ConnectionError: - assert stream.count == 3, "Got %s, wanted 3" % stream.count - else: - assert False, "Didn't handle keepalive" + + with test_server(handler=tweetsource, methods=("post", "get"), port="random") as server: + auth = BasicAuthHandler("foo", "bar") + stream = cls(auth, *args, url=server.baseurl) + try: + for tweet in stream: + pass + except ConnectionError: + assert stream.count == 3, "Got %s, wanted 3" % stream.count + else: + assert False, "Didn't handle keepalive" - do_test(TweetStream) - do_test(FollowStream, [1, 2, 3]) - do_test(TrackStream, ["foo", "bar"]) - - -def test_buffering(): +@slow +@parameterized(streamtypes) +def test_buffering(cls, args, kwargs): """Test if buffering stops data from being returned immediately. If there is some buffering in play that might mean data is only returned from the generator when the buffer is full. 
If buffer is bigger than a @@ -176,19 +172,13 @@ for n in xrange(100): yield single_tweet+"\n" - def do_test(klass, *args): - with test_server(handler=tweetsource, methods=("post", "get"), - port="random") as server: - stream = klass(auth.BasicAuthHandler("foo", "bar"), *args, url=server.baseurl) - start = time.time() - stream.next() - first = time.time() - diff = first - start - assert diff < 1, "Getting first tweet took more than a second!" + with test_server(handler=tweetsource, methods=("post", "get"), port="random") as server: + auth = BasicAuthHandler("foo", "bar") + stream = cls(auth, *args, url=server.baseurl) + start = time.time() + stream.next() + first = time.time() + diff = first - start + assert diff < 1, "Getting first tweet took more than a second!" - do_test(TweetStream) - do_test(FollowStream, [1, 2, 3]) - do_test(TrackStream, ["foo", "bar"]) - -
--- a/script/lib/tweetstream/tweetstream/__init__.py Fri Jul 01 10:06:36 2011 +0200 +++ b/script/lib/tweetstream/tweetstream/__init__.py Fri Jul 01 18:59:13 2011 +0200 @@ -1,37 +1,18 @@ """ Simple Twitter streaming API access """ -__version__ = "0.3.5" +__version__ = "1.0.0" __author__ = "Rune Halvorsen <runefh@gmail.com>" __homepage__ = "http://bitbucket.org/runeh/tweetstream/" __docformat__ = "restructuredtext" -import anyjson -import logging -import socket -import time -import urllib -import urllib2 -import urlparse -socket._fileobject.default_bufsize = 0 - """ - .. data:: URLS - - Mapping between twitter endpoint names and URLs. - .. data:: USER_AGENT The default user agent string for stream objects - """ -URLS = {"firehose": "http://stream.twitter.com/1/statuses/firehose.json", - "sample": "http://stream.twitter.com/1/statuses/sample.json", - "follow": "http://stream.twitter.com/1/statuses/filter.json", - "track": "http://stream.twitter.com/1/statuses/filter.json"} - USER_AGENT = "TweetStream %s" % __version__ @@ -39,9 +20,9 @@ """Base class for all tweetstream errors""" pass + class AuthenticationError(TweetStreamError): - """Exception raised if the username/password is not accepted - """ + """Exception raised if the username/password is not accepted""" pass @@ -57,244 +38,5 @@ return '<ConnectionError %s>' % self.reason -class TweetStream(object): - """A network connection to Twitters streaming API - - :param auth: Twitter authentication (user name/password - oauth) - - :keyword url: URL to connect to. This can be either an endpoint name, - such as "sample", or a full URL. By default, the public "sample" url - is used. All known endpoints are defined in the :URLS: attribute - - .. attribute:: connected - - True if the object is currently connected to the stream. - - .. attribute:: url - - The URL to which the object is connected - - .. attribute:: starttime - - The timestamp, in seconds since the epoch, the object connected to the - streaming api. - - .. 
attribute:: count - - The number of tweets that have been returned by the object. - - .. attribute:: rate - - The rate at which tweets have been returned from the object as a - float. see also :attr: `rate_period`. - - .. attribute:: rate_period - - The ammount of time to sample tweets to calculate tweet rate. By - default 10 seconds. Changes to this attribute will not be reflected - until the next time the rate is calculated. The rate of tweets vary - with time of day etc. so it's usefull to set this to something - sensible. - - .. attribute:: user_agent - - User agent string that will be included in the request. NOTE: This can - not be changed after the connection has been made. This property must - thus be set before accessing the iterator. The default is set in - :attr: `USER_AGENT`. -""" - - def __init__(self, auth, url="sample"): - self._conn = None - self._rate_ts = None - self._rate_cnt = 0 - self._auth = auth - - self.rate_period = 10 # in seconds - self.connected = False - self.starttime = None - self.count = 0 - self.rate = 0 - self.user_agent = USER_AGENT - self.url = URLS.get(url, url) - - def __iter__(self): - return self - - def __enter__(self): - return self - - def __exit__(self, *params): - self.close() - return False - - def _init_conn(self): - """Open the connection to the twitter server""" - headers = {'User-Agent': self.user_agent} - params_str = self._get_post_data() - if params_str is not None: - method = "POST" - params = dict([(key, ",".join(value)) for key, value in urlparse.parse_qs(params_str).items()]) - else: - method = "GET" - params = None - - if self._auth: - self._auth.apply_auth(self.url, method, headers, params) - - req = urllib2.Request(self.url, urllib.urlencode(params) , headers) - - try: - self._conn = urllib2.urlopen(req) - except urllib2.HTTPError, exception: - if exception.code == 401: - raise AuthenticationError("Access denied") - elif exception.code == 404: - raise ConnectionError("URL not found: %s" % self.url) - else: 
# re raise. No idea what would cause this, so want to know - raise - except urllib2.URLError, exception: - raise ConnectionError(exception.reason) - logging.debug("TweetStream._init_conn : connected : %s, params : %s, headers: %s " % (self.url, repr(params), repr(headers))) - self.connected = True - if not self.starttime: - self.starttime = time.time() - if not self._rate_ts: - self._rate_ts = time.time() - - def _get_post_data(self): - """Subclasses that need to add post data to the request can override - this method and return post data. The data should be in the format - returned by urllib.urlencode.""" - return None - - def next(self): - """Return the next available tweet. This call is blocking!""" - while True: - try: - data=None - if not self.connected: - self._init_conn() - - rate_time = time.time() - self._rate_ts - if not self._rate_ts or rate_time > self.rate_period: - self.rate = self._rate_cnt / rate_time - self._rate_cnt = 0 - self._rate_ts = time.time() - - data = self._conn.readline() - logging.debug("TweetStream.next : data : %s" % repr(data)) - - if data == "": # something is wrong - self.close() - raise ConnectionError("Got entry of length 0. Disconnected") - elif data.isspace(): - continue - - data = anyjson.deserialize(data) - self.count += 1 - self._rate_cnt += 1 - return data - - except ValueError, e: - self.close() - raise ConnectionError("Got invalid data from twitter " + str(e), details=data) - - except socket.error, e: - self.close() - raise ConnectionError("Server disconnected") - - - def close(self): - """ - Close the connection to the streaming server. - """ - self.connected = False - if self._conn: - self._conn.close() - - -class ReconnectingTweetStream(TweetStream): - """TweetStream class that automatically tries to reconnect if the - connecting goes down. Reconnecting, and waiting for reconnecting, is - blocking. 
- - :param auth: See :TweetStream: - - :keyword url: See :TweetStream: - - :keyword reconnects: Number of reconnects before a ConnectionError is - raised. Default is 3 - - :error_cb: Optional callable that will be called just before trying to - reconnect. The callback will be called with a single argument, the - exception that caused the reconnect attempt. Default is None - - :retry_wait: Time to wait before reconnecting in seconds. Default is 5 - - """ - - def __init__(self, auth, url="sample", - reconnects=3, error_cb=None, retry_wait=5): - self.max_reconnects = reconnects - self.retry_wait = retry_wait - self._reconnects = 0 - self._error_cb = error_cb - TweetStream.__init__(self, auth, url=url) - - def next(self): - while True: - try: - return TweetStream.next(self) - except ConnectionError, e: - self._reconnects += 1 - if self._reconnects > self.max_reconnects: - raise ConnectionError("Too many retries") - - # Note: error_cb is not called on the last error since we - # raise a ConnectionError instead - if callable(self._error_cb): - self._error_cb(e) - - time.sleep(self.retry_wait) - # Don't listen to auth error, since we can't reasonably reconnect - # when we get one. - -class FollowStream(TweetStream): - """Stream class for getting tweets from followers. - - :param auth: See TweetStream - - :param followees: Iterable containing user IDs to follow. - ***Note:*** the user id in question is the numeric ID twitter uses, - not the normal username. - - :keyword url: Like the url argument to TweetStream, except default - value is the "follow" endpoint. - """ - - def __init__(self, auth, followees, url="follow", **kwargs): - self.followees = followees - TweetStream.__init__(self, auth, url=url, **kwargs) - - def _get_post_data(self): - return urllib.urlencode({"follow": ",".join(map(str, self.followees))}) - - -class TrackStream(TweetStream): - """Stream class for getting tweets relevant to keywords. 
- - :param auth: See TweetStream - - :param keywords: Iterable containing keywords to look for - - :keyword url: Like the url argument to TweetStream, except default - value is the "track" endpoint. - """ - - def __init__(self, auth, keywords, url="track", **kwargs): - self.keywords = keywords - TweetStream.__init__(self, auth, url=url, **kwargs) - - def _get_post_data(self): - return urllib.urlencode({"track": ",".join(self.keywords)}) +from streamclasses import SampleStream, FilterStream +from deprecated import FollowStream, TrackStream, LocationStream, TweetStream, ReconnectingTweetStream
--- a/script/lib/tweetstream/tweetstream/auth.py Fri Jul 01 10:06:36 2011 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,162 +0,0 @@ -# Tweepy -# Copyright 2009-2010 Joshua Roesslein -# See LICENSE for details. - -from tweetstream import TweetStreamError -from urllib2 import Request, urlopen -import base64 -import tweetstream.oauth - - - -class AuthHandler(object): - - def apply_auth(self, url, method, headers, parameters): - """Apply authentication headers to request""" - raise NotImplementedError - - def get_username(self): - """Return the username of the authenticated user""" - raise NotImplementedError - - -class BasicAuthHandler(AuthHandler): - - def __init__(self, username, password): - self.username = username - self._b64up = base64.b64encode('%s:%s' % (username, password)) - - def apply_auth(self, url, method, headers, parameters): - headers['Authorization'] = 'Basic %s' % self._b64up - - def get_username(self): - return self.username - - -class OAuthHandler(AuthHandler): - """OAuth authentication handler""" - - OAUTH_HOST = 'api.twitter.com' - OAUTH_ROOT = '/oauth/' - - def __init__(self, consumer_key, consumer_secret, callback=None, secure=False): - self._consumer = tweetstream.oauth.OAuthConsumer(consumer_key, consumer_secret) - self._sigmethod = tweetstream.oauth.OAuthSignatureMethod_HMAC_SHA1() - self.request_token = None - self.access_token = None - self.callback = callback - self.username = None - self.secure = secure - - def _get_oauth_url(self, endpoint, secure=False): - if self.secure or secure: - prefix = 'https://' - else: - prefix = 'http://' - - return prefix + self.OAUTH_HOST + self.OAUTH_ROOT + endpoint - - def apply_auth(self, url, method, headers, parameters): - request = tweetstream.oauth.OAuthRequest.from_consumer_and_token( - self._consumer, http_url=url, http_method=method, - token=self.access_token, parameters=parameters - ) - request.sign_request(self._sigmethod, self._consumer, self.access_token) - 
headers.update(request.to_header()) - - def _get_request_token(self): - try: - url = self._get_oauth_url('request_token') - request = tweetstream.oauth.OAuthRequest.from_consumer_and_token( - self._consumer, http_url=url, callback=self.callback - ) - request.sign_request(self._sigmethod, self._consumer, None) - resp = urlopen(Request(url, headers=request.to_header())) - return tweetstream.oauth.OAuthToken.from_string(resp.read()) - except Exception, e: - raise TweetStreamError(e) - - def set_request_token(self, key, secret): - self.request_token = tweetstream.oauth.OAuthToken(key, secret) - - def set_access_token(self, key, secret): - self.access_token = tweetstream.oauth.OAuthToken(key, secret) - - def get_authorization_url(self, signin_with_twitter=False): - """Get the authorization URL to redirect the user""" - try: - # get the request token - self.request_token = self._get_request_token() - - # build auth request and return as url - if signin_with_twitter: - url = self._get_oauth_url('authenticate') - else: - url = self._get_oauth_url('authorize') - request = tweetstream.oauth.OAuthRequest.from_token_and_callback( - token=self.request_token, http_url=url - ) - - return request.to_url() - except Exception, e: - raise TweetStreamError(e) - - def get_access_token(self, verifier=None): - """ - After user has authorized the request token, get access token - with user supplied verifier. 
- """ - try: - url = self._get_oauth_url('access_token') - - # build request - request = tweetstream.oauth.OAuthRequest.from_consumer_and_token( - self._consumer, - token=self.request_token, http_url=url, - verifier=str(verifier) - ) - request.sign_request(self._sigmethod, self._consumer, self.request_token) - - # send request - resp = urlopen(Request(url, headers=request.to_header())) - self.access_token = tweetstream.oauth.OAuthToken.from_string(resp.read()) - return self.access_token - except Exception, e: - raise TweetStreamError(e) - - def get_xauth_access_token(self, username, password): - """ - Get an access token from an username and password combination. - In order to get this working you need to create an app at - http://twitter.com/apps, after that send a mail to api@twitter.com - and request activation of xAuth for it. - """ - try: - url = self._get_oauth_url('access_token', secure=True) # must use HTTPS - request = tweetstream.oauth.OAuthRequest.from_consumer_and_token( - oauth_consumer=self._consumer, - http_method='POST', http_url=url, - parameters = { - 'x_auth_mode': 'client_auth', - 'x_auth_username': username, - 'x_auth_password': password - } - ) - request.sign_request(self._sigmethod, self._consumer, None) - - resp = urlopen(Request(url, data=request.to_postdata())) - self.access_token = tweetstream.oauth.OAuthToken.from_string(resp.read()) - return self.access_token - except Exception, e: - raise TweetStreamError(e) - - def get_username(self): - if self.username is None: - api = API(self) - user = api.verify_credentials() - if user: - self.username = user.screen_name - else: - raise TweetStreamError("Unable to get username, invalid oauth token!") - return self.username -
# File: script/lib/tweetstream/tweetstream/deprecated.py (added by this changeset)
"""Deprecated stream classes kept for backwards compatibility.

Every class here is a thin wrapper around ``FilterStream`` or
``SampleStream`` that emits a ``DeprecationWarning`` when instantiated.
"""
from .streamclasses import FilterStream, SampleStream, ConnectionError
import time
import warnings


class DeprecatedStream(FilterStream):
    """Base for the deprecated filter-style streams.

    Warns that the concrete class is deprecated, then behaves exactly like
    ``FilterStream``.
    """

    def __init__(self, *args, **kwargs):
        warnings.warn("%s is deprecated. Use FilterStream instead" % self.__class__.__name__, DeprecationWarning)
        super(DeprecatedStream, self).__init__(*args, **kwargs)


class FollowStream(DeprecatedStream):
    """Deprecated: ``FilterStream`` restricted to the ``follow`` criterion."""

    def __init__(self, auth, follow, catchup=None, url=None):
        super(FollowStream, self).__init__(auth, follow=follow, catchup=catchup, url=url)


class TrackStream(DeprecatedStream):
    """Deprecated: ``FilterStream`` restricted to the ``track`` criterion."""

    def __init__(self, auth, track, catchup=None, url=None):
        super(TrackStream, self).__init__(auth, track=track, catchup=catchup, url=url)


class LocationStream(DeprecatedStream):
    """Deprecated: ``FilterStream`` restricted to the ``locations`` criterion."""

    def __init__(self, auth, locations, catchup=None, url=None):
        super(LocationStream, self).__init__(auth, locations=locations, catchup=catchup, url=url)


class TweetStream(SampleStream):
    """Deprecated alias of ``SampleStream``."""

    def __init__(self, *args, **kwargs):
        warnings.warn("%s is deprecated. Use SampleStream instead" % self.__class__.__name__, DeprecationWarning)
        SampleStream.__init__(self, *args, **kwargs)


class ReconnectingTweetStream(TweetStream):
    """TweetStream class that automatically tries to reconnect if the
    connection goes down. Reconnecting, and waiting for reconnecting, is
    blocking.

    :param auth: See :TweetStream:

    :keyword url: See :TweetStream: The legacy default ``"sample"`` selects
      the standard sample endpoint.

    :keyword reconnects: Number of reconnects before a ConnectionError is
      raised. Default is 3

    :error_cb: Optional callable that will be called just before trying to
      reconnect. The callback will be called with a single argument, the
      exception that caused the reconnect attempt. Default is None

    :retry_wait: Time to wait before reconnecting in seconds. Default is 5

    """

    def __init__(self, auth, url="sample",
                 reconnects=3, error_cb=None, retry_wait=5):
        self.max_reconnects = reconnects
        self.retry_wait = retry_wait
        self._reconnects = 0
        self._error_cb = error_cb
        # "sample" used to be a short endpoint name. The stream base class
        # now treats ``url`` as the complete endpoint URL, so forwarding the
        # literal string would make every connection attempt fail. Map the
        # legacy default to None so the class-level sample URL is used.
        if url == "sample":
            url = None
        TweetStream.__init__(self, auth, url=url)

    def next(self):
        """Return the next tweet, reconnecting on connection errors.

        Raises ConnectionError("Too many retries") once more than
        ``max_reconnects`` connection errors have been seen.
        """
        # NOTE(review): _reconnects is never reset after a successful read,
        # so the limit applies to the whole lifetime of the object, not to
        # consecutive failures - confirm this is intended.
        while True:
            try:
                return TweetStream.next(self)
            except ConnectionError as e:
                self._reconnects += 1
                if self._reconnects > self.max_reconnects:
                    raise ConnectionError("Too many retries")

                # Note: error_cb is not called on the last error since we
                # raise a ConnectionError instead
                if callable(self._error_cb):
                    self._error_cb(e)

                time.sleep(self.retry_wait)
            # Don't listen to auth error, since we can't reasonably reconnect
            # when we get one.
--- a/script/lib/tweetstream/tweetstream/oauth.py Fri Jul 01 10:06:36 2011 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,655 +0,0 @@ -""" -The MIT License - -Copyright (c) 2007 Leah Culver - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in -all copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -THE SOFTWARE. -""" - -import binascii -import cgi -import hmac -import random -import time -import urllib -import urlparse - - -VERSION = '1.0' # Hi Blaine! 
-HTTP_METHOD = 'GET' -SIGNATURE_METHOD = 'PLAINTEXT' - - -class OAuthError(RuntimeError): - """Generic exception class.""" - def __init__(self, message='OAuth error occured.'): - self.message = message - -def build_authenticate_header(realm=''): - """Optional WWW-Authenticate header (401 error)""" - return {'WWW-Authenticate': 'OAuth realm="%s"' % realm} - -def escape(s): - """Escape a URL including any /.""" - return urllib.quote(s, safe='~') - -def _utf8_str(s): - """Convert unicode to utf-8.""" - if isinstance(s, unicode): - return s.encode("utf-8") - else: - return str(s) - -def generate_timestamp(): - """Get seconds since epoch (UTC).""" - return int(time.time()) - -def generate_nonce(length=8): - """Generate pseudorandom number.""" - return ''.join([str(random.randint(0, 9)) for i in range(length)]) - -def generate_verifier(length=8): - """Generate pseudorandom number.""" - return ''.join([str(random.randint(0, 9)) for i in range(length)]) - - -class OAuthConsumer(object): - """Consumer of OAuth authentication. - - OAuthConsumer is a data type that represents the identity of the Consumer - via its shared secret with the Service Provider. - - """ - key = None - secret = None - - def __init__(self, key, secret): - self.key = key - self.secret = secret - - -class OAuthToken(object): - """OAuthToken is a data type that represents an End User via either an access - or request token. - - key -- the token - secret -- the token secret - - """ - key = None - secret = None - callback = None - callback_confirmed = None - verifier = None - - def __init__(self, key, secret): - self.key = key - self.secret = secret - - def set_callback(self, callback): - self.callback = callback - self.callback_confirmed = 'true' - - def set_verifier(self, verifier=None): - if verifier is not None: - self.verifier = verifier - else: - self.verifier = generate_verifier() - - def get_callback_url(self): - if self.callback and self.verifier: - # Append the oauth_verifier. 
- parts = urlparse.urlparse(self.callback) - scheme, netloc, path, params, query, fragment = parts[:6] - if query: - query = '%s&oauth_verifier=%s' % (query, self.verifier) - else: - query = 'oauth_verifier=%s' % self.verifier - return urlparse.urlunparse((scheme, netloc, path, params, - query, fragment)) - return self.callback - - def to_string(self): - data = { - 'oauth_token': self.key, - 'oauth_token_secret': self.secret, - } - if self.callback_confirmed is not None: - data['oauth_callback_confirmed'] = self.callback_confirmed - return urllib.urlencode(data) - - def from_string(s): - """ Returns a token from something like: - oauth_token_secret=xxx&oauth_token=xxx - """ - params = cgi.parse_qs(s, keep_blank_values=False) - key = params['oauth_token'][0] - secret = params['oauth_token_secret'][0] - token = OAuthToken(key, secret) - try: - token.callback_confirmed = params['oauth_callback_confirmed'][0] - except KeyError: - pass # 1.0, no callback confirmed. - return token - from_string = staticmethod(from_string) - - def __str__(self): - return self.to_string() - - -class OAuthRequest(object): - """OAuthRequest represents the request and can be serialized. - - OAuth parameters: - - oauth_consumer_key - - oauth_token - - oauth_signature_method - - oauth_signature - - oauth_timestamp - - oauth_nonce - - oauth_version - - oauth_verifier - ... any additional parameters, as defined by the Service Provider. - """ - parameters = None # OAuth parameters. 
- http_method = HTTP_METHOD - http_url = None - version = VERSION - - def __init__(self, http_method=HTTP_METHOD, http_url=None, parameters=None): - self.http_method = http_method - self.http_url = http_url - self.parameters = parameters or {} - - def set_parameter(self, parameter, value): - self.parameters[parameter] = value - - def get_parameter(self, parameter): - try: - return self.parameters[parameter] - except: - raise OAuthError('Parameter not found: %s' % parameter) - - def _get_timestamp_nonce(self): - return self.get_parameter('oauth_timestamp'), self.get_parameter( - 'oauth_nonce') - - def get_nonoauth_parameters(self): - """Get any non-OAuth parameters.""" - parameters = {} - for k, v in self.parameters.iteritems(): - # Ignore oauth parameters. - if k.find('oauth_') < 0: - parameters[k] = v - return parameters - - def to_header(self, realm=''): - """Serialize as a header for an HTTPAuth request.""" - auth_header = 'OAuth realm="%s"' % realm - # Add the oauth parameters. - if self.parameters: - for k, v in self.parameters.iteritems(): - if k[:6] == 'oauth_': - auth_header += ', %s="%s"' % (k, escape(str(v))) - return {'Authorization': auth_header} - - def to_postdata(self): - """Serialize as post data for a POST request.""" - return '&'.join(['%s=%s' % (escape(str(k)), escape(str(v))) \ - for k, v in self.parameters.iteritems()]) - - def to_url(self): - """Serialize as a URL for a GET request.""" - return '%s?%s' % (self.get_normalized_http_url(), self.to_postdata()) - - def get_normalized_parameters(self): - """Return a string that contains the parameters that must be signed.""" - params = self.parameters - try: - # Exclude the signature if it exists. - del params['oauth_signature'] - except: - pass - # Escape key values before sorting. - key_values = [(escape(_utf8_str(k)), escape(_utf8_str(v))) \ - for k,v in params.items()] - # Sort lexicographically, first after key, then after value. - key_values.sort() - # Combine key value pairs into a string. 
- return '&'.join(['%s=%s' % (k, v) for k, v in key_values]) - - def get_normalized_http_method(self): - """Uppercases the http method.""" - return self.http_method.upper() - - def get_normalized_http_url(self): - """Parses the URL and rebuilds it to be scheme://host/path.""" - parts = urlparse.urlparse(self.http_url) - scheme, netloc, path = parts[:3] - # Exclude default port numbers. - if scheme == 'http' and netloc[-3:] == ':80': - netloc = netloc[:-3] - elif scheme == 'https' and netloc[-4:] == ':443': - netloc = netloc[:-4] - return '%s://%s%s' % (scheme, netloc, path) - - def sign_request(self, signature_method, consumer, token): - """Set the signature parameter to the result of build_signature.""" - # Set the signature method. - self.set_parameter('oauth_signature_method', - signature_method.get_name()) - # Set the signature. - self.set_parameter('oauth_signature', - self.build_signature(signature_method, consumer, token)) - - def build_signature(self, signature_method, consumer, token): - """Calls the build signature method within the signature method.""" - return signature_method.build_signature(self, consumer, token) - - def from_request(http_method, http_url, headers=None, parameters=None, - query_string=None): - """Combines multiple parameter sources.""" - if parameters is None: - parameters = {} - - # Headers - if headers and 'Authorization' in headers: - auth_header = headers['Authorization'] - # Check that the authorization header is OAuth. - if auth_header[:6] == 'OAuth ': - auth_header = auth_header[6:] - try: - # Get the parameters from the header. - header_params = OAuthRequest._split_header(auth_header) - parameters.update(header_params) - except: - raise OAuthError('Unable to parse OAuth parameters from ' - 'Authorization header.') - - # GET or POST query string. - if query_string: - query_params = OAuthRequest._split_url_string(query_string) - parameters.update(query_params) - - # URL parameters. 
- param_str = urlparse.urlparse(http_url)[4] # query - url_params = OAuthRequest._split_url_string(param_str) - parameters.update(url_params) - - if parameters: - return OAuthRequest(http_method, http_url, parameters) - - return None - from_request = staticmethod(from_request) - - def from_consumer_and_token(oauth_consumer, token=None, - callback=None, verifier=None, http_method=HTTP_METHOD, - http_url=None, parameters=None): - if not parameters: - parameters = {} - - defaults = { - 'oauth_consumer_key': oauth_consumer.key, - 'oauth_timestamp': generate_timestamp(), - 'oauth_nonce': generate_nonce(), - 'oauth_version': OAuthRequest.version, - } - - defaults.update(parameters) - parameters = defaults - - if token: - parameters['oauth_token'] = token.key - if token.callback: - parameters['oauth_callback'] = token.callback - # 1.0a support for verifier. - if verifier: - parameters['oauth_verifier'] = verifier - elif callback: - # 1.0a support for callback in the request token request. - parameters['oauth_callback'] = callback - - return OAuthRequest(http_method, http_url, parameters) - from_consumer_and_token = staticmethod(from_consumer_and_token) - - def from_token_and_callback(token, callback=None, http_method=HTTP_METHOD, - http_url=None, parameters=None): - if not parameters: - parameters = {} - - parameters['oauth_token'] = token.key - - if callback: - parameters['oauth_callback'] = callback - - return OAuthRequest(http_method, http_url, parameters) - from_token_and_callback = staticmethod(from_token_and_callback) - - def _split_header(header): - """Turn Authorization: header into parameters.""" - params = {} - parts = header.split(',') - for param in parts: - # Ignore realm parameter. - if param.find('realm') > -1: - continue - # Remove whitespace. - param = param.strip() - # Split key-value. - param_parts = param.split('=', 1) - # Remove quotes and unescape the value. 
- params[param_parts[0]] = urllib.unquote(param_parts[1].strip('\"')) - return params - _split_header = staticmethod(_split_header) - - def _split_url_string(param_str): - """Turn URL string into parameters.""" - parameters = cgi.parse_qs(param_str, keep_blank_values=False) - for k, v in parameters.iteritems(): - parameters[k] = urllib.unquote(v[0]) - return parameters - _split_url_string = staticmethod(_split_url_string) - -class OAuthServer(object): - """A worker to check the validity of a request against a data store.""" - timestamp_threshold = 300 # In seconds, five minutes. - version = VERSION - signature_methods = None - data_store = None - - def __init__(self, data_store=None, signature_methods=None): - self.data_store = data_store - self.signature_methods = signature_methods or {} - - def set_data_store(self, data_store): - self.data_store = data_store - - def get_data_store(self): - return self.data_store - - def add_signature_method(self, signature_method): - self.signature_methods[signature_method.get_name()] = signature_method - return self.signature_methods - - def fetch_request_token(self, oauth_request): - """Processes a request_token request and returns the - request token on success. - """ - try: - # Get the request token for authorization. - token = self._get_token(oauth_request, 'request') - except OAuthError: - # No token required for the initial token request. - version = self._get_version(oauth_request) - consumer = self._get_consumer(oauth_request) - try: - callback = self.get_callback(oauth_request) - except OAuthError: - callback = None # 1.0, no callback specified. - self._check_signature(oauth_request, consumer, None) - # Fetch a new token. - token = self.data_store.fetch_request_token(consumer, callback) - return token - - def fetch_access_token(self, oauth_request): - """Processes an access_token request and returns the - access token on success. 
- """ - version = self._get_version(oauth_request) - consumer = self._get_consumer(oauth_request) - try: - verifier = self._get_verifier(oauth_request) - except OAuthError: - verifier = None - # Get the request token. - token = self._get_token(oauth_request, 'request') - self._check_signature(oauth_request, consumer, token) - new_token = self.data_store.fetch_access_token(consumer, token, verifier) - return new_token - - def verify_request(self, oauth_request): - """Verifies an api call and checks all the parameters.""" - # -> consumer and token - version = self._get_version(oauth_request) - consumer = self._get_consumer(oauth_request) - # Get the access token. - token = self._get_token(oauth_request, 'access') - self._check_signature(oauth_request, consumer, token) - parameters = oauth_request.get_nonoauth_parameters() - return consumer, token, parameters - - def authorize_token(self, token, user): - """Authorize a request token.""" - return self.data_store.authorize_request_token(token, user) - - def get_callback(self, oauth_request): - """Get the callback URL.""" - return oauth_request.get_parameter('oauth_callback') - - def build_authenticate_header(self, realm=''): - """Optional support for the authenticate header.""" - return {'WWW-Authenticate': 'OAuth realm="%s"' % realm} - - def _get_version(self, oauth_request): - """Verify the correct version request for this server.""" - try: - version = oauth_request.get_parameter('oauth_version') - except: - version = VERSION - if version and version != self.version: - raise OAuthError('OAuth version %s not supported.' % str(version)) - return version - - def _get_signature_method(self, oauth_request): - """Figure out the signature with some defaults.""" - try: - signature_method = oauth_request.get_parameter( - 'oauth_signature_method') - except: - signature_method = SIGNATURE_METHOD - try: - # Get the signature method object. 
- signature_method = self.signature_methods[signature_method] - except: - signature_method_names = ', '.join(self.signature_methods.keys()) - raise OAuthError('Signature method %s not supported try one of the ' - 'following: %s' % (signature_method, signature_method_names)) - - return signature_method - - def _get_consumer(self, oauth_request): - consumer_key = oauth_request.get_parameter('oauth_consumer_key') - consumer = self.data_store.lookup_consumer(consumer_key) - if not consumer: - raise OAuthError('Invalid consumer.') - return consumer - - def _get_token(self, oauth_request, token_type='access'): - """Try to find the token for the provided request token key.""" - token_field = oauth_request.get_parameter('oauth_token') - token = self.data_store.lookup_token(token_type, token_field) - if not token: - raise OAuthError('Invalid %s token: %s' % (token_type, token_field)) - return token - - def _get_verifier(self, oauth_request): - return oauth_request.get_parameter('oauth_verifier') - - def _check_signature(self, oauth_request, consumer, token): - timestamp, nonce = oauth_request._get_timestamp_nonce() - self._check_timestamp(timestamp) - self._check_nonce(consumer, token, nonce) - signature_method = self._get_signature_method(oauth_request) - try: - signature = oauth_request.get_parameter('oauth_signature') - except: - raise OAuthError('Missing signature.') - # Validate the signature. - valid_sig = signature_method.check_signature(oauth_request, consumer, - token, signature) - if not valid_sig: - key, base = signature_method.build_signature_base_string( - oauth_request, consumer, token) - raise OAuthError('Invalid signature. 
Expected signature base ' - 'string: %s' % base) - built = signature_method.build_signature(oauth_request, consumer, token) - - def _check_timestamp(self, timestamp): - """Verify that timestamp is recentish.""" - timestamp = int(timestamp) - now = int(time.time()) - lapsed = abs(now - timestamp) - if lapsed > self.timestamp_threshold: - raise OAuthError('Expired timestamp: given %d and now %s has a ' - 'greater difference than threshold %d' % - (timestamp, now, self.timestamp_threshold)) - - def _check_nonce(self, consumer, token, nonce): - """Verify that the nonce is uniqueish.""" - nonce = self.data_store.lookup_nonce(consumer, token, nonce) - if nonce: - raise OAuthError('Nonce already used: %s' % str(nonce)) - - -class OAuthClient(object): - """OAuthClient is a worker to attempt to execute a request.""" - consumer = None - token = None - - def __init__(self, oauth_consumer, oauth_token): - self.consumer = oauth_consumer - self.token = oauth_token - - def get_consumer(self): - return self.consumer - - def get_token(self): - return self.token - - def fetch_request_token(self, oauth_request): - """-> OAuthToken.""" - raise NotImplementedError - - def fetch_access_token(self, oauth_request): - """-> OAuthToken.""" - raise NotImplementedError - - def access_resource(self, oauth_request): - """-> Some protected resource.""" - raise NotImplementedError - - -class OAuthDataStore(object): - """A database abstraction used to lookup consumers and tokens.""" - - def lookup_consumer(self, key): - """-> OAuthConsumer.""" - raise NotImplementedError - - def lookup_token(self, oauth_consumer, token_type, token_token): - """-> OAuthToken.""" - raise NotImplementedError - - def lookup_nonce(self, oauth_consumer, oauth_token, nonce): - """-> OAuthToken.""" - raise NotImplementedError - - def fetch_request_token(self, oauth_consumer, oauth_callback): - """-> OAuthToken.""" - raise NotImplementedError - - def fetch_access_token(self, oauth_consumer, oauth_token, oauth_verifier): - 
"""-> OAuthToken.""" - raise NotImplementedError - - def authorize_request_token(self, oauth_token, user): - """-> OAuthToken.""" - raise NotImplementedError - - -class OAuthSignatureMethod(object): - """A strategy class that implements a signature method.""" - def get_name(self): - """-> str.""" - raise NotImplementedError - - def build_signature_base_string(self, oauth_request, oauth_consumer, oauth_token): - """-> str key, str raw.""" - raise NotImplementedError - - def build_signature(self, oauth_request, oauth_consumer, oauth_token): - """-> str.""" - raise NotImplementedError - - def check_signature(self, oauth_request, consumer, token, signature): - built = self.build_signature(oauth_request, consumer, token) - return built == signature - - -class OAuthSignatureMethod_HMAC_SHA1(OAuthSignatureMethod): - - def get_name(self): - return 'HMAC-SHA1' - - def build_signature_base_string(self, oauth_request, consumer, token): - sig = ( - escape(oauth_request.get_normalized_http_method()), - escape(oauth_request.get_normalized_http_url()), - escape(oauth_request.get_normalized_parameters()), - ) - - key = '%s&' % escape(consumer.secret) - if token: - key += escape(token.secret) - raw = '&'.join(sig) - return key, raw - - def build_signature(self, oauth_request, consumer, token): - """Builds the base signature string.""" - key, raw = self.build_signature_base_string(oauth_request, consumer, - token) - - # HMAC object. - try: - import hashlib # 2.5 - hashed = hmac.new(key, raw, hashlib.sha1) - except: - import sha # Deprecated - hashed = hmac.new(key, raw, sha) - - # Calculate the digest base 64. 
- return binascii.b2a_base64(hashed.digest())[:-1] - - -class OAuthSignatureMethod_PLAINTEXT(OAuthSignatureMethod): - - def get_name(self): - return 'PLAINTEXT' - - def build_signature_base_string(self, oauth_request, consumer, token): - """Concatenates the consumer key and secret.""" - sig = '%s&' % escape(consumer.secret) - if token: - sig = sig + escape(token.secret) - return sig, sig - - def build_signature(self, oauth_request, consumer, token): - key, raw = self.build_signature_base_string(oauth_request, consumer, - token) - return key \ No newline at end of file
# File: script/lib/tweetstream/tweetstream/streamclasses.py (added by this changeset)
"""Iterator classes for Twitter's streaming API (sample and filter endpoints)."""
from . import AuthenticationError, ConnectionError, USER_AGENT
import anyjson
import socket
import time
import urllib
import urllib2

# Make socket file objects unbuffered.
# NOTE(review): presumably so readline() returns each tweet as soon as it
# arrives instead of waiting for a buffer to fill - confirm.
socket._fileobject.default_bufsize = 0


class BaseStream(object):
    """A network connection to Twitter's streaming API

    :param auth: Authentication handler. When set, its
      ``apply_auth(url, method, headers, parameters)`` method is called to
      sign the request before it is sent.
    :keyword catchup: Number of tweets from the past to get before switching
      to live stream. Sent as the ``count`` POST parameter.
    :keyword url: Endpoint URL for the object. Note: you should not
      need to edit this. It's present to make testing easier.

    .. attribute:: connected

        True if the object is currently connected to the stream.

    .. attribute:: url

        The URL to which the object is connected

    .. attribute:: starttime

        The timestamp, in seconds since the epoch, the object connected to the
        streaming api.

    .. attribute:: count

        The number of tweets that have been returned by the object.

    .. attribute:: rate

        The rate at which tweets have been returned from the object as a
        float. see also :attr: `rate_period`.

    .. attribute:: rate_period

        The amount of time to sample tweets to calculate tweet rate. By
        default 10 seconds. Changes to this attribute will not be reflected
        until the next time the rate is calculated. The rate of tweets vary
        with time of day etc. so it's useful to set this to something
        sensible.

    .. attribute:: user_agent

        User agent string that will be included in the request. NOTE: This can
        not be changed after the connection has been made. This property must
        thus be set before accessing the iterator. The default is set in
        :attr: `USER_AGENT`.
    """

    def __init__(self, auth, catchup=None, url=None):
        self._conn = None
        self._rate_ts = None
        self._rate_cnt = 0
        self._auth = auth
        self._catchup_count = catchup

        self.rate_period = 10  # in seconds
        self.connected = False
        self.starttime = None
        self.count = 0
        self.rate = 0
        self.user_agent = USER_AGENT
        # Subclasses provide ``url`` as a class attribute; only override it
        # when the caller supplies one explicitly.
        if url:
            self.url = url

    def __iter__(self):
        return self

    def __enter__(self):
        return self

    def __exit__(self, *params):
        self.close()
        return False  # never swallow the exception that ended the block

    def _init_conn(self):
        """Open the connection to the twitter server

        Raises AuthenticationError on HTTP 401 and ConnectionError on
        HTTP 404 or on a URL error. Other HTTP errors are re-raised as is.
        """
        headers = {'User-Agent': self.user_agent}

        postdata = self._get_post_data() or {}
        if self._catchup_count:
            postdata["count"] = self._catchup_count

        poststring = urllib.urlencode(postdata) if postdata else None

        if self._auth:
            self._auth.apply_auth(self.url, "POST", headers, postdata)

        req = urllib2.Request(self.url, poststring, headers)

        try:
            self._conn = urllib2.urlopen(req)
        except urllib2.HTTPError as exception:
            if exception.code == 401:
                raise AuthenticationError("Access denied")
            elif exception.code == 404:
                raise ConnectionError("URL not found: %s" % self.url)
            else:  # re raise. No idea what would cause this, so want to know
                raise
        except urllib2.URLError as exception:
            raise ConnectionError(exception.reason)

        self.connected = True
        if not self.starttime:
            self.starttime = time.time()
        if not self._rate_ts:
            self._rate_ts = time.time()

    def _get_post_data(self):
        """Subclasses that need to add post data to the request can override
        this method and return post data. The data should be a dict of
        parameter names to values (it is passed to urllib.urlencode by
        the caller), or None when there is nothing to send."""
        return None

    def next(self):
        """Return the next available tweet. This call is blocking!

        Connects on first use (and again after a disconnect). Blank
        keep-alive lines are skipped. Every decoded message is returned, but
        only those with a ``text`` key are counted as tweets.

        Raises ConnectionError when the server disconnects or sends data
        that cannot be decoded.
        """
        while True:
            # Bind ``data`` before the try block: a ValueError raised while
            # connecting (before any line was read) must still be reported
            # as a ConnectionError, not fail with an unbound local.
            data = None
            try:
                if not self.connected:
                    self._init_conn()

                rate_time = time.time() - self._rate_ts
                if not self._rate_ts or rate_time > self.rate_period:
                    self.rate = self._rate_cnt / rate_time
                    self._rate_cnt = 0
                    self._rate_ts = time.time()

                data = self._conn.readline()
                if data == "":  # something is wrong
                    self.close()
                    raise ConnectionError("Got entry of length 0. Disconnected")
                elif data.isspace():
                    continue

                data = anyjson.deserialize(data)
                if 'text' in data:
                    self.count += 1
                    self._rate_cnt += 1
                return data

            except ValueError:
                self.close()
                raise ConnectionError("Got invalid data from twitter",
                                      details=data)

            except socket.error:
                self.close()
                raise ConnectionError("Server disconnected")

    def close(self):
        """
        Close the connection to the streaming server.
        """
        self.connected = False
        if self._conn:
            self._conn.close()


class SampleStream(BaseStream):
    """Stream reading from the ``statuses/sample`` endpoint."""
    url = "http://stream.twitter.com/1/statuses/sample.json"


class FilterStream(BaseStream):
    """Stream reading from the ``statuses/filter`` endpoint.

    :param auth: See :BaseStream:
    :keyword follow: Iterable of values sent, comma separated, as the
      ``follow`` POST parameter.
    :keyword locations: Iterable of values sent, comma separated, as the
      ``locations`` POST parameter. Numbers are accepted.
    :keyword track: Iterable of strings sent, comma separated, as the
      ``track`` POST parameter.
    :keyword catchup: See :BaseStream:
    :keyword url: See :BaseStream:
    """
    url = "http://stream.twitter.com/1/statuses/filter.json"

    def __init__(self, auth, follow=None, locations=None,
                 track=None, catchup=None, url=None):
        self._follow = follow
        self._locations = locations
        self._track = track
        # catchup must be forwarded, otherwise it is silently dropped and
        # the "count" POST parameter is never sent.
        BaseStream.__init__(self, auth, catchup=catchup, url=url)

    def _get_post_data(self):
        """Build the dict of filter criteria to POST. Unset ones are omitted."""
        postdata = {}
        if self._follow:
            postdata["follow"] = ",".join([str(e) for e in self._follow])
        if self._locations:
            # str() so numeric coordinates are accepted, as for follow ids.
            postdata["locations"] = ",".join([str(e) for e in self._locations])
        if self._track:
            postdata["track"] = ",".join(self._track)
        return postdata
--- a/script/stream/recorder_tweetstream.py Fri Jul 01 10:06:36 2011 +0200 +++ b/script/stream/recorder_tweetstream.py Fri Jul 01 18:59:13 2011 +0200 @@ -10,7 +10,7 @@ import sys import time import tweetstream -import tweetstream.auth +import tweepy.auth socket._fileobject.default_bufsize = 0 @@ -22,7 +22,7 @@ #just put it in a sqlite3 tqble -class ReconnectingTweetStream(tweetstream.TrackStream): +class ReconnectingTweetStream(tweetstream.FilterStream): """TweetStream class that automatically tries to reconnect if the connecting goes down. Reconnecting, and waiting for reconnecting, is blocking. @@ -44,12 +44,12 @@ """ - def __init__(self, auth, keywords, url="track", reconnects=3, error_cb=None, retry_wait=5, **kwargs): + def __init__(self, auth, keywords, reconnects=3, error_cb=None, retry_wait=5, **kwargs): self.max_reconnects = reconnects self.retry_wait = retry_wait self._reconnects = 0 self._error_cb = error_cb - super(ReconnectingTweetStream, self).__init__(auth, keywords, url, **kwargs) + super(ReconnectingTweetStream, self).__init__(auth=auth, track=keywords, **kwargs) def next(self): while True: @@ -90,11 +90,11 @@ track_list = [k for k in track_list.split(',')] if username and password: - auth = tweetstream.auth.BasicAuthHandler(username, password) + auth = tweepy.auth.BasicAuthHandler(username, password) else: consumer_key = models.CONSUMER_KEY consumer_secret = models.CONSUMER_SECRET - auth = tweetstream.auth.OAuthHandler(consumer_key, consumer_secret, secure=False) + auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret, secure=False) auth.set_access_token(*(utils.get_oauth_token(token_filename))) if duration >= 0: @@ -107,7 +107,7 @@ print "Stop recording after %d seconds." 
% (duration) break process_tweet(tweet, session, debug, token_filename) - logging.info("Tweet count: %d - current rate : %d - running : %s"%(stream.count, stream.rate, int(time.time()-stream.starttime))) + logging.info("Tweet count: %d - current rate : %.2f - running : %s"%(stream.count, stream.rate, int(time.time()-stream.starttime))) session.commit() finally: stream.close()