import contextlib
import threading
import time

from tweetstream import TweetStream, FollowStream, TrackStream, LocationStream
from tweetstream import ConnectionError, AuthenticationError, SampleStream, FilterStream
from tweepy.auth import BasicAuthHandler

import pytest
from pytest import raises
slow = pytest.mark.slow

from servercontext import test_server

single_tweet = r"""{"in_reply_to_status_id":null,"in_reply_to_user_id":null,"favorited":false,"created_at":"Tue Jun 16 10:40:14 +0000 2009","in_reply_to_screen_name":null,"text":"record industry just keeps on amazing me: http:\/\/is.gd\/13lFo - $150k per song you've SHARED, not that somebody has actually DOWNLOADED.","user":{"notifications":null,"profile_background_tile":false,"followers_count":206,"time_zone":"Copenhagen","utc_offset":3600,"friends_count":191,"profile_background_color":"ffffff","profile_image_url":"http:\/\/s3.amazonaws.com\/twitter_production\/profile_images\/250715794\/profile_normal.png","description":"Digital product developer, currently at Opera Software. My tweets are my opinions, not those of my employer.","verified_profile":false,"protected":false,"favourites_count":0,"profile_text_color":"3C3940","screen_name":"eiriksnilsen","name":"Eirik Stridsklev N.","following":null,"created_at":"Tue May 06 12:24:12 +0000 2008","profile_background_image_url":"http:\/\/s3.amazonaws.com\/twitter_production\/profile_background_images\/10531192\/160x600opera15.gif","profile_link_color":"0099B9","profile_sidebar_fill_color":"95E8EC","url":"http:\/\/www.stridsklev-nilsen.no\/eirik","id":14672543,"statuses_count":506,"profile_sidebar_border_color":"5ED4DC","location":"Oslo, Norway"},"id":2190767504,"truncated":false,"source":"<a href=\"http:\/\/widgets.opera.com\/widget\/7206\">Twitter Opera widget<\/a>"}""" + "\r"
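# The trailing "\r" appended above mirrors the carriage-return delimiter the
# Twitter streaming API is assumed to use between JSON objects, so the fake
# servers below can concatenate several tweets into one response body.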


def parameterized(funcarglist):
    def wrapper(function):
        function.funcarglist = funcarglist
        return function
    return wrapper

def pytest_generate_tests(metafunc):
    for funcargs in getattr(metafunc.function, 'funcarglist', ()):
        metafunc.addcall(funcargs=funcargs)
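# The @parameterized decorator and the pytest_generate_tests hook above
# implement old-style funcarg parametrization: each dict in funcarglist
# becomes one test invocation, with cls/args/kwargs injected as arguments.
# metafunc.addcall() is an older pytest API; on recent pytest versions the
# same matrix could (untested sketch) be expressed as
#     @pytest.mark.parametrize("cls,args,kwargs",
#         [(d["cls"], d["args"], d["kwargs"]) for d in streamtypes])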


streamtypes = [
    dict(cls=TweetStream, args=[], kwargs=dict()),
    dict(cls=SampleStream, args=[], kwargs=dict()),
    dict(cls=FilterStream, args=[], kwargs=dict(track=("test",))),
    dict(cls=FollowStream, args=[[1, 2, 3]], kwargs=dict()),
    dict(cls=TrackStream, args=["opera"], kwargs=dict()),
    dict(cls=LocationStream, args=["123,4321"], kwargs=dict())
]
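# Each entry above is fed to every parameterized test: cls is the stream class
# under test, args are extra positional arguments and kwargs extra keyword
# arguments forwarded to its constructor (FilterStream, for instance, gets a
# track keyword here).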


@parameterized(streamtypes)
def test_bad_auth(cls, args, kwargs):
    """Test that the proper exception is raised when the user could not be
    authenticated"""
    def auth_denied(request):
        request.send_error(401)

    with raises(AuthenticationError):
        with test_server(handler=auth_denied, methods=("post", "get"), port="random") as server:
            auth = BasicAuthHandler("user", "passwd")
            stream = cls(auth, *args, url=server.baseurl, **kwargs)
            for e in stream: pass


@parameterized(streamtypes)
def test_404_url(cls, args, kwargs):
    """Test that the proper exception is raised when the stream URL can't be
    found"""
    def not_found(request):
        request.send_error(404)

    with raises(ConnectionError):
        with test_server(handler=not_found, methods=("post", "get"), port="random") as server:
            auth = BasicAuthHandler("user", "passwd")
            stream = cls(auth, *args, url=server.baseurl, **kwargs)
            for e in stream: pass


@parameterized(streamtypes)
def test_bad_content(cls, args, kwargs):
    """Test error handling if we are given invalid data"""
    def bad_content(request):
        for n in xrange(10):
            # what JSON we pass doesn't matter. It's not verifying the
            # structure, only checking that it's parsable
            yield "[1,2,3]\r"
        yield "[1,2, I need no stinking close brace\r"
        yield "[1,2,3]\r"


    with raises(ConnectionError):
        with test_server(handler=bad_content, methods=("post", "get"), port="random") as server:
            auth = BasicAuthHandler("user", "passwd")
            stream = cls(auth, *args, url=server.baseurl, **kwargs)
            for tweet in stream:
                pass


@parameterized(streamtypes)
def test_closed_connection(cls, args, kwargs):
    """Test error handling if server unexpectedly closes connection"""
    cnt = 1000
    def bad_content(request):
        for n in xrange(cnt):
            # what JSON we pass doesn't matter. It's not verifying the
            # structure, only checking that it's parsable
            yield "[1,2,3]\r"

    with raises(ConnectionError):
        with test_server(handler=bad_content, methods=("post", "get"), port="random") as server:
            auth = BasicAuthHandler("foo", "bar")
            stream = cls(auth, *args, url=server.baseurl, **kwargs)
            for tweet in stream:
                pass


@parameterized(streamtypes)
def test_bad_host(cls, args, kwargs):
    """Test behaviour if we can't connect to the host"""
    with raises(ConnectionError):
        stream = cls("username", "passwd", *args, url="http://wedfwecfghhreewerewads.foo")
        stream.next()


@slow
@parameterized(streamtypes)
def test_smoke_receive_tweets(cls, args, kwargs):
    """Receive 100k tweets and disconnect (slow)"""
    total = 100000

    def tweetsource(request):
        while True:
            yield single_tweet + "\n"

    with test_server(handler=tweetsource, methods=("post", "get"), port="random") as server:
        auth = BasicAuthHandler("foo", "bar")
        stream = cls(auth, *args, url=server.baseurl, **kwargs)
        for tweet in stream:
            if stream.count == total:
                break


@parameterized(streamtypes)
def test_keepalive(cls, args, kwargs):
    """Make sure we behave sanely when there are keepalive newlines in the
    data recevived from twitter"""
    def tweetsource(request):
        yield single_tweet+"\n"
        yield "\n"
        yield "\n"
        yield single_tweet+"\n"
        yield "\n"
        yield "\n"
        yield "\n"
        yield "\n"
        yield "\n"
        yield "\n"
        yield "\n"
        yield single_tweet+"\n"
        yield "\n"


    with test_server(handler=tweetsource, methods=("post", "get"), port="random") as server:
        auth = BasicAuthHandler("foo", "bar")
        stream = cls(auth, *args, url=server.baseurl, **kwargs)
        try:
            for tweet in stream:
                pass
        except ConnectionError:
            assert stream.count == 3, "Got %s, wanted 3" % stream.count
        else:
            assert False, "Didn't handle keepalive"


@slow
@parameterized(streamtypes)
def test_buffering(cls, args, kwargs):
    """Test if buffering stops data from being returned immediately.
    If there is some buffering in play that might mean data is only returned
    from the generator when the buffer is full. If buffer is bigger than a
    tweet, this will happen. Default buffer size in the part of socket lib
    that enables readline is 8k. Max tweet length is around 3k."""

    def tweetsource(request):
        yield single_tweet+"\n"
        time.sleep(2)
        # need to yield a bunch here so we're sure we'll return from the
        # blocking call in case the buffering bug is present.
        for n in xrange(100):
            yield single_tweet+"\n"


    with test_server(handler=tweetsource, methods=("post", "get"), port="random") as server:
        auth = BasicAuthHandler("foo", "bar")
        stream = cls(auth, *args, url=server.baseurl, **kwargs)
        start = time.time()
        stream.next()
        first = time.time()
        diff = first - start
        assert diff < 1, "Getting first tweet took more than a second!"