| author | Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com> |
| Mon, 20 Feb 2012 01:35:15 +0100 | |
| changeset 527 | 80e5b9543cac |
| parent 526 | 3c14302784f8 (current diff) |
| parent 14 | 10e7a0c7c64f (diff) |
| child 528 | 7fb5a7b0d35c |
--- a/script/lib/tweetstream/CHANGELOG Thu Feb 16 16:40:16 2012 +0100 +++ b/script/lib/tweetstream/CHANGELOG Mon Feb 20 01:35:15 2012 +0100 @@ -52,3 +52,17 @@ - Changed API to match latest twitter endpoints. This adds SampleStream and FilterStream and deprecates TweetStream, FollowStream, LocationStream, TrackStream and ReconnectingTweetStream. + +1.1.0 + + - Fixed issues #2 and #12, related to low volume streams not yielding tweets + until a relatively large buffer was filled. This meant that tweets would + arrive in bunches, not immediatly. + - Switched to HTTPS urls for streams. Twitter will switch off HTTP streams + on 29. sept. 2011. + - Added support for Python 3 + +1.1.1 + + - Fixed issue #16. Odd case where python_version_tuple was returning ints + rather than the strings the docs promis. Make sure we always cast to int.
--- a/script/lib/tweetstream/setup.py Thu Feb 16 16:40:16 2012 +0100 +++ b/script/lib/tweetstream/setup.py Mon Feb 20 01:35:15 2012 +0100 @@ -1,28 +1,82 @@ #@PydevCodeAnalysisIgnore -from setuptools import setup, find_packages -import sys, os +import sys +import os + +extra = {} +if sys.version_info >= (3, 0): + extra.update(use_2to3=True) + + +try: + from setuptools import setup, find_packages +except ImportError: + from distutils.core import setup, find_packages + + +# -*- Distribution Meta -*- +import re +re_meta = re.compile(r'__(\w+?)__\s*=\s*(.*)') +re_vers = re.compile(r'VERSION\s*=\s*\((.*?)\)') +re_doc = re.compile(r'^"""(.+?)"""', re.M|re.S) +rq = lambda s: s.strip("\"'") + + +def add_default(m): + attr_name, attr_value = m.groups() + return ((attr_name, rq(attr_value)), ) + -author = "Rune Halvorsen" -email = "runefh@gmail.com" -version = "1.0.0" -homepage = "http://bitbucket.org/runeh/tweetstream/" +def add_version(m): + v = list(map(rq, m.groups()[0].split(", "))) + return (("VERSION", ".".join(v[0:3]) + "".join(v[3:])), ) + + +def add_doc(m): + return (("doc", m.groups()[0].replace("\n", " ")), ) + +pats = {re_meta: add_default, + re_vers: add_version} +here = os.path.abspath(os.path.dirname(__file__)) +meta_fh = open(os.path.join(here, "tweetstream/__init__.py")) +try: + meta = {} + acc = [] + for line in meta_fh: + if line.strip() == '# -eof meta-': + break + acc.append(line) + for pattern, handler in pats.items(): + m = pattern.match(line.strip()) + if m: + meta.update(handler(m)) + m = re_doc.match("".join(acc).strip()) + if m: + meta.update(add_doc(m)) +finally: + meta_fh.close() + setup(name='tweetstream', - version=version, - description="Simple Twitter streaming API access", + version=meta["VERSION"], + description=meta["doc"], long_description=open("README").read(), classifiers=[ 'License :: OSI Approved :: BSD License', 'Intended Audience :: Developers', + 'Programming Language :: Python :: 2.6', + 'Programming Language :: Python :: 2.7', + 'Programming Language :: Python :: 3', + 'Programming Language :: Python :: 3.1', ], keywords='twitter', - author=author, - author_email=email, - url=homepage, + author=meta["author"], + author_email=meta["contact"], + url=meta["homepage"], license='BSD', packages=find_packages(exclude=['ez_setup', 'examples', 'tests']), include_package_data=True, zip_safe=False, platforms=["any"], - install_requires = ['anyjson'], + install_requires=['anyjson'], + **extra )
--- a/script/lib/tweetstream/tests/test_tweetstream.py Thu Feb 16 16:40:16 2012 +0100 +++ b/script/lib/tweetstream/tests/test_tweetstream.py Mon Feb 20 01:35:15 2012 +0100 @@ -1,10 +1,9 @@ import contextlib import threading import time -from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer from tweetstream import TweetStream, FollowStream, TrackStream, LocationStream -from tweetstream import ConnectionError, AuthenticationError, SampleStream +from tweetstream import ConnectionError, AuthenticationError, SampleStream, FilterStream from tweepy.auth import BasicAuthHandler import pytest @@ -13,7 +12,7 @@ from servercontext import test_server -single_tweet = r"""{"in_reply_to_status_id":null,"in_reply_to_user_id":null,"favorited":false,"created_at":"Tue Jun 16 10:40:14 +0000 2009","in_reply_to_screen_name":null,"text":"record industry just keeps on amazing me: http:\/\/is.gd\/13lFo - $150k per song you've SHARED, not that somebody has actually DOWNLOADED.","user":{"notifications":null,"profile_background_tile":false,"followers_count":206,"time_zone":"Copenhagen","utc_offset":3600,"friends_count":191,"profile_background_color":"ffffff","profile_image_url":"http:\/\/s3.amazonaws.com\/twitter_production\/profile_images\/250715794\/profile_normal.png","description":"Digital product developer, currently at Opera Software. My tweets are my opinions, not those of my employer.","verified_profile":false,"protected":false,"favourites_count":0,"profile_text_color":"3C3940","screen_name":"eiriksnilsen","name":"Eirik Stridsklev N.","following":null,"created_at":"Tue May 06 12:24:12 +0000 2008","profile_background_image_url":"http:\/\/s3.amazonaws.com\/twitter_production\/profile_background_images\/10531192\/160x600opera15.gif","profile_link_color":"0099B9","profile_sidebar_fill_color":"95E8EC","url":"http:\/\/www.stridsklev-nilsen.no\/eirik","id":14672543,"statuses_count":506,"profile_sidebar_border_color":"5ED4DC","location":"Oslo, Norway"},"id":2190767504,"truncated":false,"source":"<a href=\"http:\/\/widgets.opera.com\/widget\/7206\">Twitter Opera widget<\/a>"}""" +single_tweet = r"""{"in_reply_to_status_id":null,"in_reply_to_user_id":null,"favorited":false,"created_at":"Tue Jun 16 10:40:14 +0000 2009","in_reply_to_screen_name":null,"text":"record industry just keeps on amazing me: http:\/\/is.gd\/13lFo - $150k per song you've SHARED, not that somebody has actually DOWNLOADED.","user":{"notifications":null,"profile_background_tile":false,"followers_count":206,"time_zone":"Copenhagen","utc_offset":3600,"friends_count":191,"profile_background_color":"ffffff","profile_image_url":"http:\/\/s3.amazonaws.com\/twitter_production\/profile_images\/250715794\/profile_normal.png","description":"Digital product developer, currently at Opera Software. My tweets are my opinions, not those of my employer.","verified_profile":false,"protected":false,"favourites_count":0,"profile_text_color":"3C3940","screen_name":"eiriksnilsen","name":"Eirik Stridsklev N.","following":null,"created_at":"Tue May 06 12:24:12 +0000 2008","profile_background_image_url":"http:\/\/s3.amazonaws.com\/twitter_production\/profile_background_images\/10531192\/160x600opera15.gif","profile_link_color":"0099B9","profile_sidebar_fill_color":"95E8EC","url":"http:\/\/www.stridsklev-nilsen.no\/eirik","id":14672543,"statuses_count":506,"profile_sidebar_border_color":"5ED4DC","location":"Oslo, Norway"},"id":2190767504,"truncated":false,"source":"<a href=\"http:\/\/widgets.opera.com\/widget\/7206\">Twitter Opera widget<\/a>"}""" + "\r" def parameterized(funcarglist): @@ -30,6 +29,7 @@ streamtypes = [ dict(cls=TweetStream, args=[], kwargs=dict()), dict(cls=SampleStream, args=[], kwargs=dict()), + dict(cls=FilterStream, args=[], kwargs=dict(track=("test",))), dict(cls=FollowStream, args=[[1, 2, 3]], kwargs=dict()), dict(cls=TrackStream, args=["opera"], kwargs=dict()), dict(cls=LocationStream, args=["123,4321"], kwargs=dict()) @@ -43,9 +43,11 @@ def auth_denied(request): request.send_error(401) - with test_server(handler=auth_denied, methods=("post", "get"), port="random") as server: - auth = BasicAuthHandler("user", "passwd") - stream = cls(auth, *args, url=server.baseurl) + with raises(AuthenticationError): + with test_server(handler=auth_denied, methods=("post", "get"), port="random") as server: + auth = BasicAuthHandler("user", "passwd") + stream = cls(auth, *args, url=server.baseurl) + for e in stream: pass @parameterized(streamtypes) @@ -55,9 +57,11 @@ def not_found(request): request.send_error(404) - with test_server(handler=not_found, methods=("post", "get"), port="random") as server: - auth = BasicAuthHandler("user", "passwd") - stream = cls(auth, *args, url=server.baseurl) + with raises(ConnectionError): + with test_server(handler=not_found, methods=("post", "get"), port="random") as server: + auth = BasicAuthHandler("user", "passwd") + stream = cls(auth, *args, url=server.baseurl) + for e in stream: pass @parameterized(streamtypes) @@ -67,9 +71,9 @@ for n in xrange(10): # what json we pass doesn't matter. It's not verifying the # strcuture, only checking that it's parsable - yield "[1,2,3]" - yield "[1,2, I need no stinking close brace" - yield "[1,2,3]" + yield "[1,2,3]\r" + yield "[1,2, I need no stinking close brace\r" + yield "[1,2,3]\r" with raises(ConnectionError): @@ -88,7 +92,7 @@ for n in xrange(cnt): # what json we pass doesn't matter. It's not verifying the # strcuture, only checking that it's parsable - yield "[1,2,3]" + yield "[1,2,3]\r" with raises(ConnectionError): with test_server(handler=bad_content, methods=("post", "get"), port="random") as server:
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/script/lib/tweetstream/tox.ini Mon Feb 20 01:35:15 2012 +0100 @@ -0,0 +1,16 @@ +[tox] +envlist = py25,py26,py27,py30,py31,py32 + +[testenv] +deps=pytest +sitepackages=False +commands=py.test --runslow + +[testenv:py30] +changedir = .tox + +[testenv:py31] +changedir = .tox + +[testenv:py32] +changedir = .tox \ No newline at end of file
--- a/script/lib/tweetstream/tweetstream/__init__.py Thu Feb 16 16:40:16 2012 +0100 +++ b/script/lib/tweetstream/tweetstream/__init__.py Mon Feb 20 01:35:15 2012 +0100 @@ -1,11 +1,14 @@ -""" -Simple Twitter streaming API access -""" -__version__ = "1.0.0" -__author__ = "Rune Halvorsen <runefh@gmail.com>" +"""Simple access to Twitter's streaming API""" + +VERSION = (1, 1, 1) +__version__ = ".".join(map(str, VERSION[0:3])) + "".join(VERSION[3:]) +__author__ = "Rune Halvorsen" +__contact__ = "runefh@gmail.com" __homepage__ = "http://bitbucket.org/runeh/tweetstream/" __docformat__ = "restructuredtext" +# -eof meta- + """ .. data:: USER_AGENT @@ -38,5 +41,5 @@ return '<ConnectionError %s>' % self.reason -from streamclasses import SampleStream, FilterStream -from deprecated import FollowStream, TrackStream, LocationStream, TweetStream, ReconnectingTweetStream +from .streamclasses import SampleStream, FilterStream +from .deprecated import FollowStream, TrackStream, LocationStream, TweetStream, ReconnectingTweetStream
--- a/script/lib/tweetstream/tweetstream/deprecated.py Thu Feb 16 16:40:16 2012 +0100 +++ b/script/lib/tweetstream/tweetstream/deprecated.py Mon Feb 20 01:35:15 2012 +0100 @@ -14,12 +14,12 @@ class TrackStream(DeprecatedStream): - def __init__(self, auth, track, catchup=None, url=None): + def __init__(self, auth, track, catchup=None, url=None, slow=False): super(TrackStream, self).__init__(auth, track=track, catchup=catchup, url=url) class LocationStream(DeprecatedStream): - def __init__(self, auth, locations, catchup=None, url=None): + def __init__(self, auth, locations, catchup=None, url=None, slow=False): super(LocationStream, self).__init__(auth, locations=locations, catchup=catchup, url=url)
--- a/script/lib/tweetstream/tweetstream/streamclasses.py Thu Feb 16 16:40:16 2012 +0100 +++ b/script/lib/tweetstream/tweetstream/streamclasses.py Mon Feb 20 01:35:15 2012 +0100 @@ -1,11 +1,11 @@ -from . import AuthenticationError, ConnectionError, USER_AGENT -import anyjson +import time +import urllib +import urllib2 import socket -import time -import urllib #@UnresolvedImport -import urllib2 #@UnresolvedImport -socket._fileobject.default_bufsize = 0 +from platform import python_version_tuple +import anyjson +from . import AuthenticationError, ConnectionError, USER_AGENT class BaseStream(object): """A network connection to Twitters streaming API @@ -13,6 +13,14 @@ :param auth: tweepy auth object. :keyword catchup: Number of tweets from the past to get before switching to live stream. + :keyword raw: If True, return each tweet's raw data direct from the socket, + without UTF8 decoding or parsing, rather than a parsed object. The + default is False. + :keyword timeout: If non-None, set a timeout in seconds on the receiving + socket. Certain types of network problems (e.g., disconnecting a VPN) + can cause the connection to hang, leading to indefinite blocking that + requires kill -9 to resolve. Setting a timeout leads to an orderly + shutdown in these cases. The default is None (i.e., no timeout). :keyword url: Endpoint URL for the object. Note: you should not need to edit this. It's present to make testing easier. @@ -54,12 +62,16 @@ :attr: `USER_AGENT`. """ - def __init__(self, auth, catchup=None, url=None, as_text=False): + def __init__(self, auth, + catchup=None, raw=False, timeout=None, url=None): self._conn = None self._rate_ts = None self._rate_cnt = 0 self._auth = auth self._catchup_count = catchup + self._raw_mode = raw + self._timeout = timeout + self._iter = self.__iter__() self.rate_period = 10 # in seconds self.connected = False @@ -68,13 +80,9 @@ self.rate = 0 self.user_agent = USER_AGENT if url: self.url = url - self._as_text = as_text self.muststop = False - def __iter__(self): - return self - def __enter__(self): return self @@ -98,7 +106,8 @@ req = urllib2.Request(self.url, poststring, headers) try: - self._conn = urllib2.urlopen(req) + self._conn = urllib2.urlopen(req, timeout=self._timeout) + except urllib2.HTTPError, exception: if exception.code == 401: raise AuthenticationError("Access denied") @@ -109,6 +118,36 @@ except urllib2.URLError, exception: raise ConnectionError(exception.reason) + # This is horrible. This line grabs the raw socket (actually an ssl + # wrapped socket) from the guts of urllib2/httplib. We want the raw + # socket so we can bypass the buffering that those libs provide. + # The buffering is reasonable when dealing with connections that + # try to finish as soon as possible. With twitters' never ending + # connections, it causes a bug where we would not deliver tweets + # until the buffer was full. That's problematic for very low volume + # filterstreams, since you might not see a tweet for minutes or hours + # after they occured while the buffer fills. + # + # Oh, and the inards of the http libs are different things on in + # py2 and 3, so need to deal with that. py3 libs do more of what I + # want by default, but I wont do more special casing for it than + # neccessary. + + major, _, _ = python_version_tuple() + # The cast is needed because apparently some versions return strings + # and some return ints. + # On my ubuntu with stock 2.6 I get strings, which match the docs. + # Someone reported the issue on 2.6.1 on macos, but that was + # manually built, not the bundled one. Anyway, cast for safety. + major = int(major) + if major == 2: + self._socket = self._conn.fp._sock.fp._sock + else: + self._socket = self._conn.fp.raw + # our code that reads from the socket expects a method called recv. + # py3 socket.SocketIO uses the name read, so alias it. + self._socket.recv = self._socket.read + self.connected = True if not self.starttime: self.starttime = time.time() @@ -120,15 +159,22 @@ this method and return post data. The data should be in the format returned by urllib.urlencode.""" return None - + def __muststop(self): if callable(self.muststop): return self.muststop() else: return self.muststop + + def _update_rate(self): + rate_time = time.time() - self._rate_ts + if not self._rate_ts or rate_time > self.rate_period: + self.rate = self._rate_cnt / rate_time + self._rate_cnt = 0 + self._rate_ts = time.time() - def next(self): - """Return the next available tweet. This call is blocking!""" + def __iter__(self): + buf = b"" while True: try: if self.__muststop(): @@ -137,39 +183,46 @@ if not self.connected: self._init_conn() - rate_time = time.time() - self._rate_ts - if not self._rate_ts or rate_time > self.rate_period: - self.rate = self._rate_cnt / rate_time - self._rate_cnt = 0 - self._rate_ts = time.time() - - data = self._conn.readline() - if data == "": # something is wrong + buf += self._socket.recv(8192) + if buf == b"": # something is wrong self.close() raise ConnectionError("Got entry of length 0. Disconnected") - elif data.isspace(): + elif buf.isspace(): + buf = b"" + elif b"\r" not in buf: # not enough data yet. Loop around continue - if not self._as_text: - data = anyjson.deserialize(data) - if 'text' in data: + lines = buf.split(b"\r") + buf = lines[-1] + lines = lines[:-1] + + for line in lines: + if (self._raw_mode): + tweet = line + else: + line = line.decode("utf8") + try: + tweet = anyjson.deserialize(line) + except ValueError, e: + self.close() + raise ConnectionError("Got invalid data from twitter", details=line) + + if 'text' in tweet: self.count += 1 self._rate_cnt += 1 - else: # count and rate may be off, but we count everything - self.count += 1 - self._rate_cnt += 1 - - return data + yield tweet - except ValueError, e: - self.close() - raise ConnectionError("Got invalid data from twitter", - details=data) except socket.error, e: self.close() raise ConnectionError("Server disconnected") + + def next(self): + """Return the next available tweet. This call is blocking!""" + return self._iter.next() + + def close(self): """ Close the connection to the streaming server. @@ -187,12 +240,12 @@ url = "https://stream.twitter.com/1/statuses/filter.json" def __init__(self, auth, follow=None, locations=None, - track=None, catchup=None, url=None, as_text=False): + track=None, catchup=None, url=None, raw=False): self._follow = follow self._locations = locations self._track = track # remove follow, locations, track - BaseStream.__init__(self, auth, url=url, as_text=as_text) + BaseStream.__init__(self, auth, url=url, raw=raw) def _get_post_data(self): postdata = {}