|
"""Simple Twitter streaming API access."""

# Package metadata.
__version__ = "0.3.5"
__author__ = "Rune Halvorsen <runefh@gmail.com>"
__homepage__ = "http://bitbucket.org/runeh/tweetstream/"

# Docstrings in this module are written in reStructuredText.
__docformat__ = "restructuredtext"
|
8 |
|
9 import urllib |
|
10 import urllib2 |
|
11 import socket |
|
12 import time |
|
13 import anyjson |
|
14 |
|
15 |
|
#: Mapping between Twitter endpoint names and URLs. Note that the "follow"
#: and "track" endpoints share the same filter URL; they differ only in the
#: POST data sent by the corresponding stream class.
URLS = dict(
    firehose="http://stream.twitter.com/1/statuses/firehose.json",
    sample="http://stream.twitter.com/1/statuses/sample.json",
    follow="http://stream.twitter.com/1/statuses/filter.json",
    track="http://stream.twitter.com/1/statuses/filter.json",
)

#: The default user agent string for stream objects.
USER_AGENT = "TweetStream %s" % (__version__,)
|
33 |
|
34 |
|
class TweetStreamError(Exception):
    """Common base class for every exception defined by tweetstream."""
|
38 |
|
class AuthenticationError(TweetStreamError):
    """Signals that Twitter rejected the supplied username/password, i.e.
    the server answered the connection request with HTTP 401."""
|
43 |
|
44 |
|
class ConnectionError(TweetStreamError):
    """Raised when there are network problems, such as DNS errors, network
    errors or issues on Twitter's side.

    :param reason: Human readable description of the problem.
    :keyword details: Optional extra information about the failure, e.g. the
      raw data that could not be parsed. Default is None.
    """

    def __init__(self, reason, details=None):
        # Forward the arguments to Exception so ``self.args`` is populated.
        # Without this, repr() loses the reason and unpickling/copying the
        # exception fails, because it is re-created as ``cls(*args)``.
        TweetStreamError.__init__(self, reason, details)
        self.reason = reason
        self.details = details

    def __str__(self):
        return '<ConnectionError %s>' % self.reason
|
55 |
|
56 |
|
class TweetStream(object):
    """A network connection to Twitter's streaming API.

    :param username: Twitter username for the account accessing the API.
    :param password: Twitter password for the account accessing the API.

    :keyword url: URL to connect to. This can be either an endpoint name,
      such as "sample", or a full URL. By default, the public "sample" URL
      is used. All known endpoints are defined in the :data:`URLS` mapping.

    .. attribute:: connected

        True if the object is currently connected to the stream.

    .. attribute:: url

        The URL to which the object is connected.

    .. attribute:: starttime

        The timestamp, in seconds since the epoch, at which the object first
        connected to the streaming API.

    .. attribute:: count

        The number of tweets that have been returned by the object.

    .. attribute:: rate

        The rate, in tweets per second as a float, at which tweets have been
        returned from the object. See also :attr:`rate_period`.

    .. attribute:: rate_period

        The amount of time, in seconds, over which tweets are counted to
        calculate the tweet rate. Defaults to 10 seconds. Changes to this
        attribute are not reflected until the next time the rate is
        calculated. The tweet rate varies with the time of day etc., so it
        is useful to set this to something sensible.

    .. attribute:: user_agent

        User agent string that will be included in the request. NOTE: This
        cannot be changed after the connection has been made, so it must be
        set before accessing the iterator. The default is :data:`USER_AGENT`.
    """

    def __init__(self, username, password, url="sample"):
        self._conn = None
        self._rate_ts = None
        self._rate_cnt = 0
        self._username = username
        self._password = password

        self.rate_period = 10  # in seconds
        self.connected = False
        self.starttime = None
        self.count = 0
        self.rate = 0
        self.user_agent = USER_AGENT
        # Endpoint names are resolved through URLS; anything else is assumed
        # to already be a full URL.
        self.url = URLS.get(url, url)

    def __iter__(self):
        return self

    def __enter__(self):
        return self

    def __exit__(self, *params):
        self.close()
        # Never suppress exceptions raised inside the ``with`` block.
        return False

    def _init_conn(self):
        """Open the connection to the Twitter server.

        The request carries the POST data returned by
        :meth:`_get_post_data` and uses HTTP basic authentication. On
        success the stream is marked as connected, and the start time and
        rate-window start are initialised if not already set.

        :raises AuthenticationError: if the server answers HTTP 401.
        :raises ConnectionError: on HTTP 404 or any other urllib2.URLError.
          Other HTTP errors are re-raised unchanged.
        """
        headers = {'User-Agent': self.user_agent}
        req = urllib2.Request(self.url, self._get_post_data(), headers)

        password_mgr = urllib2.HTTPPasswordMgrWithDefaultRealm()
        password_mgr.add_password(None, self.url, self._username,
                                  self._password)
        handler = urllib2.HTTPBasicAuthHandler(password_mgr)
        opener = urllib2.build_opener(handler)

        try:
            self._conn = opener.open(req)
        # HTTPError is a subclass of URLError, so it must be handled first.
        except urllib2.HTTPError as exception:
            if exception.code == 401:
                raise AuthenticationError("Access denied")
            elif exception.code == 404:
                raise ConnectionError("URL not found: %s" % self.url)
            else:  # re raise. No idea what would cause this, so want to know
                raise
        except urllib2.URLError as exception:
            raise ConnectionError(exception.reason)

        self.connected = True
        if not self.starttime:
            self.starttime = time.time()
        if not self._rate_ts:
            self._rate_ts = time.time()

    def _get_post_data(self):
        """Subclasses that need to add post data to the request can override
        this method and return post data. The data should be in the format
        returned by urllib.urlencode."""
        return None

    def next(self):
        """Return the next available tweet. This call is blocking!

        Connects first if the stream is not currently connected. Lines
        consisting only of whitespace are skipped.

        :raises ConnectionError: if the server closes the stream, a socket
          error occurs, or a line cannot be decoded as JSON. The connection
          is closed first, so a later call reconnects.
        :raises AuthenticationError: if the credentials are rejected while
          connecting.
        """
        while True:
            try:
                if not self.connected:
                    self._init_conn()

                # _init_conn() always sets _rate_ts, so it is valid here.
                rate_time = time.time() - self._rate_ts
                if rate_time > self.rate_period:
                    self.rate = self._rate_cnt / rate_time
                    self._rate_cnt = 0
                    self._rate_ts = time.time()

                data = self._conn.readline()
            except socket.error:
                self.close()
                raise ConnectionError("Server disconnected")

            if data == "":  # something is wrong
                self.close()
                raise ConnectionError("Got entry of length 0. Disconnected")
            elif data.isspace():
                # Blank lines are sent between tweets; ignore them.
                continue

            # Only JSON decoding is guarded here, so ``data`` is always
            # bound when it is attached to the error below. (Previously a
            # ValueError raised while connecting ended up as an
            # UnboundLocalError.)
            try:
                tweet = anyjson.deserialize(data)
            except ValueError:
                self.close()
                raise ConnectionError("Got invalid data from twitter",
                                      details=data)

            self.count += 1
            self._rate_cnt += 1
            return tweet

    def close(self):
        """Close the connection to the streaming server.

        Safe to call more than once. The next call to :meth:`next` opens a
        fresh connection.
        """
        self.connected = False
        if self._conn is not None:
            self._conn.close()
            # Drop the reference so a closed response is never reused or
            # closed a second time.
            self._conn = None
|
206 |
|
207 |
|
class ReconnectingTweetStream(TweetStream):
    """TweetStream class that automatically tries to reconnect if the
    connection goes down. Reconnecting, and waiting to reconnect, is
    blocking.

    :param username: See :class:`TweetStream`

    :param password: See :class:`TweetStream`

    :keyword url: See :class:`TweetStream`

    :keyword reconnects: Number of consecutive reconnect attempts allowed
      before a ConnectionError is raised. The counter is reset every time a
      tweet is returned successfully. Default is 3.

    :keyword error_cb: Optional callable that will be called just before
      trying to reconnect. It is called with a single argument, the
      exception that caused the reconnect attempt. Default is None.

    :keyword retry_wait: Time to wait before reconnecting, in seconds.
      Default is 5.
    """

    def __init__(self, username, password, url="sample",
                 reconnects=3, error_cb=None, retry_wait=5):
        self.max_reconnects = reconnects
        self.retry_wait = retry_wait
        self._reconnects = 0
        self._error_cb = error_cb
        TweetStream.__init__(self, username, password, url=url)

    def next(self):
        """Return the next tweet, reconnecting on ConnectionError.

        :raises ConnectionError: ("Too many retries") once more than
          ``max_reconnects`` consecutive attempts have failed. The last
          underlying error is available as its ``details``.
        """
        # AuthenticationError is deliberately not caught: reconnecting with
        # the same credentials cannot reasonably succeed.
        while True:
            try:
                tweet = TweetStream.next(self)
            except ConnectionError as e:
                self._reconnects += 1
                if self._reconnects > self.max_reconnects:
                    raise ConnectionError("Too many retries", details=e)

                # Note: error_cb is not called on the last error since we
                # raise a ConnectionError instead
                if callable(self._error_cb):
                    self._error_cb(e)

                time.sleep(self.retry_wait)
            else:
                # A successful read means the connection recovered. Reset
                # the counter so occasional disconnects spread over a long
                # run do not add up to a permanent failure.
                self._reconnects = 0
                return tweet
|
255 |
|
class FollowStream(TweetStream):
    """Stream class for getting tweets from followers.

    :param user: See TweetStream

    :param password: See TweetStream

    :param followees: Iterable containing user IDs to follow.
      ***Note:*** the user id in question is the numeric ID twitter uses,
      not the normal username.

    :keyword url: Like the url argument to TweetStream, except the default
      value is the "follow" endpoint.

    Any other keyword arguments are passed on to TweetStream.
    """

    def __init__(self, user, password, followees, url="follow", **kwargs):
        # _get_post_data() runs on every (re)connect. A one-shot iterator
        # (e.g. a generator) would be exhausted after the first connect, so
        # it is materialised into a list. Lists, tuples, sets etc. are kept
        # as-is.
        if iter(followees) is followees:
            followees = list(followees)
        self.followees = followees
        TweetStream.__init__(self, user, password, url=url, **kwargs)

    def _get_post_data(self):
        """Return the url-encoded ``follow`` parameter: a comma-separated
        list of the followee IDs."""
        return urllib.urlencode({"follow": ",".join(map(str, self.followees))})
|
277 |
|
278 |
|
class TrackStream(TweetStream):
    """Stream class for getting tweets relevant to keywords.

    :param user: See TweetStream

    :param password: See TweetStream

    :param keywords: Iterable containing keywords to look for. Unicode
      keywords are sent UTF-8 encoded.

    :keyword url: Like the url argument to TweetStream, except the default
      value is the "track" endpoint.

    Any other keyword arguments are passed on to TweetStream.
    """

    def __init__(self, user, password, keywords, url="track", **kwargs):
        # _get_post_data() runs on every (re)connect. A one-shot iterator
        # (e.g. a generator) would be exhausted after the first connect, so
        # it is materialised into a list. Lists, tuples, sets etc. are kept
        # as-is.
        if iter(keywords) is keywords:
            keywords = list(keywords)
        self.keywords = keywords
        TweetStream.__init__(self, user, password, url=url, **kwargs)

    def _get_post_data(self):
        """Return the url-encoded ``track`` parameter: a comma-separated
        list of the keywords."""
        track = ",".join(self.keywords)
        if not isinstance(track, str):
            # On Python 2, urllib.urlencode calls str() on each value, which
            # raises UnicodeEncodeError for non-ASCII unicode keywords.
            # Encode to UTF-8 bytes first.
            track = track.encode("utf-8")
        return urllib.urlencode({"track": track})