1 import contextlib |
|
2 import threading |
|
3 import time |
|
4 |
|
5 from tweetstream import TweetStream, FollowStream, TrackStream, LocationStream |
|
6 from tweetstream import ConnectionError, AuthenticationError, SampleStream, FilterStream |
|
7 from tweepy.auth import BasicAuthHandler |
|
8 |
|
9 import pytest |
|
10 from pytest import raises |
|
# Shorthand for the ``slow`` pytest marker; applied as ``@slow`` to
# test_buffering further down in this module.
slow = pytest.mark.slow
|
12 |
|
13 from servercontext import test_server |
|
14 |
|
# Sample tweet payload (JSON text on a single line) followed by a carriage
# return. The handlers in the tests below yield it, with "\n" appended, as
# the data served to the stream under test.
single_tweet = r"""{"in_reply_to_status_id":null,"in_reply_to_user_id":null,"favorited":false,"created_at":"Tue Jun 16 10:40:14 +0000 2009","in_reply_to_screen_name":null,"text":"record industry just keeps on amazing me: http:\/\/is.gd\/13lFo - $150k per song you've SHARED, not that somebody has actually DOWNLOADED.","user":{"notifications":null,"profile_background_tile":false,"followers_count":206,"time_zone":"Copenhagen","utc_offset":3600,"friends_count":191,"profile_background_color":"ffffff","profile_image_url":"http:\/\/s3.amazonaws.com\/twitter_production\/profile_images\/250715794\/profile_normal.png","description":"Digital product developer, currently at Opera Software. My tweets are my opinions, not those of my employer.","verified_profile":false,"protected":false,"favourites_count":0,"profile_text_color":"3C3940","screen_name":"eiriksnilsen","name":"Eirik Stridsklev N.","following":null,"created_at":"Tue May 06 12:24:12 +0000 2008","profile_background_image_url":"http:\/\/s3.amazonaws.com\/twitter_production\/profile_background_images\/10531192\/160x600opera15.gif","profile_link_color":"0099B9","profile_sidebar_fill_color":"95E8EC","url":"http:\/\/www.stridsklev-nilsen.no\/eirik","id":14672543,"statuses_count":506,"profile_sidebar_border_color":"5ED4DC","location":"Oslo, Norway"},"id":2190767504,"truncated":false,"source":"<a href=\"http:\/\/widgets.opera.com\/widget\/7206\">Twitter Opera widget<\/a>"}""" + "\r"
|
16 |
|
17 |
|
def parameterized(funcarglist):
    """Build a decorator that tags a function with its argument sets.

    The returned decorator stores ``funcarglist`` on the decorated function
    under the ``funcarglist`` attribute and hands back that very same
    function object; the function is not wrapped or changed in any other
    way. ``pytest_generate_tests`` in this module reads the attribute.
    """
    def attach(target):
        setattr(target, "funcarglist", funcarglist)
        return target

    return attach
|
23 |
|
def pytest_generate_tests(metafunc):
    """Generate one test call per entry of the function's ``funcarglist``.

    pytest hook. Functions without a ``funcarglist`` attribute (i.e. not
    decorated with ``parameterized``) are left untouched.

    ``metafunc.addcall`` is used when the running pytest still provides it,
    exactly as before. pytest 4.0 removed ``addcall``, so on newer versions
    the same argument sets are registered through ``metafunc.parametrize``.
    That fallback takes the argument names from the first entry and
    therefore expects every entry to carry the same keys.
    """
    funcarglist = list(getattr(metafunc.function, 'funcarglist', ()))
    if not funcarglist:
        return

    if hasattr(metafunc, 'addcall'):
        for funcargs in funcarglist:
            metafunc.addcall(funcargs=funcargs)
        return

    # Sorted so the name order is deterministic regardless of dict ordering.
    argnames = sorted(funcarglist[0])
    argvalues = [[funcargs[name] for name in argnames]
                 for funcargs in funcarglist]
    metafunc.parametrize(argnames, argvalues)
|
27 |
|
28 |
|
# One entry per stream class exercised by the parameterized tests in this
# module. Each entry supplies the ``cls``, ``args`` and ``kwargs`` arguments
# of a single generated test call.
streamtypes = [
    {"cls": TweetStream, "args": [], "kwargs": {}},
    {"cls": SampleStream, "args": [], "kwargs": {}},
    {"cls": FilterStream, "args": [], "kwargs": {"track": ("test",)}},
    {"cls": FollowStream, "args": [[1, 2, 3]], "kwargs": {}},
    {"cls": TrackStream, "args": ["opera"], "kwargs": {}},
    {"cls": LocationStream, "args": ["123,4321"], "kwargs": {}},
]
|
37 |
|
38 |
|
@parameterized(streamtypes)
def test_bad_auth(cls, args, kwargs):
    """Test that the proper exception is raised when the user could not be
    authenticated.

    The test server's handler answers with ``request.send_error(401)``;
    iterating the stream is then expected to raise ``AuthenticationError``.

    cls    -- stream class under test
    args   -- positional constructor arguments that follow the auth handler
    kwargs -- keyword constructor arguments for the stream class
    """
    def auth_denied(request):
        request.send_error(401)

    with raises(AuthenticationError):
        with test_server(handler=auth_denied, methods=("post", "get"),
                         port="random") as server:
            auth = BasicAuthHandler("user", "passwd")
            # kwargs is forwarded as well, so a stream configured through
            # keywords (FilterStream's ``track`` in streamtypes) is really
            # constructed the way its streamtypes entry describes.
            stream = cls(auth, *args, url=server.baseurl, **kwargs)
            for _ in stream:
                pass
|
51 |
|
52 |
|
@parameterized(streamtypes)
def test_404_url(cls, args, kwargs):
    """Test that the proper exception is raised when the stream URL can't be
    found.

    The test server's handler answers with ``request.send_error(404)``;
    iterating the stream is then expected to raise ``ConnectionError``.

    cls    -- stream class under test
    args   -- positional constructor arguments that follow the auth handler
    kwargs -- keyword constructor arguments for the stream class
    """
    def not_found(request):
        request.send_error(404)

    with raises(ConnectionError):
        with test_server(handler=not_found, methods=("post", "get"),
                         port="random") as server:
            auth = BasicAuthHandler("user", "passwd")
            # kwargs is forwarded as well, so a stream configured through
            # keywords (FilterStream's ``track`` in streamtypes) is really
            # constructed the way its streamtypes entry describes.
            stream = cls(auth, *args, url=server.baseurl, **kwargs)
            for _ in stream:
                pass
|
65 |
|
66 |
|
@parameterized(streamtypes)
def test_bad_content(cls, args, kwargs):
    """Test error handling if we are given invalid data.

    The handler sends ten parsable JSON lines, then one truncated line,
    then one more parsable line. Iterating the stream is expected to raise
    ``ConnectionError``.

    cls    -- stream class under test
    args   -- positional constructor arguments that follow the auth handler
    kwargs -- keyword constructor arguments for the stream class
    """
    def bad_content(request):
        # range() rather than xrange(): identical for ten items and it
        # exists on both Python 2 and Python 3.
        for _ in range(10):
            # What JSON we pass doesn't matter. It's not verifying the
            # structure, only checking that it's parsable.
            yield "[1,2,3]\r"
        yield "[1,2, I need no stinking close brace\r"
        yield "[1,2,3]\r"

    with raises(ConnectionError):
        with test_server(handler=bad_content, methods=("post", "get"),
                         port="random") as server:
            auth = BasicAuthHandler("user", "passwd")
            # kwargs is forwarded as well, so a stream configured through
            # keywords (FilterStream's ``track`` in streamtypes) is really
            # constructed the way its streamtypes entry describes.
            stream = cls(auth, *args, url=server.baseurl, **kwargs)
            for _ in stream:
                pass
|
85 |
|
86 |
|
@parameterized(streamtypes)
def test_closed_connection(cls, args, kwargs):
    """Test error handling if server unexpectedly closes connection.

    The handler yields a fixed number of parsable JSON lines and then
    simply ends. Iterating the stream to the end is expected to raise
    ``ConnectionError``.
    NOTE(review): presumably the test server closes the connection once the
    handler is exhausted -- confirm against ``servercontext``.

    cls    -- stream class under test
    args   -- positional constructor arguments that follow the auth handler
    kwargs -- keyword constructor arguments for the stream class
    """
    cnt = 1000

    def finite_content(request):
        # range() rather than xrange(): equivalent here and available on
        # both Python 2 and Python 3.
        for _ in range(cnt):
            # What JSON we pass doesn't matter. It's not verifying the
            # structure, only checking that it's parsable.
            yield "[1,2,3]\r"

    with raises(ConnectionError):
        with test_server(handler=finite_content, methods=("post", "get"),
                         port="random") as server:
            auth = BasicAuthHandler("foo", "bar")
            # kwargs is forwarded as well, so a stream configured through
            # keywords (FilterStream's ``track`` in streamtypes) is really
            # constructed the way its streamtypes entry describes.
            stream = cls(auth, *args, url=server.baseurl, **kwargs)
            for _ in stream:
                pass
|
103 |
|
104 |
|
@parameterized(streamtypes)
def test_bad_host(cls, args, kwargs):
    """Test behaviour if we can't connect to the host.

    The stream is pointed at a URL whose host is not expected to exist;
    constructing the stream or fetching its first item is expected to raise
    ``ConnectionError``.

    cls    -- stream class under test
    args   -- positional constructor arguments that follow the auth handler
    kwargs -- keyword constructor arguments for the stream class
    """
    # Same calling convention as every other test in this module: a single
    # auth handler followed by the stream-specific arguments. Passing the
    # user name and password as two separate positionals would shift
    # ``args`` onto the wrong constructor parameters.
    auth = BasicAuthHandler("username", "passwd")
    with raises(ConnectionError):
        stream = cls(auth, *args,
                     url="http://wedfwecfghhreewerewads.foo", **kwargs)
        # next() builtin instead of the Python 2-only ``.next()`` method.
        next(stream)
|
111 |
|
112 |
|
@parameterized(streamtypes)
def smoke_test_receive_tweets(cls, args, kwargs):
    """Receive 100k tweets and disconnect (slow).

    The handler serves the sample tweet endlessly; the loop stops as soon
    as ``stream.count`` reaches the target.
    NOTE(review): the name lacks the ``test_`` prefix, so presumably pytest
    does not collect this by default -- confirm against the project's
    pytest configuration.

    cls    -- stream class under test
    args   -- positional constructor arguments that follow the auth handler
    kwargs -- keyword constructor arguments for the stream class
    """
    total = 100000

    def tweetsource(request):
        while True:
            yield single_tweet + "\n"

    with test_server(handler=tweetsource, methods=("post", "get"),
                     port="random") as server:
        auth = BasicAuthHandler("foo", "bar")
        # kwargs is forwarded as well, so a stream configured through
        # keywords (FilterStream's ``track`` in streamtypes) is really
        # constructed the way its streamtypes entry describes.
        stream = cls(auth, *args, url=server.baseurl, **kwargs)
        for _ in stream:
            if stream.count == total:
                break
|
128 |
|
129 |
|
@parameterized(streamtypes)
def test_keepalive(cls, args, kwargs):
    """Make sure we behave sanely when there are keepalive newlines in the
    data received from twitter.

    The handler interleaves three tweets with bare newline lines and then
    ends. The test expects iteration to finish with ``ConnectionError`` and
    ``stream.count`` to be exactly 3 at that point, i.e. the bare newlines
    must not be counted as tweets.

    cls    -- stream class under test
    args   -- positional constructor arguments that follow the auth handler
    kwargs -- keyword constructor arguments for the stream class
    """
    def tweetsource(request):
        # Number of bare-newline lines sent after each of the three tweets.
        for keepalives in (2, 7, 1):
            yield single_tweet + "\n"
            for _ in range(keepalives):
                yield "\n"

    with test_server(handler=tweetsource, methods=("post", "get"),
                     port="random") as server:
        auth = BasicAuthHandler("foo", "bar")
        # kwargs is forwarded as well, so a stream configured through
        # keywords (FilterStream's ``track`` in streamtypes) is really
        # constructed the way its streamtypes entry describes.
        stream = cls(auth, *args, url=server.baseurl, **kwargs)
        try:
            for _ in stream:
                pass
        except ConnectionError:
            assert stream.count == 3, "Got %s, wanted 3" % stream.count
        else:
            assert False, "Didn't handle keepalive"
|
160 |
|
161 |
|
@slow
@parameterized(streamtypes)
def test_buffering(cls, args, kwargs):
    """Test if buffering stops data from being returned immediately.

    If there is some buffering in play that might mean data is only returned
    from the generator when the buffer is full. If buffer is bigger than a
    tweet, this will happen. Default buffer size in the part of socket lib
    that enables readline is 8k. Max tweet length is around 3k.

    The handler sends one tweet, sleeps for two seconds and then sends 100
    more; the first tweet must come out of the stream in under a second.

    cls    -- stream class under test
    args   -- positional constructor arguments that follow the auth handler
    kwargs -- keyword constructor arguments for the stream class
    """
    def tweetsource(request):
        yield single_tweet + "\n"
        time.sleep(2)
        # Need to yield a bunch here so we're sure we'll return from the
        # blocking call in case the buffering bug is present.
        for _ in range(100):
            yield single_tweet + "\n"

    with test_server(handler=tweetsource, methods=("post", "get"),
                     port="random") as server:
        auth = BasicAuthHandler("foo", "bar")
        # kwargs is forwarded as well, so a stream configured through
        # keywords (FilterStream's ``track`` in streamtypes) is really
        # constructed the way its streamtypes entry describes.
        stream = cls(auth, *args, url=server.baseurl, **kwargs)
        start = time.time()
        # next() builtin instead of the Python 2-only ``.next()`` method.
        next(stream)
        elapsed = time.time() - start
        assert elapsed < 1, "Getting first tweet took more than a second!"
|
188 |
|