upgrade tweet stream to 1.1.1
authorYves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
Mon, 20 Feb 2012 00:12:16 +0100
changeset 14 10e7a0c7c64f
parent 13 79b6e132e3d7
child 527 80e5b9543cac
upgrade tweet stream to 1.1.1
script/lib/tweetstream/CHANGELOG
script/lib/tweetstream/setup.py
script/lib/tweetstream/tests/test_tweetstream.py
script/lib/tweetstream/tox.ini
script/lib/tweetstream/tweetstream/__init__.py
script/lib/tweetstream/tweetstream/deprecated.py
script/lib/tweetstream/tweetstream/streamclasses.py
--- a/script/lib/tweetstream/CHANGELOG	Fri Jul 01 10:15:32 2011 +0200
+++ b/script/lib/tweetstream/CHANGELOG	Mon Feb 20 00:12:16 2012 +0100
@@ -52,3 +52,17 @@
  - Changed API to match latest twitter endpoints. This adds SampleStream and
    FilterStream and deprecates TweetStream, FollowStream, LocationStream,
    TrackStream and ReconnectingTweetStream.
+
+1.1.0
+
+ - Fixed issues #2 and #12, related to low volume streams not yielding tweets
+   until a relatively large buffer was filled. This meant that tweets would
+   arrive in bunches, not immediatly.
+ - Switched to HTTPS urls for streams. Twitter will switch off HTTP streams
+   on 29. sept. 2011.
+ - Added support for Python 3
+
+1.1.1
+
+ - Fixed issue #16. Odd case where python_version_tuple was returning ints
+   rather than the strings the docs promis. Make sure we always cast to int.
--- a/script/lib/tweetstream/setup.py	Fri Jul 01 10:15:32 2011 +0200
+++ b/script/lib/tweetstream/setup.py	Mon Feb 20 00:12:16 2012 +0100
@@ -1,27 +1,81 @@
-from setuptools import setup, find_packages
-import sys, os
+import sys
+import os
+
+extra = {}
+if sys.version_info >= (3, 0):
+    extra.update(use_2to3=True)
+
+
+try:
+    from setuptools import setup, find_packages
+except ImportError:
+    from distutils.core import setup, find_packages
+
+
+# -*- Distribution Meta -*-
+import re
+re_meta = re.compile(r'__(\w+?)__\s*=\s*(.*)')
+re_vers = re.compile(r'VERSION\s*=\s*\((.*?)\)')
+re_doc = re.compile(r'^"""(.+?)"""', re.M|re.S)
+rq = lambda s: s.strip("\"'")
+
+
+def add_default(m):
+    attr_name, attr_value = m.groups()
+    return ((attr_name, rq(attr_value)), )
+
 
-author = "Rune Halvorsen" 
-email = "runefh@gmail.com"
-version = "1.0.0"
-homepage = "http://bitbucket.org/runeh/tweetstream/"
+def add_version(m):
+    v = list(map(rq, m.groups()[0].split(", ")))
+    return (("VERSION", ".".join(v[0:3]) + "".join(v[3:])), )
+
+
+def add_doc(m):
+    return (("doc", m.groups()[0].replace("\n", " ")), )
+
+pats = {re_meta: add_default,
+        re_vers: add_version}
+here = os.path.abspath(os.path.dirname(__file__))
+meta_fh = open(os.path.join(here, "tweetstream/__init__.py"))
+try:
+    meta = {}
+    acc = []
+    for line in meta_fh:
+        if line.strip() == '# -eof meta-':
+            break
+        acc.append(line)
+        for pattern, handler in pats.items():
+            m = pattern.match(line.strip())
+            if m:
+                meta.update(handler(m))
+    m = re_doc.match("".join(acc).strip())
+    if m:
+        meta.update(add_doc(m))
+finally:
+    meta_fh.close()
+
 
 setup(name='tweetstream',
-    version=version,
-    description="Simple Twitter streaming API access",
+    version=meta["VERSION"],
+    description=meta["doc"],
     long_description=open("README").read(),
     classifiers=[
         'License :: OSI Approved :: BSD License',
         'Intended Audience :: Developers',
+        'Programming Language :: Python :: 2.6',
+        'Programming Language :: Python :: 2.7',
+        'Programming Language :: Python :: 3',
+        'Programming Language :: Python :: 3.1',
     ],
     keywords='twitter',
-    author=author,
-    author_email=email,
-    url=homepage,
+    author=meta["author"],
+    author_email=meta["contact"],
+    url=meta["homepage"],
     license='BSD',
     packages=find_packages(exclude=['ez_setup', 'examples', 'tests']),
     include_package_data=True,
     zip_safe=False,
     platforms=["any"],
-    install_requires = ['anyjson'],
+    install_requires=['anyjson'],
+    **extra
 )
--- a/script/lib/tweetstream/tests/test_tweetstream.py	Fri Jul 01 10:15:32 2011 +0200
+++ b/script/lib/tweetstream/tests/test_tweetstream.py	Mon Feb 20 00:12:16 2012 +0100
@@ -1,10 +1,9 @@
 import contextlib
 import threading
 import time
-from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
 
 from tweetstream import TweetStream, FollowStream, TrackStream, LocationStream
-from tweetstream import ConnectionError, AuthenticationError, SampleStream
+from tweetstream import ConnectionError, AuthenticationError, SampleStream, FilterStream
 
 import pytest
 from pytest import raises
@@ -12,7 +11,7 @@
 
 from servercontext import test_server
 
-single_tweet = r"""{"in_reply_to_status_id":null,"in_reply_to_user_id":null,"favorited":false,"created_at":"Tue Jun 16 10:40:14 +0000 2009","in_reply_to_screen_name":null,"text":"record industry just keeps on amazing me: http:\/\/is.gd\/13lFo - $150k per song you've SHARED, not that somebody has actually DOWNLOADED.","user":{"notifications":null,"profile_background_tile":false,"followers_count":206,"time_zone":"Copenhagen","utc_offset":3600,"friends_count":191,"profile_background_color":"ffffff","profile_image_url":"http:\/\/s3.amazonaws.com\/twitter_production\/profile_images\/250715794\/profile_normal.png","description":"Digital product developer, currently at Opera Software. My tweets are my opinions, not those of my employer.","verified_profile":false,"protected":false,"favourites_count":0,"profile_text_color":"3C3940","screen_name":"eiriksnilsen","name":"Eirik Stridsklev N.","following":null,"created_at":"Tue May 06 12:24:12 +0000 2008","profile_background_image_url":"http:\/\/s3.amazonaws.com\/twitter_production\/profile_background_images\/10531192\/160x600opera15.gif","profile_link_color":"0099B9","profile_sidebar_fill_color":"95E8EC","url":"http:\/\/www.stridsklev-nilsen.no\/eirik","id":14672543,"statuses_count":506,"profile_sidebar_border_color":"5ED4DC","location":"Oslo, Norway"},"id":2190767504,"truncated":false,"source":"<a href=\"http:\/\/widgets.opera.com\/widget\/7206\">Twitter Opera widget<\/a>"}"""
+single_tweet = r"""{"in_reply_to_status_id":null,"in_reply_to_user_id":null,"favorited":false,"created_at":"Tue Jun 16 10:40:14 +0000 2009","in_reply_to_screen_name":null,"text":"record industry just keeps on amazing me: http:\/\/is.gd\/13lFo - $150k per song you've SHARED, not that somebody has actually DOWNLOADED.","user":{"notifications":null,"profile_background_tile":false,"followers_count":206,"time_zone":"Copenhagen","utc_offset":3600,"friends_count":191,"profile_background_color":"ffffff","profile_image_url":"http:\/\/s3.amazonaws.com\/twitter_production\/profile_images\/250715794\/profile_normal.png","description":"Digital product developer, currently at Opera Software. My tweets are my opinions, not those of my employer.","verified_profile":false,"protected":false,"favourites_count":0,"profile_text_color":"3C3940","screen_name":"eiriksnilsen","name":"Eirik Stridsklev N.","following":null,"created_at":"Tue May 06 12:24:12 +0000 2008","profile_background_image_url":"http:\/\/s3.amazonaws.com\/twitter_production\/profile_background_images\/10531192\/160x600opera15.gif","profile_link_color":"0099B9","profile_sidebar_fill_color":"95E8EC","url":"http:\/\/www.stridsklev-nilsen.no\/eirik","id":14672543,"statuses_count":506,"profile_sidebar_border_color":"5ED4DC","location":"Oslo, Norway"},"id":2190767504,"truncated":false,"source":"<a href=\"http:\/\/widgets.opera.com\/widget\/7206\">Twitter Opera widget<\/a>"}""" + "\r"
 
 
 def parameterized(funcarglist):
@@ -29,6 +28,7 @@
 streamtypes = [
     dict(cls=TweetStream, args=[], kwargs=dict()),
     dict(cls=SampleStream, args=[], kwargs=dict()),
+    dict(cls=FilterStream, args=[], kwargs=dict(track=("test",))),
     dict(cls=FollowStream, args=[[1, 2, 3]], kwargs=dict()),
     dict(cls=TrackStream, args=["opera"], kwargs=dict()),
     dict(cls=LocationStream, args=["123,4321"], kwargs=dict())
@@ -42,8 +42,10 @@
     def auth_denied(request):
         request.send_error(401)
 
-    with test_server(handler=auth_denied, methods=("post", "get"), port="random") as server:
-        stream = cls("user", "passwd", *args, url=server.baseurl)
+    with raises(AuthenticationError):
+        with test_server(handler=auth_denied, methods=("post", "get"), port="random") as server:
+            stream = cls("user", "passwd", *args, url=server.baseurl)
+            for e in stream: pass
 
 
 @parameterized(streamtypes)
@@ -53,8 +55,10 @@
     def not_found(request):
         request.send_error(404)
 
-    with test_server(handler=not_found, methods=("post", "get"), port="random") as server:
-        stream = cls("user", "passwd", *args, url=server.baseurl)
+    with raises(ConnectionError):
+        with test_server(handler=not_found, methods=("post", "get"), port="random") as server:
+            stream = cls("user", "passwd", *args, url=server.baseurl)
+            for e in stream: pass
 
 
 @parameterized(streamtypes)
@@ -64,9 +68,9 @@
         for n in xrange(10):
             # what json we pass doesn't matter. It's not verifying the
             # strcuture, only checking that it's parsable
-            yield "[1,2,3]"
-        yield "[1,2, I need no stinking close brace"
-        yield "[1,2,3]"
+            yield "[1,2,3]\r"
+        yield "[1,2, I need no stinking close brace\r"
+        yield "[1,2,3]\r"
 
 
     with raises(ConnectionError):
@@ -84,7 +88,7 @@
         for n in xrange(cnt):
             # what json we pass doesn't matter. It's not verifying the
             # strcuture, only checking that it's parsable
-            yield "[1,2,3]"
+            yield "[1,2,3]\r"
 
     with raises(ConnectionError):
         with test_server(handler=bad_content, methods=("post", "get"), port="random") as server:
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/script/lib/tweetstream/tox.ini	Mon Feb 20 00:12:16 2012 +0100
@@ -0,0 +1,16 @@
+[tox]
+envlist = py25,py26,py27,py30,py31,py32
+
+[testenv]
+deps=pytest
+sitepackages=False
+commands=py.test --runslow
+
+[testenv:py30]
+changedir = .tox
+
+[testenv:py31]
+changedir = .tox
+
+[testenv:py32]
+changedir = .tox
\ No newline at end of file
--- a/script/lib/tweetstream/tweetstream/__init__.py	Fri Jul 01 10:15:32 2011 +0200
+++ b/script/lib/tweetstream/tweetstream/__init__.py	Mon Feb 20 00:12:16 2012 +0100
@@ -1,11 +1,14 @@
-"""
-Simple Twitter streaming API access
-"""
-__version__ = "1.0.0"
-__author__ = "Rune Halvorsen <runefh@gmail.com>"
+"""Simple access to Twitter's streaming API"""
+
+VERSION = (1, 1, 1)
+__version__ = ".".join(map(str, VERSION[0:3])) + "".join(VERSION[3:])
+__author__ = "Rune Halvorsen"
+__contact__ = "runefh@gmail.com"
 __homepage__ = "http://bitbucket.org/runeh/tweetstream/"
 __docformat__ = "restructuredtext"
 
+# -eof meta-
+
 
 """
  .. data:: USER_AGENT
@@ -38,5 +41,5 @@
         return '<ConnectionError %s>' % self.reason
 
 
-from streamclasses import SampleStream, FilterStream
-from deprecated import FollowStream, TrackStream, LocationStream, TweetStream, ReconnectingTweetStream
+from .streamclasses import SampleStream, FilterStream
+from .deprecated import FollowStream, TrackStream, LocationStream, TweetStream, ReconnectingTweetStream
--- a/script/lib/tweetstream/tweetstream/deprecated.py	Fri Jul 01 10:15:32 2011 +0200
+++ b/script/lib/tweetstream/tweetstream/deprecated.py	Mon Feb 20 00:12:16 2012 +0100
@@ -13,12 +13,12 @@
 
 
 class TrackStream(DeprecatedStream):
-    def __init__(self, username, password, track, catchup=None, url=None):
+    def __init__(self, username, password, track, catchup=None, url=None, slow=False):
         super(TrackStream, self).__init__(username, password, track=track, catchup=catchup, url=url)
 
 
 class LocationStream(DeprecatedStream):
-    def __init__(self, username, password, locations, catchup=None, url=None):
+    def __init__(self, username, password, locations, catchup=None, url=None, slow=False):
         super(LocationStream, self).__init__(username, password, locations=locations, catchup=catchup, url=url)
 
 
--- a/script/lib/tweetstream/tweetstream/streamclasses.py	Fri Jul 01 10:15:32 2011 +0200
+++ b/script/lib/tweetstream/tweetstream/streamclasses.py	Mon Feb 20 00:12:16 2012 +0100
@@ -2,6 +2,7 @@
 import urllib
 import urllib2
 import socket
+from platform import python_version_tuple
 import anyjson
 
 from . import AuthenticationError, ConnectionError, USER_AGENT
@@ -13,6 +14,14 @@
     :param password: Twitter password for the account accessing the API.
     :keyword count: Number of tweets from the past to get before switching to
       live stream.
+    :keyword raw: If True, return each tweet's raw data direct from the socket,
+      without UTF8 decoding or parsing, rather than a parsed object. The
+      default is False.
+    :keyword timeout: If non-None, set a timeout in seconds on the receiving
+      socket. Certain types of network problems (e.g., disconnecting a VPN)
+      can cause the connection to hang, leading to indefinite blocking that
+      requires kill -9 to resolve. Setting a timeout leads to an orderly
+      shutdown in these cases. The default is None (i.e., no timeout).
     :keyword url: Endpoint URL for the object. Note: you should not
       need to edit this. It's present to make testing easier.
 
@@ -54,13 +63,17 @@
         :attr: `USER_AGENT`.
     """
 
-    def __init__(self, username, password, catchup=None, url=None):
+    def __init__(self, username, password,
+                 catchup=None, raw=False, timeout=None, url=None):
         self._conn = None
         self._rate_ts = None
         self._rate_cnt = 0
         self._username = username
         self._password = password
         self._catchup_count = catchup
+        self._raw_mode = raw
+        self._timeout = timeout
+        self._iter = self.__iter__()
 
         self.rate_period = 10  # in seconds
         self.connected = False
@@ -70,9 +83,6 @@
         self.user_agent = USER_AGENT
         if url: self.url = url
 
-    def __iter__(self):
-        return self
-
     def __enter__(self):
         return self
 
@@ -97,7 +107,8 @@
         opener = urllib2.build_opener(handler)
 
         try:
-            self._conn = opener.open(req)
+            self._conn = opener.open(req, timeout=self._timeout)
+
         except urllib2.HTTPError, exception:
             if exception.code == 401:
                 raise AuthenticationError("Access denied")
@@ -108,6 +119,36 @@
         except urllib2.URLError, exception:
             raise ConnectionError(exception.reason)
 
+        # This is horrible. This line grabs the raw socket (actually an ssl
+        # wrapped socket) from the guts of urllib2/httplib. We want the raw
+        # socket so we can bypass the buffering that those libs provide.
+        # The buffering is reasonable when dealing with connections that
+        # try to finish as soon as possible. With twitters' never ending
+        # connections, it causes a bug where we would not deliver tweets
+        # until the buffer was full. That's problematic for very low volume
+        # filterstreams, since you might not see a tweet for minutes or hours
+        # after they occured while the buffer fills.
+        #
+        # Oh, and the inards of the http libs are different things on in
+        # py2 and 3, so need to deal with that. py3 libs do more of what I
+        # want by default, but I wont do more special casing for it than
+        # neccessary.
+
+        major, _, _ = python_version_tuple()
+        # The cast is needed because apparently some versions return strings
+        # and some return ints.
+        # On my ubuntu with stock 2.6 I get strings, which match the docs.
+        # Someone reported the issue on 2.6.1 on macos, but that was
+        # manually built, not the bundled one. Anyway, cast for safety.
+        major = int(major)
+        if major == 2:
+            self._socket = self._conn.fp._sock.fp._sock
+        else:
+            self._socket = self._conn.fp.raw
+            # our code that reads from the socket expects a method called recv.
+            # py3 socket.SocketIO uses the name read, so alias it.
+            self._socket.recv = self._socket.read
+
         self.connected = True
         if not self.starttime:
             self.starttime = time.time()
@@ -120,41 +161,60 @@
         returned by urllib.urlencode."""
         return None
 
-    def next(self):
-        """Return the next available tweet. This call is blocking!"""
+    def _update_rate(self):
+        rate_time = time.time() - self._rate_ts
+        if not self._rate_ts or rate_time > self.rate_period:
+            self.rate = self._rate_cnt / rate_time
+            self._rate_cnt = 0
+            self._rate_ts = time.time()
+
+    def __iter__(self):
+        buf = b""
         while True:
             try:
                 if not self.connected:
                     self._init_conn()
 
-                rate_time = time.time() - self._rate_ts
-                if not self._rate_ts or rate_time > self.rate_period:
-                    self.rate = self._rate_cnt / rate_time
-                    self._rate_cnt = 0
-                    self._rate_ts = time.time()
-
-                data = self._conn.readline()
-                if data == "":  # something is wrong
+                buf += self._socket.recv(8192)
+                if buf == b"":  # something is wrong
                     self.close()
                     raise ConnectionError("Got entry of length 0. Disconnected")
-                elif data.isspace():
+                elif buf.isspace():
+                    buf = b""
+                elif b"\r" not in buf: # not enough data yet. Loop around
                     continue
 
-                data = anyjson.deserialize(data)
-                if 'text' in data:
-                    self.count += 1
-                    self._rate_cnt += 1
-                return data
+                lines = buf.split(b"\r")
+                buf = lines[-1]
+                lines = lines[:-1]
 
-            except ValueError, e:
-                self.close()
-                raise ConnectionError("Got invalid data from twitter",
-                                      details=data)
+                for line in lines:
+                    if (self._raw_mode):
+                        tweet = line
+                    else:
+                        line = line.decode("utf8")
+                        try:
+                            tweet = anyjson.deserialize(line)
+                        except ValueError, e:
+                            self.close()
+                            raise ConnectionError("Got invalid data from twitter", details=line)
+
+                    if 'text' in tweet:
+                        self.count += 1
+                        self._rate_cnt += 1
+                    yield tweet
+
 
             except socket.error, e:
                 self.close()
                 raise ConnectionError("Server disconnected")
 
+
+    def next(self):
+        """Return the next available tweet. This call is blocking!"""
+        return self._iter.next()
+
+
     def close(self):
         """
         Close the connection to the streaming server.
@@ -165,11 +225,11 @@
 
 
 class SampleStream(BaseStream):
-    url = "http://stream.twitter.com/1/statuses/sample.json"
+    url = "https://stream.twitter.com/1/statuses/sample.json"
 
 
 class FilterStream(BaseStream):
-    url = "http://stream.twitter.com/1/statuses/filter.json"
+    url = "https://stream.twitter.com/1/statuses/filter.json"
 
     def __init__(self, username, password, follow=None, locations=None,
                  track=None, catchup=None, url=None):