--- a/script/lib/tweetstream/CHANGELOG Fri Jul 01 10:15:32 2011 +0200
+++ b/script/lib/tweetstream/CHANGELOG Mon Feb 20 00:12:16 2012 +0100
@@ -52,3 +52,17 @@
- Changed API to match latest twitter endpoints. This adds SampleStream and
FilterStream and deprecates TweetStream, FollowStream, LocationStream,
TrackStream and ReconnectingTweetStream.
+
+1.1.0
+
+ - Fixed issues #2 and #12, related to low volume streams not yielding tweets
+ until a relatively large buffer was filled. This meant that tweets would
+ arrive in bunches, not immediatly.
+ - Switched to HTTPS urls for streams. Twitter will switch off HTTP streams
+ on 29. sept. 2011.
+ - Added support for Python 3
+
+1.1.1
+
+ - Fixed issue #16. Odd case where python_version_tuple was returning ints
+ rather than the strings the docs promis. Make sure we always cast to int.
--- a/script/lib/tweetstream/setup.py Fri Jul 01 10:15:32 2011 +0200
+++ b/script/lib/tweetstream/setup.py Mon Feb 20 00:12:16 2012 +0100
@@ -1,27 +1,81 @@
-from setuptools import setup, find_packages
-import sys, os
+import sys
+import os
+
+extra = {}
+if sys.version_info >= (3, 0):
+ extra.update(use_2to3=True)
+
+
+try:
+ from setuptools import setup, find_packages
+except ImportError:
+ from distutils.core import setup, find_packages
+
+
+# -*- Distribution Meta -*-
+import re
+re_meta = re.compile(r'__(\w+?)__\s*=\s*(.*)')
+re_vers = re.compile(r'VERSION\s*=\s*\((.*?)\)')
+re_doc = re.compile(r'^"""(.+?)"""', re.M|re.S)
+rq = lambda s: s.strip("\"'")
+
+
+def add_default(m):
+ attr_name, attr_value = m.groups()
+ return ((attr_name, rq(attr_value)), )
+
-author = "Rune Halvorsen"
-email = "runefh@gmail.com"
-version = "1.0.0"
-homepage = "http://bitbucket.org/runeh/tweetstream/"
+def add_version(m):
+ v = list(map(rq, m.groups()[0].split(", ")))
+ return (("VERSION", ".".join(v[0:3]) + "".join(v[3:])), )
+
+
+def add_doc(m):
+ return (("doc", m.groups()[0].replace("\n", " ")), )
+
+pats = {re_meta: add_default,
+ re_vers: add_version}
+here = os.path.abspath(os.path.dirname(__file__))
+meta_fh = open(os.path.join(here, "tweetstream/__init__.py"))
+try:
+ meta = {}
+ acc = []
+ for line in meta_fh:
+ if line.strip() == '# -eof meta-':
+ break
+ acc.append(line)
+ for pattern, handler in pats.items():
+ m = pattern.match(line.strip())
+ if m:
+ meta.update(handler(m))
+ m = re_doc.match("".join(acc).strip())
+ if m:
+ meta.update(add_doc(m))
+finally:
+ meta_fh.close()
+
setup(name='tweetstream',
- version=version,
- description="Simple Twitter streaming API access",
+ version=meta["VERSION"],
+ description=meta["doc"],
long_description=open("README").read(),
classifiers=[
'License :: OSI Approved :: BSD License',
'Intended Audience :: Developers',
+ 'Programming Language :: Python :: 2.6',
+ 'Programming Language :: Python :: 2.7',
+ 'Programming Language :: Python :: 3',
+ 'Programming Language :: Python :: 3.1',
],
keywords='twitter',
- author=author,
- author_email=email,
- url=homepage,
+ author=meta["author"],
+ author_email=meta["contact"],
+ url=meta["homepage"],
license='BSD',
packages=find_packages(exclude=['ez_setup', 'examples', 'tests']),
include_package_data=True,
zip_safe=False,
platforms=["any"],
- install_requires = ['anyjson'],
+ install_requires=['anyjson'],
+ **extra
)
--- a/script/lib/tweetstream/tests/test_tweetstream.py Fri Jul 01 10:15:32 2011 +0200
+++ b/script/lib/tweetstream/tests/test_tweetstream.py Mon Feb 20 00:12:16 2012 +0100
@@ -1,10 +1,9 @@
import contextlib
import threading
import time
-from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
from tweetstream import TweetStream, FollowStream, TrackStream, LocationStream
-from tweetstream import ConnectionError, AuthenticationError, SampleStream
+from tweetstream import ConnectionError, AuthenticationError, SampleStream, FilterStream
import pytest
from pytest import raises
@@ -12,7 +11,7 @@
from servercontext import test_server
-single_tweet = r"""{"in_reply_to_status_id":null,"in_reply_to_user_id":null,"favorited":false,"created_at":"Tue Jun 16 10:40:14 +0000 2009","in_reply_to_screen_name":null,"text":"record industry just keeps on amazing me: http:\/\/is.gd\/13lFo - $150k per song you've SHARED, not that somebody has actually DOWNLOADED.","user":{"notifications":null,"profile_background_tile":false,"followers_count":206,"time_zone":"Copenhagen","utc_offset":3600,"friends_count":191,"profile_background_color":"ffffff","profile_image_url":"http:\/\/s3.amazonaws.com\/twitter_production\/profile_images\/250715794\/profile_normal.png","description":"Digital product developer, currently at Opera Software. My tweets are my opinions, not those of my employer.","verified_profile":false,"protected":false,"favourites_count":0,"profile_text_color":"3C3940","screen_name":"eiriksnilsen","name":"Eirik Stridsklev N.","following":null,"created_at":"Tue May 06 12:24:12 +0000 2008","profile_background_image_url":"http:\/\/s3.amazonaws.com\/twitter_production\/profile_background_images\/10531192\/160x600opera15.gif","profile_link_color":"0099B9","profile_sidebar_fill_color":"95E8EC","url":"http:\/\/www.stridsklev-nilsen.no\/eirik","id":14672543,"statuses_count":506,"profile_sidebar_border_color":"5ED4DC","location":"Oslo, Norway"},"id":2190767504,"truncated":false,"source":"<a href=\"http:\/\/widgets.opera.com\/widget\/7206\">Twitter Opera widget<\/a>"}"""
+single_tweet = r"""{"in_reply_to_status_id":null,"in_reply_to_user_id":null,"favorited":false,"created_at":"Tue Jun 16 10:40:14 +0000 2009","in_reply_to_screen_name":null,"text":"record industry just keeps on amazing me: http:\/\/is.gd\/13lFo - $150k per song you've SHARED, not that somebody has actually DOWNLOADED.","user":{"notifications":null,"profile_background_tile":false,"followers_count":206,"time_zone":"Copenhagen","utc_offset":3600,"friends_count":191,"profile_background_color":"ffffff","profile_image_url":"http:\/\/s3.amazonaws.com\/twitter_production\/profile_images\/250715794\/profile_normal.png","description":"Digital product developer, currently at Opera Software. My tweets are my opinions, not those of my employer.","verified_profile":false,"protected":false,"favourites_count":0,"profile_text_color":"3C3940","screen_name":"eiriksnilsen","name":"Eirik Stridsklev N.","following":null,"created_at":"Tue May 06 12:24:12 +0000 2008","profile_background_image_url":"http:\/\/s3.amazonaws.com\/twitter_production\/profile_background_images\/10531192\/160x600opera15.gif","profile_link_color":"0099B9","profile_sidebar_fill_color":"95E8EC","url":"http:\/\/www.stridsklev-nilsen.no\/eirik","id":14672543,"statuses_count":506,"profile_sidebar_border_color":"5ED4DC","location":"Oslo, Norway"},"id":2190767504,"truncated":false,"source":"<a href=\"http:\/\/widgets.opera.com\/widget\/7206\">Twitter Opera widget<\/a>"}""" + "\r"
def parameterized(funcarglist):
@@ -29,6 +28,7 @@
streamtypes = [
dict(cls=TweetStream, args=[], kwargs=dict()),
dict(cls=SampleStream, args=[], kwargs=dict()),
+ dict(cls=FilterStream, args=[], kwargs=dict(track=("test",))),
dict(cls=FollowStream, args=[[1, 2, 3]], kwargs=dict()),
dict(cls=TrackStream, args=["opera"], kwargs=dict()),
dict(cls=LocationStream, args=["123,4321"], kwargs=dict())
@@ -42,8 +42,10 @@
def auth_denied(request):
request.send_error(401)
- with test_server(handler=auth_denied, methods=("post", "get"), port="random") as server:
- stream = cls("user", "passwd", *args, url=server.baseurl)
+ with raises(AuthenticationError):
+ with test_server(handler=auth_denied, methods=("post", "get"), port="random") as server:
+ stream = cls("user", "passwd", *args, url=server.baseurl)
+ for e in stream: pass
@parameterized(streamtypes)
@@ -53,8 +55,10 @@
def not_found(request):
request.send_error(404)
- with test_server(handler=not_found, methods=("post", "get"), port="random") as server:
- stream = cls("user", "passwd", *args, url=server.baseurl)
+ with raises(ConnectionError):
+ with test_server(handler=not_found, methods=("post", "get"), port="random") as server:
+ stream = cls("user", "passwd", *args, url=server.baseurl)
+ for e in stream: pass
@parameterized(streamtypes)
@@ -64,9 +68,9 @@
for n in xrange(10):
# what json we pass doesn't matter. It's not verifying the
# strcuture, only checking that it's parsable
- yield "[1,2,3]"
- yield "[1,2, I need no stinking close brace"
- yield "[1,2,3]"
+ yield "[1,2,3]\r"
+ yield "[1,2, I need no stinking close brace\r"
+ yield "[1,2,3]\r"
with raises(ConnectionError):
@@ -84,7 +88,7 @@
for n in xrange(cnt):
# what json we pass doesn't matter. It's not verifying the
# strcuture, only checking that it's parsable
- yield "[1,2,3]"
+ yield "[1,2,3]\r"
with raises(ConnectionError):
with test_server(handler=bad_content, methods=("post", "get"), port="random") as server:
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/script/lib/tweetstream/tox.ini Mon Feb 20 00:12:16 2012 +0100
@@ -0,0 +1,16 @@
+[tox]
+envlist = py25,py26,py27,py30,py31,py32
+
+[testenv]
+deps=pytest
+sitepackages=False
+commands=py.test --runslow
+
+[testenv:py30]
+changedir = .tox
+
+[testenv:py31]
+changedir = .tox
+
+[testenv:py32]
+changedir = .tox
\ No newline at end of file
--- a/script/lib/tweetstream/tweetstream/__init__.py Fri Jul 01 10:15:32 2011 +0200
+++ b/script/lib/tweetstream/tweetstream/__init__.py Mon Feb 20 00:12:16 2012 +0100
@@ -1,11 +1,14 @@
-"""
-Simple Twitter streaming API access
-"""
-__version__ = "1.0.0"
-__author__ = "Rune Halvorsen <runefh@gmail.com>"
+"""Simple access to Twitter's streaming API"""
+
+VERSION = (1, 1, 1)
+__version__ = ".".join(map(str, VERSION[0:3])) + "".join(VERSION[3:])
+__author__ = "Rune Halvorsen"
+__contact__ = "runefh@gmail.com"
__homepage__ = "http://bitbucket.org/runeh/tweetstream/"
__docformat__ = "restructuredtext"
+# -eof meta-
+
"""
.. data:: USER_AGENT
@@ -38,5 +41,5 @@
return '<ConnectionError %s>' % self.reason
-from streamclasses import SampleStream, FilterStream
-from deprecated import FollowStream, TrackStream, LocationStream, TweetStream, ReconnectingTweetStream
+from .streamclasses import SampleStream, FilterStream
+from .deprecated import FollowStream, TrackStream, LocationStream, TweetStream, ReconnectingTweetStream
--- a/script/lib/tweetstream/tweetstream/deprecated.py Fri Jul 01 10:15:32 2011 +0200
+++ b/script/lib/tweetstream/tweetstream/deprecated.py Mon Feb 20 00:12:16 2012 +0100
@@ -13,12 +13,12 @@
class TrackStream(DeprecatedStream):
- def __init__(self, username, password, track, catchup=None, url=None):
+ def __init__(self, username, password, track, catchup=None, url=None, slow=False):
super(TrackStream, self).__init__(username, password, track=track, catchup=catchup, url=url)
class LocationStream(DeprecatedStream):
- def __init__(self, username, password, locations, catchup=None, url=None):
+ def __init__(self, username, password, locations, catchup=None, url=None, slow=False):
super(LocationStream, self).__init__(username, password, locations=locations, catchup=catchup, url=url)
--- a/script/lib/tweetstream/tweetstream/streamclasses.py Fri Jul 01 10:15:32 2011 +0200
+++ b/script/lib/tweetstream/tweetstream/streamclasses.py Mon Feb 20 00:12:16 2012 +0100
@@ -2,6 +2,7 @@
import urllib
import urllib2
import socket
+from platform import python_version_tuple
import anyjson
from . import AuthenticationError, ConnectionError, USER_AGENT
@@ -13,6 +14,14 @@
:param password: Twitter password for the account accessing the API.
:keyword count: Number of tweets from the past to get before switching to
live stream.
+ :keyword raw: If True, return each tweet's raw data direct from the socket,
+ without UTF8 decoding or parsing, rather than a parsed object. The
+ default is False.
+ :keyword timeout: If non-None, set a timeout in seconds on the receiving
+ socket. Certain types of network problems (e.g., disconnecting a VPN)
+ can cause the connection to hang, leading to indefinite blocking that
+ requires kill -9 to resolve. Setting a timeout leads to an orderly
+ shutdown in these cases. The default is None (i.e., no timeout).
:keyword url: Endpoint URL for the object. Note: you should not
need to edit this. It's present to make testing easier.
@@ -54,13 +63,17 @@
:attr: `USER_AGENT`.
"""
- def __init__(self, username, password, catchup=None, url=None):
+ def __init__(self, username, password,
+ catchup=None, raw=False, timeout=None, url=None):
self._conn = None
self._rate_ts = None
self._rate_cnt = 0
self._username = username
self._password = password
self._catchup_count = catchup
+ self._raw_mode = raw
+ self._timeout = timeout
+ self._iter = self.__iter__()
self.rate_period = 10 # in seconds
self.connected = False
@@ -70,9 +83,6 @@
self.user_agent = USER_AGENT
if url: self.url = url
- def __iter__(self):
- return self
-
def __enter__(self):
return self
@@ -97,7 +107,8 @@
opener = urllib2.build_opener(handler)
try:
- self._conn = opener.open(req)
+ self._conn = opener.open(req, timeout=self._timeout)
+
except urllib2.HTTPError, exception:
if exception.code == 401:
raise AuthenticationError("Access denied")
@@ -108,6 +119,36 @@
except urllib2.URLError, exception:
raise ConnectionError(exception.reason)
+ # This is horrible. This line grabs the raw socket (actually an ssl
+ # wrapped socket) from the guts of urllib2/httplib. We want the raw
+ # socket so we can bypass the buffering that those libs provide.
+ # The buffering is reasonable when dealing with connections that
+ # try to finish as soon as possible. With twitters' never ending
+ # connections, it causes a bug where we would not deliver tweets
+ # until the buffer was full. That's problematic for very low volume
+ # filterstreams, since you might not see a tweet for minutes or hours
+ # after they occured while the buffer fills.
+ #
+ # Oh, and the inards of the http libs are different things on in
+ # py2 and 3, so need to deal with that. py3 libs do more of what I
+ # want by default, but I wont do more special casing for it than
+ # neccessary.
+
+ major, _, _ = python_version_tuple()
+ # The cast is needed because apparently some versions return strings
+ # and some return ints.
+ # On my ubuntu with stock 2.6 I get strings, which match the docs.
+ # Someone reported the issue on 2.6.1 on macos, but that was
+ # manually built, not the bundled one. Anyway, cast for safety.
+ major = int(major)
+ if major == 2:
+ self._socket = self._conn.fp._sock.fp._sock
+ else:
+ self._socket = self._conn.fp.raw
+ # our code that reads from the socket expects a method called recv.
+ # py3 socket.SocketIO uses the name read, so alias it.
+ self._socket.recv = self._socket.read
+
self.connected = True
if not self.starttime:
self.starttime = time.time()
@@ -120,41 +161,60 @@
returned by urllib.urlencode."""
return None
- def next(self):
- """Return the next available tweet. This call is blocking!"""
+ def _update_rate(self):
+ rate_time = time.time() - self._rate_ts
+ if not self._rate_ts or rate_time > self.rate_period:
+ self.rate = self._rate_cnt / rate_time
+ self._rate_cnt = 0
+ self._rate_ts = time.time()
+
+ def __iter__(self):
+ buf = b""
while True:
try:
if not self.connected:
self._init_conn()
- rate_time = time.time() - self._rate_ts
- if not self._rate_ts or rate_time > self.rate_period:
- self.rate = self._rate_cnt / rate_time
- self._rate_cnt = 0
- self._rate_ts = time.time()
-
- data = self._conn.readline()
- if data == "": # something is wrong
+ buf += self._socket.recv(8192)
+ if buf == b"": # something is wrong
self.close()
raise ConnectionError("Got entry of length 0. Disconnected")
- elif data.isspace():
+ elif buf.isspace():
+ buf = b""
+ elif b"\r" not in buf: # not enough data yet. Loop around
continue
- data = anyjson.deserialize(data)
- if 'text' in data:
- self.count += 1
- self._rate_cnt += 1
- return data
+ lines = buf.split(b"\r")
+ buf = lines[-1]
+ lines = lines[:-1]
- except ValueError, e:
- self.close()
- raise ConnectionError("Got invalid data from twitter",
- details=data)
+ for line in lines:
+ if (self._raw_mode):
+ tweet = line
+ else:
+ line = line.decode("utf8")
+ try:
+ tweet = anyjson.deserialize(line)
+ except ValueError, e:
+ self.close()
+ raise ConnectionError("Got invalid data from twitter", details=line)
+
+ if 'text' in tweet:
+ self.count += 1
+ self._rate_cnt += 1
+ yield tweet
+
except socket.error, e:
self.close()
raise ConnectionError("Server disconnected")
+
+ def next(self):
+ """Return the next available tweet. This call is blocking!"""
+ return self._iter.next()
+
+
def close(self):
"""
Close the connection to the streaming server.
@@ -165,11 +225,11 @@
class SampleStream(BaseStream):
- url = "http://stream.twitter.com/1/statuses/sample.json"
+ url = "https://stream.twitter.com/1/statuses/sample.json"
class FilterStream(BaseStream):
- url = "http://stream.twitter.com/1/statuses/filter.json"
+ url = "https://stream.twitter.com/1/statuses/filter.json"
def __init__(self, username, password, follow=None, locations=None,
track=None, catchup=None, url=None):