script/lib/tweetstream/tweetstream/__init__.py
changeset 12 4daf47fcf792
child 13 79b6e132e3d7
child 15 5d552b6a0e55
equal deleted inserted replaced
11:54d7f1486ac4 12:4daf47fcf792
       
     1 """
       
     2 Simple Twitter streaming API access
       
     3 """
       
# Package metadata.
__version__ = "0.3.5"  # also interpolated into the default USER_AGENT string
__author__ = "Rune Halvorsen <runefh@gmail.com>"
__homepage__ = "http://bitbucket.org/runeh/tweetstream/"
__docformat__ = "restructuredtext"  # docstrings in this module use reST markup
       
     8 
       
     9 import urllib
       
    10 import urllib2
       
    11 import socket
       
    12 import time
       
    13 import anyjson
       
    14 
       
    15 
       
    16 """
       
    17  .. data:: URLS
       
    18 
       
    19      Mapping between twitter endpoint names and URLs.
       
    20 
       
    21  .. data:: USER_AGENT
       
    22 
       
    23      The default user agent string for stream objects
       
    24 
       
    25 """
       
    26 
       
# Endpoint name -> streaming API URL. TweetStream resolves its ``url``
# argument through this mapping and falls back to the argument itself when
# the name is not listed here.
# "follow" and "track" map to the same filter.json URL; the stream subclasses
# differ only in the POST data they send ("follow" vs. "track").
URLS = {"firehose": "http://stream.twitter.com/1/statuses/firehose.json",
        "sample": "http://stream.twitter.com/1/statuses/sample.json",
        "follow": "http://stream.twitter.com/1/statuses/filter.json",
        "track": "http://stream.twitter.com/1/statuses/filter.json"}

# Default value of the User-Agent header sent when a stream connects.
USER_AGENT = "TweetStream %s" % __version__
       
    33 
       
    34 
       
    35 class TweetStreamError(Exception):
       
    36     """Base class for all tweetstream errors"""
       
    37     pass
       
    38 
       
    39 class AuthenticationError(TweetStreamError):
       
    40     """Exception raised if the username/password is not accepted
       
    41     """
       
    42     pass
       
    43 
       
    44 
       
    45 class ConnectionError(TweetStreamError):
       
    46     """Raised when there are network problems. This means when there are
       
    47     dns errors, network errors, twitter issues"""
       
    48 
       
    49     def __init__(self, reason, details=None):
       
    50         self.reason = reason
       
    51         self.details = details
       
    52 
       
    53     def __str__(self):
       
    54         return '<ConnectionError %s>' % self.reason
       
    55 
       
    56 
       
    57 class TweetStream(object):
       
    58     """A network connection to Twitters streaming API
       
    59 
       
    60     :param username: Twitter username for the account accessing the API.
       
    61     :param password: Twitter password for the account accessing the API.
       
    62 
       
    63     :keyword url: URL to connect to. This can be either an endopoint name,
       
    64      such as "sample", or a full URL. By default, the public "sample" url
       
    65      is used. All known endpoints are defined in the :URLS: attribute
       
    66 
       
    67     .. attribute:: connected
       
    68 
       
    69         True if the object is currently connected to the stream.
       
    70 
       
    71     .. attribute:: url
       
    72 
       
    73         The URL to which the object is connected
       
    74 
       
    75     .. attribute:: starttime
       
    76 
       
    77         The timestamp, in seconds since the epoch, the object connected to the
       
    78         streaming api.
       
    79 
       
    80     .. attribute:: count
       
    81 
       
    82         The number of tweets that have been returned by the object.
       
    83 
       
    84     .. attribute:: rate
       
    85 
       
    86         The rate at which tweets have been returned from the object as a
       
    87         float. see also :attr: `rate_period`.
       
    88 
       
    89     .. attribute:: rate_period
       
    90 
       
    91         The ammount of time to sample tweets to calculate tweet rate. By
       
    92         default 10 seconds. Changes to this attribute will not be reflected
       
    93         until the next time the rate is calculated. The rate of tweets vary
       
    94         with time of day etc. so it's usefull to set this to something
       
    95         sensible.
       
    96 
       
    97     .. attribute:: user_agent
       
    98 
       
    99         User agent string that will be included in the request. NOTE: This can
       
   100         not be changed after the connection has been made. This property must
       
   101         thus be set before accessing the iterator. The default is set in
       
   102         :attr: `USER_AGENT`.
       
   103 """
       
   104 
       
   105     def __init__(self, username, password, url="sample"):
       
   106         self._conn = None
       
   107         self._rate_ts = None
       
   108         self._rate_cnt = 0
       
   109         self._username = username
       
   110         self._password = password
       
   111 
       
   112         self.rate_period = 10 # in seconds
       
   113         self.connected = False
       
   114         self.starttime = None
       
   115         self.count = 0
       
   116         self.rate = 0
       
   117         self.user_agent = USER_AGENT
       
   118         self.url = URLS.get(url, url)
       
   119 
       
   120     def __iter__(self):
       
   121         return self
       
   122 
       
   123     def __enter__(self):
       
   124         return self
       
   125 
       
   126     def __exit__(self, *params):
       
   127         self.close()
       
   128         return False
       
   129 
       
   130     def _init_conn(self):
       
   131         """Open the connection to the twitter server"""
       
   132         headers = {'User-Agent': self.user_agent}
       
   133         req = urllib2.Request(self.url, self._get_post_data(), headers)
       
   134 
       
   135         password_mgr = urllib2.HTTPPasswordMgrWithDefaultRealm()
       
   136         password_mgr.add_password(None, self.url, self._username,
       
   137                                   self._password)
       
   138         handler = urllib2.HTTPBasicAuthHandler(password_mgr)
       
   139         opener = urllib2.build_opener(handler)
       
   140 
       
   141         try:
       
   142             self._conn = opener.open(req)
       
   143         except urllib2.HTTPError, exception:
       
   144             if exception.code == 401:
       
   145                 raise AuthenticationError("Access denied")
       
   146             elif exception.code == 404:
       
   147                 raise ConnectionError("URL not found: %s" % self.url)
       
   148             else: # re raise. No idea what would cause this, so want to know
       
   149                 raise
       
   150         except urllib2.URLError, exception:
       
   151             raise ConnectionError(exception.reason)
       
   152 
       
   153         self.connected = True
       
   154         if not self.starttime:
       
   155             self.starttime = time.time()
       
   156         if not self._rate_ts:
       
   157             self._rate_ts = time.time()
       
   158 
       
   159     def _get_post_data(self):
       
   160         """Subclasses that need to add post data to the request can override
       
   161         this method and return post data. The data should be in the format
       
   162         returned by urllib.urlencode."""
       
   163         return None
       
   164 
       
   165     def next(self):
       
   166         """Return the next available tweet. This call is blocking!"""
       
   167         while True:
       
   168             try:
       
   169                 if not self.connected:
       
   170                     self._init_conn()
       
   171 
       
   172                 rate_time = time.time() - self._rate_ts
       
   173                 if not self._rate_ts or rate_time > self.rate_period:
       
   174                     self.rate = self._rate_cnt / rate_time
       
   175                     self._rate_cnt = 0
       
   176                     self._rate_ts = time.time()
       
   177 
       
   178                 data = self._conn.readline()
       
   179                 if data == "": # something is wrong
       
   180                     self.close()
       
   181                     raise ConnectionError("Got entry of length 0. Disconnected")
       
   182                 elif data.isspace():
       
   183                     continue
       
   184 
       
   185                 data = anyjson.deserialize(data)
       
   186                 self.count += 1
       
   187                 self._rate_cnt += 1
       
   188                 return data
       
   189 
       
   190             except ValueError, e:
       
   191                 self.close()
       
   192                 raise ConnectionError("Got invalid data from twitter", details=data)
       
   193 
       
   194             except socket.error, e:
       
   195                 self.close()
       
   196                 raise ConnectionError("Server disconnected")
       
   197 
       
   198 
       
   199     def close(self):
       
   200         """
       
   201         Close the connection to the streaming server.
       
   202         """
       
   203         self.connected = False
       
   204         if self._conn:
       
   205             self._conn.close()
       
   206 
       
   207 
       
   208 class ReconnectingTweetStream(TweetStream):
       
   209     """TweetStream class that automatically tries to reconnect if the
       
   210     connecting goes down. Reconnecting, and waiting for reconnecting, is
       
   211     blocking.
       
   212 
       
   213     :param username: See :TweetStream:
       
   214 
       
   215     :param password: See :TweetStream:
       
   216 
       
   217     :keyword url: See :TweetStream:
       
   218 
       
   219     :keyword reconnects: Number of reconnects before a ConnectionError is
       
   220         raised. Default is 3
       
   221 
       
   222     :error_cb: Optional callable that will be called just before trying to
       
   223         reconnect. The callback will be called with a single argument, the
       
   224         exception that caused the reconnect attempt. Default is None
       
   225 
       
   226     :retry_wait: Time to wait before reconnecting in seconds. Default is 5
       
   227 
       
   228     """
       
   229 
       
   230     def __init__(self, username, password, url="sample",
       
   231                  reconnects=3, error_cb=None, retry_wait=5):
       
   232         self.max_reconnects = reconnects
       
   233         self.retry_wait = retry_wait
       
   234         self._reconnects = 0
       
   235         self._error_cb = error_cb
       
   236         TweetStream.__init__(self, username, password, url=url)
       
   237 
       
   238     def next(self):
       
   239         while True:
       
   240             try:
       
   241                 return TweetStream.next(self)
       
   242             except ConnectionError, e:
       
   243                 self._reconnects += 1
       
   244                 if self._reconnects > self.max_reconnects:
       
   245                     raise ConnectionError("Too many retries")
       
   246 
       
   247                 # Note: error_cb is not called on the last error since we
       
   248                 # raise a ConnectionError instead
       
   249                 if  callable(self._error_cb):
       
   250                     self._error_cb(e)
       
   251 
       
   252                 time.sleep(self.retry_wait)
       
   253         # Don't listen to auth error, since we can't reasonably reconnect
       
   254         # when we get one.
       
   255 
       
   256 class FollowStream(TweetStream):
       
   257     """Stream class for getting tweets from followers.
       
   258 
       
   259         :param user: See TweetStream
       
   260 
       
   261         :param password: See TweetStream
       
   262 
       
   263         :param followees: Iterable containing user IDs to follow.
       
   264           ***Note:*** the user id in question is the numeric ID twitter uses,
       
   265           not the normal username.
       
   266 
       
   267         :keyword url: Like the url argument to TweetStream, except default
       
   268           value is the "follow" endpoint.
       
   269     """
       
   270 
       
   271     def __init__(self, user, password, followees, url="follow", **kwargs):
       
   272         self.followees = followees
       
   273         TweetStream.__init__(self, user, password, url=url, **kwargs)
       
   274 
       
   275     def _get_post_data(self):
       
   276         return urllib.urlencode({"follow": ",".join(map(str, self.followees))})
       
   277 
       
   278 
       
   279 class TrackStream(TweetStream):
       
   280     """Stream class for getting tweets relevant to keywords.
       
   281 
       
   282         :param user: See TweetStream
       
   283 
       
   284         :param password: See TweetStream
       
   285 
       
   286         :param keywords: Iterable containing keywords to look for
       
   287 
       
   288         :keyword url: Like the url argument to TweetStream, except default
       
   289           value is the "track" endpoint.
       
   290     """
       
   291 
       
   292     def __init__(self, user, password, keywords, url="track", **kwargs):
       
   293         self.keywords = keywords
       
   294         TweetStream.__init__(self, user, password, url=url, **kwargs)
       
   295 
       
   296     def _get_post_data(self):
       
   297         return urllib.urlencode({"track": ",".join(self.keywords)})