script/lib/tweetstream/tweetstream/streamclasses.py
changeset 207 621fa6caec0c
parent 13 79b6e132e3d7
child 209 b3091d5aa6cc
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/script/lib/tweetstream/tweetstream/streamclasses.py	Fri Jul 01 18:59:13 2011 +0200
@@ -0,0 +1,186 @@
+from . import AuthenticationError, ConnectionError, USER_AGENT
+import anyjson
+import socket
+import time
+import urllib
+import urllib2
+# Process-wide monkeypatch of a private Python 2 socket internal: every file
+# object created from a socket after this import defaults to a buffer size of
+# 0. NOTE(review): presumably done so that readline() on the stream hands
+# back each tweet as soon as it arrives instead of waiting for a buffer to
+# fill -- confirm, and be aware it affects all other socket users as well.
+socket._fileobject.default_bufsize = 0
+
+
+class BaseStream(object):
+    """A network connection to Twitter's streaming API.
+
+    The object is an iterator: every call to :meth:`next` blocks until one
+    message has been read from the stream and returns it deserialized. The
+    connection is opened lazily on the first call to :meth:`next` and is
+    reopened on the next call after an error closed it. The object can also
+    be used as a context manager, which closes the connection on exit.
+
+    :param auth: Authentication handler. If truthy, its
+      ``apply_auth(url, method, headers, postdata)`` method is called before
+      the request is sent. A falsy value sends the request unauthenticated.
+    :keyword catchup: Number of tweets from the past to get before switching
+      to live stream. Sent to the server as the ``count`` parameter.
+    :keyword url: Endpoint URL for the object. Note: you should not
+      need to edit this. It's present to make testing easier. When omitted,
+      the ``url`` attribute of the (sub)class is used.
+
+    .. attribute:: connected
+
+        True if the object is currently connected to the stream.
+
+    .. attribute:: url
+
+        The URL to which the object is connected
+
+    .. attribute:: starttime
+
+        The timestamp, in seconds since the epoch, the object first
+        connected to the streaming api.
+
+    .. attribute:: count
+
+        The number of tweets (messages containing a ``text`` key) that have
+        been returned by the object.
+
+    .. attribute:: rate
+
+        The rate at which tweets have been returned from the object, in
+        tweets per second. See also :attr:`rate_period`.
+
+    .. attribute:: rate_period
+
+        The amount of time to sample tweets to calculate tweet rate. By
+        default 10 seconds. Changes to this attribute will not be reflected
+        until the next time the rate is calculated. The rate of tweets varies
+        with time of day etc. so it's useful to set this to something
+        sensible.
+
+    .. attribute:: user_agent
+
+        User agent string that will be included in the request. NOTE: This can
+        not be changed after the connection has been made. This property must
+        thus be set before accessing the iterator. The default is set in
+        :attr:`USER_AGENT`.
+    """
+
+    def __init__(self, auth, catchup=None, url=None):
+        self._conn = None
+        self._rate_ts = None
+        self._rate_cnt = 0
+        self._auth = auth
+        self._catchup_count = catchup
+
+        self.rate_period = 10  # in seconds
+        self.connected = False
+        self.starttime = None
+        self.count = 0
+        self.rate = 0
+        self.user_agent = USER_AGENT
+        # Only shadow the class-level ``url`` when one is given explicitly.
+        if url:
+            self.url = url
+
+    def __iter__(self):
+        """Return the stream itself; it is its own iterator."""
+        return self
+
+    def __enter__(self):
+        """Enter a ``with`` block. The connection is still opened lazily."""
+        return self
+
+    def __exit__(self, *params):
+        """Close the connection; exceptions from the block propagate."""
+        self.close()
+        return False
+
+    def _init_conn(self):
+        """Open the connection to the twitter server.
+
+        :raises AuthenticationError: if the server answers with HTTP 401.
+        :raises ConnectionError: if the server answers with HTTP 404 or the
+          URL cannot be opened at all. Any other HTTP error is re-raised
+          unchanged as :class:`urllib2.HTTPError`.
+        """
+        headers = {'User-Agent': self.user_agent}
+
+        postdata = self._get_post_data() or {}
+        if self._catchup_count:
+            postdata["count"] = self._catchup_count
+
+        # No parameters at all means no request body (data=None).
+        poststring = urllib.urlencode(postdata) if postdata else None
+
+        if self._auth:
+            # NOTE(review): the handler is always told the method is "POST",
+            # but urllib2 issues a GET when ``poststring`` is None. Confirm
+            # apply_auth() does not depend on the method (e.g. for request
+            # signing) when a stream is opened without any parameters.
+            self._auth.apply_auth(self.url, "POST", headers, postdata)
+
+        req = urllib2.Request(self.url, poststring, headers)
+
+        try:
+            self._conn = urllib2.urlopen(req)
+        except urllib2.HTTPError, exception:
+            if exception.code == 401:
+                raise AuthenticationError("Access denied")
+            elif exception.code == 404:
+                raise ConnectionError("URL not found: %s" % self.url)
+            else:  # re raise. No idea what would cause this, so want to know
+                raise
+        except urllib2.URLError, exception:
+            raise ConnectionError(exception.reason)
+
+        self.connected = True
+        # Both timestamps survive reconnects: they are only set once.
+        if not self.starttime:
+            self.starttime = time.time()
+        if not self._rate_ts:
+            self._rate_ts = time.time()
+
+    def _get_post_data(self):
+        """Return the POST parameters for the request.
+
+        Subclasses that need to add post data to the request can override
+        this method. The return value must be a dict of parameters (it is
+        extended with ``count`` and passed to ``urllib.urlencode`` by
+        :meth:`_init_conn`) or None when there are no parameters."""
+        return None
+
+    def next(self):
+        """Return the next available tweet. This call is blocking!
+
+        Whitespace-only keep-alive lines are skipped. On any failure the
+        connection is closed before the exception is raised, so a later call
+        reconnects.
+
+        :returns: the deserialized message read from the stream.
+        :raises ConnectionError: if the server sent an empty read, sent data
+          that can not be deserialized, or the socket failed.
+        """
+        while True:
+            try:
+                if not self.connected:
+                    self._init_conn()
+
+                # _rate_ts is always set once _init_conn() has succeeded, so
+                # the elapsed time can be computed unconditionally.
+                now = time.time()
+                rate_time = now - self._rate_ts
+                if rate_time > self.rate_period:
+                    self.rate = self._rate_cnt / rate_time
+                    self._rate_cnt = 0
+                    self._rate_ts = now
+
+                data = self._conn.readline()
+                if data == "":  # something is wrong
+                    self.close()
+                    raise ConnectionError("Got entry of length 0. Disconnected")
+                elif data.isspace():
+                    continue
+
+                data = anyjson.deserialize(data)
+                # Only messages carrying a ``text`` key count as tweets;
+                # everything else is still returned to the caller.
+                if 'text' in data:
+                    self.count += 1
+                    self._rate_cnt += 1
+                return data
+
+            except ValueError:
+                self.close()
+                # ``data`` still holds the raw line that failed to parse.
+                raise ConnectionError("Got invalid data from twitter",
+                                      details=data)
+
+            except socket.error:
+                self.close()
+                raise ConnectionError("Server disconnected")
+
+    def close(self):
+        """
+        Close the connection to the streaming server.
+
+        Safe to call repeatedly and before any connection has been made.
+        """
+        self.connected = False
+        if self._conn:
+            self._conn.close()
+            # Drop the closed response so it is neither closed twice nor
+            # kept alive longer than needed.
+            self._conn = None
+
+
+class SampleStream(BaseStream):
+    """Stream bound to the ``statuses/sample`` endpoint.
+
+    Adds no request parameters of its own; see :class:`BaseStream` for the
+    constructor arguments.
+    """
+    url = "http://stream.twitter.com/1/statuses/sample.json"
+
+
+class FilterStream(BaseStream):
+    """Stream bound to the ``statuses/filter`` endpoint.
+
+    :param auth: Authentication handler, passed on to :class:`BaseStream`.
+    :keyword follow: Iterable of values sent as the comma separated
+      ``follow`` parameter. Each element is converted with ``str``.
+    :keyword locations: Iterable of values sent as the comma separated
+      ``locations`` parameter. Each element is converted with ``str``, so
+      numbers as well as strings are accepted.
+    :keyword track: Iterable of strings sent as the comma separated
+      ``track`` parameter.
+    :keyword catchup: Passed on to :class:`BaseStream`, which sends it as
+      the ``count`` parameter.
+    :keyword url: Endpoint URL override, passed on to :class:`BaseStream`.
+    """
+    url = "http://stream.twitter.com/1/statuses/filter.json"
+
+    def __init__(self, auth, follow=None, locations=None,
+                 track=None, catchup=None, url=None):
+        self._follow = follow
+        self._locations = locations
+        self._track = track
+        # The filter arguments are kept here; everything else, including
+        # ``catchup``, is handled by the base class.
+        BaseStream.__init__(self, auth, catchup=catchup, url=url)
+
+    def _get_post_data(self):
+        """Return a dict with the filter parameters that were given.
+
+        Parameters that are None or empty are left out, so the dict may be
+        empty."""
+        postdata = {}
+        if self._follow:
+            postdata["follow"] = ",".join(str(e) for e in self._follow)
+        if self._locations:
+            postdata["locations"] = ",".join(str(e) for e in self._locations)
+        if self._track:
+            # Not passed through str(): track terms may be unicode.
+            postdata["track"] = ",".join(self._track)
+        return postdata