add oauth authentication to tweetstream
authorYves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
Thu, 20 Jan 2011 10:44:04 +0100
changeset 15 5d552b6a0e55
parent 12 4daf47fcf792
child 16 6d391ad4fd6a
add oauth authentication to tweetstream
.hgignore
script/lib/iri_tweet/export_tweet_db.py
script/lib/iri_tweet/utils.py
script/lib/tweetstream/tests/test_tweetstream.py
script/lib/tweetstream/tweetstream/__init__.py
script/lib/tweetstream/tweetstream/auth.py
script/lib/tweetstream/tweetstream/oauth.py
script/rest/search_enmi.py
script/stream/recorder_tweetstream.py
--- a/.hgignore	Tue Jan 18 18:25:18 2011 +0100
+++ b/.hgignore	Thu Jan 20 10:44:04 2011 +0100
@@ -6,4 +6,6 @@
 syntax: regexp
 ^script/rest/virtualenv$
 syntax: regexp
-^script/stream/streamwatcher\.py$
\ No newline at end of file
+^script/stream/streamwatcher\.py$
+syntax: regexp
+^script/virtualenv/res/twitter-1\.4\.2$
\ No newline at end of file
--- a/script/lib/iri_tweet/export_tweet_db.py	Tue Jan 18 18:25:18 2011 +0100
+++ b/script/lib/iri_tweet/export_tweet_db.py	Thu Jan 20 10:44:04 2011 +0100
@@ -14,6 +14,9 @@
     
     parser = OptionParser()
 
+    parser.add_option("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token",
+                      help="Token file name")
+
     set_logging_options(parser)
 
     return parser.parse_args()
@@ -33,7 +36,7 @@
             fields_mapping = {}
             for i,res in enumerate(curs_in.execute("select json from tweet_tweet;")):
                 logging.debug("main loop %d : %s" % (i, res[0]))
-                processor = TwitterProcessor(eval(res[0]), res[0], session)
+                processor = TwitterProcessor(eval(res[0]), res[0], session, options.token_filename)
                 processor.process()
                 session.commit()
             logging.debug("main : %d tweet processed" % (i+1))
--- a/script/lib/iri_tweet/utils.py	Tue Jan 18 18:25:18 2011 +0100
+++ b/script/lib/iri_tweet/utils.py	Thu Jan 20 10:44:04 2011 +0100
@@ -3,16 +3,17 @@
 import email.utils
 import json
 import logging
+import os.path
 import sys
 import twitter
+import twitter.oauth
+import twitter.oauth_dance
 import twitter_text
-import os.path
-import twitter.oauth
 
 
 def get_oauth_token(token_file_path=None):
     
-    if token_file_path and os.path.file_exists(token_file_path):
+    if token_file_path and os.path.exists(token_file_path):
         logging.debug("reading token from file %s" % token_file_path)
         return twitter.oauth.read_token_file(token_file_path)
         #read access token info from path
@@ -20,7 +21,7 @@
     if 'ACCESS_TOKEN_KEY' in dict() and 'ACCESS_TOKEN_SECRET' in dict() and ACCESS_TOKEN_KEY and ACCESS_TOKEN_SECRET:
         return ACCESS_TOKEN_KEY,ACCESS_TOKEN_SECRET
     
-    return twitter.oauth_dance.oauth_dance(APPLICATION_NAME, CONSUMER_KEY, CONSUMER_SECRET, token_filename)
+    return twitter.oauth_dance.oauth_dance(APPLICATION_NAME, CONSUMER_KEY, CONSUMER_SECRET, token_file_path)
 
 def parse_date(date_str):
     ts = email.utils.parsedate_tz(date_str)
@@ -68,7 +69,7 @@
 
 class TwitterProcessor(object):
     
-    def __init__(self, json_dict, json_txt, session):
+    def __init__(self, json_dict, json_txt, session, token_filename=None):
 
         if json_dict is None and json_txt is None:
             raise TwitterProcessorException("No json")
@@ -87,6 +88,7 @@
             raise TwitterProcessorException("No id in json")
         
         self.session = session
+        self.token_filename = token_filename
 
     def __get_user(self, user_dict):
         logging.debug("Get user : " + repr(user_dict))
@@ -108,8 +110,8 @@
         user_created_at = user_dict.get("created_at", None)
         
         if user_created_at is None:
-            acess_token_key, access_token_secret = get_oauth_token()
-            t = twitter.Twitter(auth=twitter.OAuth(token_key, token_secret, CONSUMER_KEY, CONSUMER_SECRET))
+            acess_token_key, access_token_secret = get_oauth_token(self.token_filename)
+            t = twitter.Twitter(auth=twitter.OAuth(acess_token_key, access_token_secret, CONSUMER_KEY, CONSUMER_SECRET))
             try:
                 if user_id:
                     user_dict = t.users.show(user_id=user_id)
--- a/script/lib/tweetstream/tests/test_tweetstream.py	Tue Jan 18 18:25:18 2011 +0100
+++ b/script/lib/tweetstream/tests/test_tweetstream.py	Thu Jan 20 10:44:04 2011 +0100
@@ -6,6 +6,7 @@
 from nose.tools import assert_raises
 from tweetstream import TweetStream, FollowStream, TrackStream
 from tweetstream import ConnectionError, AuthenticationError
+from tweetstream import auth
 
 from servercontext import test_server
 
@@ -20,13 +21,13 @@
 
     with test_server(handler=auth_denied, methods=("post", "get"),
                      port="random") as server:
-        stream = TweetStream("foo", "bar", url=server.baseurl)
+        stream = TweetStream(auth.BasicAuthHandler("foo", "bar"), url=server.baseurl)
         assert_raises(AuthenticationError, stream.next)
 
-        stream = FollowStream("foo", "bar", [1, 2, 3], url=server.baseurl)
+        stream = FollowStream(auth.BasicAuthHandler("foo", "bar"), [1, 2, 3], url=server.baseurl)
         assert_raises(AuthenticationError, stream.next)
 
-        stream = TrackStream("foo", "bar", ["opera"], url=server.baseurl)
+        stream = TrackStream(auth.BasicAuthHandler("foo", "bar"), ["opera"], url=server.baseurl)
         assert_raises(AuthenticationError, stream.next)
 
 
@@ -38,13 +39,13 @@
 
     with test_server(handler=not_found, methods=("post", "get"),
                      port="random") as server:
-        stream = TweetStream("foo", "bar", url=server.baseurl)
+        stream = TweetStream(auth.BasicAuthHandler("foo", "bar"), url=server.baseurl)
         assert_raises(ConnectionError, stream.next)
 
-        stream = FollowStream("foo", "bar", [1, 2, 3], url=server.baseurl)
+        stream = FollowStream(auth.BasicAuthHandler("foo", "bar"), [1, 2, 3], url=server.baseurl)
         assert_raises(ConnectionError, stream.next)
 
-        stream = TrackStream("foo", "bar", ["opera"], url=server.baseurl)
+        stream = TrackStream(auth.BasicAuthHandler("foo", "bar"), ["opera"], url=server.baseurl)
         assert_raises(ConnectionError, stream.next)
 
 
@@ -61,7 +62,7 @@
     def do_test(klass, *args):
         with test_server(handler=bad_content, methods=("post", "get"),
                          port="random") as server:
-            stream = klass("foo", "bar", *args, url=server.baseurl)
+            stream = klass(auth.BasicAuthHandler("foo", "bar"), *args, url=server.baseurl)
             for tweet in stream:
                 pass
 
@@ -82,7 +83,7 @@
     def do_test(klass, *args):
         with test_server(handler=bad_content, methods=("post", "get"),
                          port="random") as server:
-            stream = klass("foo", "bar", *args, url=server.baseurl)
+            stream = klass(auth.BasicAuthHandler("foo", "bar"), *args, url=server.baseurl)
             for tweet in stream:
                 pass
 
@@ -93,13 +94,13 @@
 
 def test_bad_host():
     """Test behaviour if we can't connect to the host"""
-    stream = TweetStream("foo", "bar", url="http://bad.egewdvsdswefdsf.com/")
+    stream = TweetStream(auth.BasicAuthHandler("foo", "bar"), url="http://bad.egewdvsdswefdsf.com/")
     assert_raises(ConnectionError, stream.next)
 
-    stream = FollowStream("foo", "bar", [1, 2, 3], url="http://zegwefdsf.com/")
+    stream = FollowStream(auth.BasicAuthHandler("foo", "bar"), [1, 2, 3], url="http://zegwefdsf.com/")
     assert_raises(ConnectionError, stream.next)
 
-    stream = TrackStream("foo", "bar", ["foo"], url="http://aswefdsews.com/")
+    stream = TrackStream(auth.BasicAuthHandler("foo", "bar"), ["foo"], url="http://aswefdsews.com/")
     assert_raises(ConnectionError, stream.next)
 
 
@@ -114,7 +115,7 @@
     def do_test(klass, *args):
         with test_server(handler=tweetsource,
                          methods=("post", "get"), port="random") as server:
-            stream = klass("foo", "bar", *args, url=server.baseurl)
+            stream = klass(auth.BasicAuthHandler("foo", "bar"), *args, url=server.baseurl)
             for tweet in stream:
                 if stream.count == total:
                     break
@@ -145,7 +146,7 @@
     def do_test(klass, *args):
         with test_server(handler=tweetsource, methods=("post", "get"),
                          port="random") as server:
-            stream = klass("foo", "bar", *args, url=server.baseurl)
+            stream = klass(auth.BasicAuthHandler("foo", "bar"), *args, url=server.baseurl)
             try:
                 for tweet in stream:
                     pass
@@ -178,7 +179,7 @@
     def do_test(klass, *args):
         with test_server(handler=tweetsource, methods=("post", "get"),
                          port="random") as server:
-            stream = klass("foo", "bar", *args, url=server.baseurl)
+            stream = klass(auth.BasicAuthHandler("foo", "bar"), *args, url=server.baseurl)
 
             start = time.time()
             stream.next()
--- a/script/lib/tweetstream/tweetstream/__init__.py	Tue Jan 18 18:25:18 2011 +0100
+++ b/script/lib/tweetstream/tweetstream/__init__.py	Thu Jan 20 10:44:04 2011 +0100
@@ -6,11 +6,14 @@
 __homepage__ = "http://bitbucket.org/runeh/tweetstream/"
 __docformat__ = "restructuredtext"
 
+import anyjson
+import logging
+import socket
+import time
 import urllib
 import urllib2
-import socket
-import time
-import anyjson
+import urlparse
+socket._fileobject.default_bufsize = 0
 
 
 """
@@ -57,10 +60,9 @@
 class TweetStream(object):
     """A network connection to Twitters streaming API
 
-    :param username: Twitter username for the account accessing the API.
-    :param password: Twitter password for the account accessing the API.
+    :param auth: Twitter authentication (user name/password - oauth)
 
-    :keyword url: URL to connect to. This can be either an endopoint name,
+    :keyword url: URL to connect to. This can be either an endpoint name,
      such as "sample", or a full URL. By default, the public "sample" url
      is used. All known endpoints are defined in the :URLS: attribute
 
@@ -102,12 +104,11 @@
         :attr: `USER_AGENT`.
 """
 
-    def __init__(self, username, password, url="sample"):
+    def __init__(self, auth, url="sample"):
         self._conn = None
         self._rate_ts = None
         self._rate_cnt = 0
-        self._username = username
-        self._password = password
+        self._auth = auth
 
         self.rate_period = 10 # in seconds
         self.connected = False
@@ -130,16 +131,30 @@
     def _init_conn(self):
         """Open the connection to the twitter server"""
         headers = {'User-Agent': self.user_agent}
-        req = urllib2.Request(self.url, self._get_post_data(), headers)
+        params_str = self._get_post_data()
+        if params_str is not None:
+            method = "POST"
+            params = dict([(key, ",".join(value)) for key, value in urlparse.parse_qs(params_str).items()])
+        else:
+            method = "GET"
+            params = None
+        
+        if self._auth:
+            self._auth.apply_auth(self.url, method, headers, params)
+        
+        req = urllib2.Request(self.url, urllib.urlencode(params) , headers)
 
-        password_mgr = urllib2.HTTPPasswordMgrWithDefaultRealm()
-        password_mgr.add_password(None, self.url, self._username,
-                                  self._password)
-        handler = urllib2.HTTPBasicAuthHandler(password_mgr)
-        opener = urllib2.build_opener(handler)
+        #replace with auth
+        #password_mgr = urllib2.HTTPPasswordMgrWithDefaultRealm()
+        #password_mgr.add_password(None, self.url, self._username,
+        #                          self._password)
+        #handler = urllib2.HTTPBasicAuthHandler(password_mgr)
+        #handler = urllib2.HTTPHandler()
+        #opener = urllib2.build_opener(handler)
 
         try:
-            self._conn = opener.open(req)
+            #self._conn = opener.open(req)
+            self._conn = urllib2.urlopen(req)
         except urllib2.HTTPError, exception:
             if exception.code == 401:
                 raise AuthenticationError("Access denied")
@@ -149,7 +164,7 @@
                 raise
         except urllib2.URLError, exception:
             raise ConnectionError(exception.reason)
-
+        logging.debug("TweetStream._init_conn : connected : %s, params : %s, headers: %s " % (self.url, repr(params), repr(headers)))
         self.connected = True
         if not self.starttime:
             self.starttime = time.time()
@@ -166,6 +181,7 @@
         """Return the next available tweet. This call is blocking!"""
         while True:
             try:
+                data=None
                 if not self.connected:
                     self._init_conn()
 
@@ -176,6 +192,8 @@
                     self._rate_ts = time.time()
 
                 data = self._conn.readline()
+                logging.debug("TweetStream.next : data : %s" % repr(data))
+                
                 if data == "": # something is wrong
                     self.close()
                     raise ConnectionError("Got entry of length 0. Disconnected")
@@ -189,7 +207,7 @@
 
             except ValueError, e:
                 self.close()
-                raise ConnectionError("Got invalid data from twitter", details=data)
+                raise ConnectionError("Got invalid data from twitter " + str(e), details=data)
 
             except socket.error, e:
                 self.close()
@@ -210,9 +228,7 @@
     connecting goes down. Reconnecting, and waiting for reconnecting, is
     blocking.
 
-    :param username: See :TweetStream:
-
-    :param password: See :TweetStream:
+    :param auth: See :TweetStream:
 
     :keyword url: See :TweetStream:
 
@@ -227,13 +243,13 @@
 
     """
 
-    def __init__(self, username, password, url="sample",
+    def __init__(self, auth, url="sample",
                  reconnects=3, error_cb=None, retry_wait=5):
         self.max_reconnects = reconnects
         self.retry_wait = retry_wait
         self._reconnects = 0
         self._error_cb = error_cb
-        TweetStream.__init__(self, username, password, url=url)
+        TweetStream.__init__(self, auth, url=url)
 
     def next(self):
         while True:
@@ -256,9 +272,7 @@
 class FollowStream(TweetStream):
     """Stream class for getting tweets from followers.
 
-        :param user: See TweetStream
-
-        :param password: See TweetStream
+        :param auth: See TweetStream
 
         :param followees: Iterable containing user IDs to follow.
           ***Note:*** the user id in question is the numeric ID twitter uses,
@@ -268,9 +282,9 @@
           value is the "follow" endpoint.
     """
 
-    def __init__(self, user, password, followees, url="follow", **kwargs):
+    def __init__(self, auth, followees, url="follow", **kwargs):
         self.followees = followees
-        TweetStream.__init__(self, user, password, url=url, **kwargs)
+        TweetStream.__init__(self, auth, url=url, **kwargs)
 
     def _get_post_data(self):
         return urllib.urlencode({"follow": ",".join(map(str, self.followees))})
@@ -279,9 +293,7 @@
 class TrackStream(TweetStream):
     """Stream class for getting tweets relevant to keywords.
 
-        :param user: See TweetStream
-
-        :param password: See TweetStream
+        :param auth: See TweetStream
 
         :param keywords: Iterable containing keywords to look for
 
@@ -289,9 +301,9 @@
           value is the "track" endpoint.
     """
 
-    def __init__(self, user, password, keywords, url="track", **kwargs):
+    def __init__(self, auth, keywords, url="track", **kwargs):
         self.keywords = keywords
-        TweetStream.__init__(self, user, password, url=url, **kwargs)
+        TweetStream.__init__(self, auth, url=url, **kwargs)
 
     def _get_post_data(self):
         return urllib.urlencode({"track": ",".join(self.keywords)})
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/script/lib/tweetstream/tweetstream/auth.py	Thu Jan 20 10:44:04 2011 +0100
@@ -0,0 +1,162 @@
+# Tweepy
+# Copyright 2009-2010 Joshua Roesslein
+# See LICENSE for details.
+
+from tweetstream import TweetStreamError
+from urllib2 import Request, urlopen
+import base64
+import tweetstream.oauth
+
+
+
+class AuthHandler(object):
+
+    def apply_auth(self, url, method, headers, parameters):
+        """Apply authentication headers to request"""
+        raise NotImplementedError
+
+    def get_username(self):
+        """Return the username of the authenticated user"""
+        raise NotImplementedError
+
+
+class BasicAuthHandler(AuthHandler):
+
+    def __init__(self, username, password):
+        self.username = username
+        self._b64up = base64.b64encode('%s:%s' % (username, password))
+
+    def apply_auth(self, url, method, headers, parameters):
+        headers['Authorization'] = 'Basic %s' % self._b64up
+
+    def get_username(self):
+        return self.username
+
+
+class OAuthHandler(AuthHandler):
+    """OAuth authentication handler"""
+
+    OAUTH_HOST = 'api.twitter.com'
+    OAUTH_ROOT = '/oauth/'
+
+    def __init__(self, consumer_key, consumer_secret, callback=None, secure=False):
+        self._consumer = tweetstream.oauth.OAuthConsumer(consumer_key, consumer_secret)
+        self._sigmethod = tweetstream.oauth.OAuthSignatureMethod_HMAC_SHA1()
+        self.request_token = None
+        self.access_token = None
+        self.callback = callback
+        self.username = None
+        self.secure = secure
+
+    def _get_oauth_url(self, endpoint, secure=False):
+        if self.secure or secure:
+            prefix = 'https://'
+        else:
+            prefix = 'http://'
+
+        return prefix + self.OAUTH_HOST + self.OAUTH_ROOT + endpoint
+
+    def apply_auth(self, url, method, headers, parameters):
+        request = tweetstream.oauth.OAuthRequest.from_consumer_and_token(
+            self._consumer, http_url=url, http_method=method,
+            token=self.access_token, parameters=parameters
+        )
+        request.sign_request(self._sigmethod, self._consumer, self.access_token)
+        headers.update(request.to_header())
+
+    def _get_request_token(self):
+        try:
+            url = self._get_oauth_url('request_token')
+            request = tweetstream.oauth.OAuthRequest.from_consumer_and_token(
+                self._consumer, http_url=url, callback=self.callback
+            )
+            request.sign_request(self._sigmethod, self._consumer, None)
+            resp = urlopen(Request(url, headers=request.to_header()))
+            return tweetstream.oauth.OAuthToken.from_string(resp.read())
+        except Exception, e:
+            raise TweetStreamError(e)
+
+    def set_request_token(self, key, secret):
+        self.request_token = tweetstream.oauth.OAuthToken(key, secret)
+
+    def set_access_token(self, key, secret):
+        self.access_token = tweetstream.oauth.OAuthToken(key, secret)
+
+    def get_authorization_url(self, signin_with_twitter=False):
+        """Get the authorization URL to redirect the user"""
+        try:
+            # get the request token
+            self.request_token = self._get_request_token()
+
+            # build auth request and return as url
+            if signin_with_twitter:
+                url = self._get_oauth_url('authenticate')
+            else:
+                url = self._get_oauth_url('authorize')
+            request = tweetstream.oauth.OAuthRequest.from_token_and_callback(
+                token=self.request_token, http_url=url
+            )
+
+            return request.to_url()
+        except Exception, e:
+            raise TweetStreamError(e)
+
+    def get_access_token(self, verifier=None):
+        """
+        After user has authorized the request token, get access token
+        with user supplied verifier.
+        """
+        try:
+            url = self._get_oauth_url('access_token')
+
+            # build request
+            request = tweetstream.oauth.OAuthRequest.from_consumer_and_token(
+                self._consumer,
+                token=self.request_token, http_url=url,
+                verifier=str(verifier)
+            )
+            request.sign_request(self._sigmethod, self._consumer, self.request_token)
+
+            # send request
+            resp = urlopen(Request(url, headers=request.to_header()))
+            self.access_token = tweetstream.oauth.OAuthToken.from_string(resp.read())
+            return self.access_token
+        except Exception, e:
+            raise TweetStreamError(e)
+
+    def get_xauth_access_token(self, username, password):
+        """
+        Get an access token from an username and password combination.
+        In order to get this working you need to create an app at
+        http://twitter.com/apps, after that send a mail to api@twitter.com
+        and request activation of xAuth for it.
+        """
+        try:
+            url = self._get_oauth_url('access_token', secure=True) # must use HTTPS
+            request = tweetstream.oauth.OAuthRequest.from_consumer_and_token(
+                oauth_consumer=self._consumer,
+                http_method='POST', http_url=url,
+                parameters = {
+		            'x_auth_mode': 'client_auth',
+		            'x_auth_username': username,
+		            'x_auth_password': password
+                }
+            )
+            request.sign_request(self._sigmethod, self._consumer, None)
+
+            resp = urlopen(Request(url, data=request.to_postdata()))
+            self.access_token = tweetstream.oauth.OAuthToken.from_string(resp.read())
+            return self.access_token
+        except Exception, e:
+            raise TweetStreamError(e)
+
+    def get_username(self):
+        if self.username is None:
+            api = API(self)
+            user = api.verify_credentials()
+            if user:
+                self.username = user.screen_name
+            else:
+                raise TweetStreamError("Unable to get username, invalid oauth token!")
+        return self.username
+
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/script/lib/tweetstream/tweetstream/oauth.py	Thu Jan 20 10:44:04 2011 +0100
@@ -0,0 +1,655 @@
+"""
+The MIT License
+
+Copyright (c) 2007 Leah Culver
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.
+"""
+
+import binascii
+import cgi
+import hmac
+import random
+import time
+import urllib
+import urlparse
+
+
+VERSION = '1.0' # Hi Blaine!
+HTTP_METHOD = 'GET'
+SIGNATURE_METHOD = 'PLAINTEXT'
+
+
+class OAuthError(RuntimeError):
+    """Generic exception class."""
+    def __init__(self, message='OAuth error occured.'):
+        self.message = message
+
+def build_authenticate_header(realm=''):
+    """Optional WWW-Authenticate header (401 error)"""
+    return {'WWW-Authenticate': 'OAuth realm="%s"' % realm}
+
+def escape(s):
+    """Escape a URL including any /."""
+    return urllib.quote(s, safe='~')
+
+def _utf8_str(s):
+    """Convert unicode to utf-8."""
+    if isinstance(s, unicode):
+        return s.encode("utf-8")
+    else:
+        return str(s)
+
+def generate_timestamp():
+    """Get seconds since epoch (UTC)."""
+    return int(time.time())
+
+def generate_nonce(length=8):
+    """Generate pseudorandom number."""
+    return ''.join([str(random.randint(0, 9)) for i in range(length)])
+
+def generate_verifier(length=8):
+    """Generate pseudorandom number."""
+    return ''.join([str(random.randint(0, 9)) for i in range(length)])
+
+
+class OAuthConsumer(object):
+    """Consumer of OAuth authentication.
+
+    OAuthConsumer is a data type that represents the identity of the Consumer
+    via its shared secret with the Service Provider.
+
+    """
+    key = None
+    secret = None
+
+    def __init__(self, key, secret):
+        self.key = key
+        self.secret = secret
+
+
+class OAuthToken(object):
+    """OAuthToken is a data type that represents an End User via either an access
+    or request token.
+    
+    key -- the token
+    secret -- the token secret
+
+    """
+    key = None
+    secret = None
+    callback = None
+    callback_confirmed = None
+    verifier = None
+
+    def __init__(self, key, secret):
+        self.key = key
+        self.secret = secret
+
+    def set_callback(self, callback):
+        self.callback = callback
+        self.callback_confirmed = 'true'
+
+    def set_verifier(self, verifier=None):
+        if verifier is not None:
+            self.verifier = verifier
+        else:
+            self.verifier = generate_verifier()
+
+    def get_callback_url(self):
+        if self.callback and self.verifier:
+            # Append the oauth_verifier.
+            parts = urlparse.urlparse(self.callback)
+            scheme, netloc, path, params, query, fragment = parts[:6]
+            if query:
+                query = '%s&oauth_verifier=%s' % (query, self.verifier)
+            else:
+                query = 'oauth_verifier=%s' % self.verifier
+            return urlparse.urlunparse((scheme, netloc, path, params,
+                query, fragment))
+        return self.callback
+
+    def to_string(self):
+        data = {
+            'oauth_token': self.key,
+            'oauth_token_secret': self.secret,
+        }
+        if self.callback_confirmed is not None:
+            data['oauth_callback_confirmed'] = self.callback_confirmed
+        return urllib.urlencode(data)
+ 
+    def from_string(s):
+        """ Returns a token from something like:
+        oauth_token_secret=xxx&oauth_token=xxx
+        """
+        params = cgi.parse_qs(s, keep_blank_values=False)
+        key = params['oauth_token'][0]
+        secret = params['oauth_token_secret'][0]
+        token = OAuthToken(key, secret)
+        try:
+            token.callback_confirmed = params['oauth_callback_confirmed'][0]
+        except KeyError:
+            pass # 1.0, no callback confirmed.
+        return token
+    from_string = staticmethod(from_string)
+
+    def __str__(self):
+        return self.to_string()
+
+
+class OAuthRequest(object):
+    """OAuthRequest represents the request and can be serialized.
+
+    OAuth parameters:
+        - oauth_consumer_key 
+        - oauth_token
+        - oauth_signature_method
+        - oauth_signature 
+        - oauth_timestamp 
+        - oauth_nonce
+        - oauth_version
+        - oauth_verifier
+        ... any additional parameters, as defined by the Service Provider.
+    """
+    parameters = None # OAuth parameters.
+    http_method = HTTP_METHOD
+    http_url = None
+    version = VERSION
+
+    def __init__(self, http_method=HTTP_METHOD, http_url=None, parameters=None):
+        self.http_method = http_method
+        self.http_url = http_url
+        self.parameters = parameters or {}
+
+    def set_parameter(self, parameter, value):
+        self.parameters[parameter] = value
+
+    def get_parameter(self, parameter):
+        try:
+            return self.parameters[parameter]
+        except:
+            raise OAuthError('Parameter not found: %s' % parameter)
+
+    def _get_timestamp_nonce(self):
+        return self.get_parameter('oauth_timestamp'), self.get_parameter(
+            'oauth_nonce')
+
+    def get_nonoauth_parameters(self):
+        """Get any non-OAuth parameters."""
+        parameters = {}
+        for k, v in self.parameters.iteritems():
+            # Ignore oauth parameters.
+            if k.find('oauth_') < 0:
+                parameters[k] = v
+        return parameters
+
+    def to_header(self, realm=''):
+        """Serialize as a header for an HTTPAuth request."""
+        auth_header = 'OAuth realm="%s"' % realm
+        # Add the oauth parameters.
+        if self.parameters:
+            for k, v in self.parameters.iteritems():
+                if k[:6] == 'oauth_':
+                    auth_header += ', %s="%s"' % (k, escape(str(v)))
+        return {'Authorization': auth_header}
+
+    def to_postdata(self):
+        """Serialize as post data for a POST request."""
+        return '&'.join(['%s=%s' % (escape(str(k)), escape(str(v))) \
+            for k, v in self.parameters.iteritems()])
+
+    def to_url(self):
+        """Serialize as a URL for a GET request."""
+        return '%s?%s' % (self.get_normalized_http_url(), self.to_postdata())
+
+    def get_normalized_parameters(self):
+        """Return a string that contains the parameters that must be signed."""
+        params = self.parameters
+        try:
+            # Exclude the signature if it exists.
+            del params['oauth_signature']
+        except:
+            pass
+        # Escape key values before sorting.
+        key_values = [(escape(_utf8_str(k)), escape(_utf8_str(v))) \
+            for k,v in params.items()]
+        # Sort lexicographically, first after key, then after value.
+        key_values.sort()
+        # Combine key value pairs into a string.
+        return '&'.join(['%s=%s' % (k, v) for k, v in key_values])
+
+    def get_normalized_http_method(self):
+        """Uppercases the http method."""
+        return self.http_method.upper()
+
+    def get_normalized_http_url(self):
+        """Parses the URL and rebuilds it to be scheme://host/path."""
+        parts = urlparse.urlparse(self.http_url)
+        scheme, netloc, path = parts[:3]
+        # Exclude default port numbers.
+        if scheme == 'http' and netloc[-3:] == ':80':
+            netloc = netloc[:-3]
+        elif scheme == 'https' and netloc[-4:] == ':443':
+            netloc = netloc[:-4]
+        return '%s://%s%s' % (scheme, netloc, path)
+
+    def sign_request(self, signature_method, consumer, token):
+        """Set the signature parameter to the result of build_signature."""
+        # Set the signature method.
+        self.set_parameter('oauth_signature_method',
+            signature_method.get_name())
+        # Set the signature.
+        self.set_parameter('oauth_signature',
+            self.build_signature(signature_method, consumer, token))
+
+    def build_signature(self, signature_method, consumer, token):
+        """Calls the build signature method within the signature method."""
+        return signature_method.build_signature(self, consumer, token)
+
+    def from_request(http_method, http_url, headers=None, parameters=None,
+            query_string=None):
+        """Combines multiple parameter sources."""
+        if parameters is None:
+            parameters = {}
+
+        # Headers
+        if headers and 'Authorization' in headers:
+            auth_header = headers['Authorization']
+            # Check that the authorization header is OAuth.
+            if auth_header[:6] == 'OAuth ':
+                auth_header = auth_header[6:]
+                try:
+                    # Get the parameters from the header.
+                    header_params = OAuthRequest._split_header(auth_header)
+                    parameters.update(header_params)
+                except:
+                    raise OAuthError('Unable to parse OAuth parameters from '
+                        'Authorization header.')
+
+        # GET or POST query string.
+        if query_string:
+            query_params = OAuthRequest._split_url_string(query_string)
+            parameters.update(query_params)
+
+        # URL parameters.
+        param_str = urlparse.urlparse(http_url)[4] # query
+        url_params = OAuthRequest._split_url_string(param_str)
+        parameters.update(url_params)
+
+        if parameters:
+            return OAuthRequest(http_method, http_url, parameters)
+
+        return None
+    from_request = staticmethod(from_request)
+
+    def from_consumer_and_token(oauth_consumer, token=None,
+            callback=None, verifier=None, http_method=HTTP_METHOD,
+            http_url=None, parameters=None):
+        if not parameters:
+            parameters = {}
+
+        defaults = {
+            'oauth_consumer_key': oauth_consumer.key,
+            'oauth_timestamp': generate_timestamp(),
+            'oauth_nonce': generate_nonce(),
+            'oauth_version': OAuthRequest.version,
+        }
+
+        defaults.update(parameters)
+        parameters = defaults
+
+        if token:
+            parameters['oauth_token'] = token.key
+            if token.callback:
+                parameters['oauth_callback'] = token.callback
+            # 1.0a support for verifier.
+            if verifier:
+                parameters['oauth_verifier'] = verifier
+        elif callback:
+            # 1.0a support for callback in the request token request.
+            parameters['oauth_callback'] = callback
+
+        return OAuthRequest(http_method, http_url, parameters)
+    from_consumer_and_token = staticmethod(from_consumer_and_token)
+
+    def from_token_and_callback(token, callback=None, http_method=HTTP_METHOD,
+            http_url=None, parameters=None):
+        if not parameters:
+            parameters = {}
+
+        parameters['oauth_token'] = token.key
+
+        if callback:
+            parameters['oauth_callback'] = callback
+
+        return OAuthRequest(http_method, http_url, parameters)
+    from_token_and_callback = staticmethod(from_token_and_callback)
+
+    def _split_header(header):
+        """Turn Authorization: header into parameters."""
+        params = {}
+        parts = header.split(',')
+        for param in parts:
+            # Ignore realm parameter.
+            if param.find('realm') > -1:
+                continue
+            # Remove whitespace.
+            param = param.strip()
+            # Split key-value.
+            param_parts = param.split('=', 1)
+            # Remove quotes and unescape the value.
+            params[param_parts[0]] = urllib.unquote(param_parts[1].strip('\"'))
+        return params
+    _split_header = staticmethod(_split_header)
+
+    def _split_url_string(param_str):
+        """Turn URL string into parameters."""
+        parameters = cgi.parse_qs(param_str, keep_blank_values=False)
+        for k, v in parameters.iteritems():
+            parameters[k] = urllib.unquote(v[0])
+        return parameters
+    _split_url_string = staticmethod(_split_url_string)
+
+class OAuthServer(object):
+    """A worker to check the validity of a request against a data store."""
+    timestamp_threshold = 300 # In seconds, five minutes.
+    version = VERSION
+    signature_methods = None
+    data_store = None
+
+    def __init__(self, data_store=None, signature_methods=None):
+        self.data_store = data_store
+        self.signature_methods = signature_methods or {}
+
+    def set_data_store(self, data_store):
+        self.data_store = data_store
+
+    def get_data_store(self):
+        return self.data_store
+
+    def add_signature_method(self, signature_method):
+        self.signature_methods[signature_method.get_name()] = signature_method
+        return self.signature_methods
+
+    def fetch_request_token(self, oauth_request):
+        """Processes a request_token request and returns the
+        request token on success.
+        """
+        try:
+            # Get the request token for authorization.
+            token = self._get_token(oauth_request, 'request')
+        except OAuthError:
+            # No token required for the initial token request.
+            version = self._get_version(oauth_request)
+            consumer = self._get_consumer(oauth_request)
+            try:
+                callback = self.get_callback(oauth_request)
+            except OAuthError:
+                callback = None # 1.0, no callback specified.
+            self._check_signature(oauth_request, consumer, None)
+            # Fetch a new token.
+            token = self.data_store.fetch_request_token(consumer, callback)
+        return token
+
+    def fetch_access_token(self, oauth_request):
+        """Processes an access_token request and returns the
+        access token on success.
+        """
+        version = self._get_version(oauth_request)
+        consumer = self._get_consumer(oauth_request)
+        try:
+            verifier = self._get_verifier(oauth_request)
+        except OAuthError:
+            verifier = None
+        # Get the request token.
+        token = self._get_token(oauth_request, 'request')
+        self._check_signature(oauth_request, consumer, token)
+        new_token = self.data_store.fetch_access_token(consumer, token, verifier)
+        return new_token
+
+    def verify_request(self, oauth_request):
+        """Verifies an api call and checks all the parameters."""
+        # -> consumer and token
+        version = self._get_version(oauth_request)
+        consumer = self._get_consumer(oauth_request)
+        # Get the access token.
+        token = self._get_token(oauth_request, 'access')
+        self._check_signature(oauth_request, consumer, token)
+        parameters = oauth_request.get_nonoauth_parameters()
+        return consumer, token, parameters
+
+    def authorize_token(self, token, user):
+        """Authorize a request token."""
+        return self.data_store.authorize_request_token(token, user)
+
+    def get_callback(self, oauth_request):
+        """Get the callback URL."""
+        return oauth_request.get_parameter('oauth_callback')
+ 
+    def build_authenticate_header(self, realm=''):
+        """Optional support for the authenticate header."""
+        return {'WWW-Authenticate': 'OAuth realm="%s"' % realm}
+
+    def _get_version(self, oauth_request):
+        """Verify the correct version request for this server."""
+        try:
+            version = oauth_request.get_parameter('oauth_version')
+        except:
+            version = VERSION
+        if version and version != self.version:
+            raise OAuthError('OAuth version %s not supported.' % str(version))
+        return version
+
+    def _get_signature_method(self, oauth_request):
+        """Figure out the signature with some defaults."""
+        try:
+            signature_method = oauth_request.get_parameter(
+                'oauth_signature_method')
+        except:
+            signature_method = SIGNATURE_METHOD
+        try:
+            # Get the signature method object.
+            signature_method = self.signature_methods[signature_method]
+        except:
+            signature_method_names = ', '.join(self.signature_methods.keys())
+            raise OAuthError('Signature method %s not supported try one of the '
+                'following: %s' % (signature_method, signature_method_names))
+
+        return signature_method
+
+    def _get_consumer(self, oauth_request):
+        consumer_key = oauth_request.get_parameter('oauth_consumer_key')
+        consumer = self.data_store.lookup_consumer(consumer_key)
+        if not consumer:
+            raise OAuthError('Invalid consumer.')
+        return consumer
+
+    def _get_token(self, oauth_request, token_type='access'):
+        """Try to find the token for the provided request token key."""
+        token_field = oauth_request.get_parameter('oauth_token')
+        token = self.data_store.lookup_token(token_type, token_field)
+        if not token:
+            raise OAuthError('Invalid %s token: %s' % (token_type, token_field))
+        return token
+    
+    def _get_verifier(self, oauth_request):
+        return oauth_request.get_parameter('oauth_verifier')
+
+    def _check_signature(self, oauth_request, consumer, token):
+        timestamp, nonce = oauth_request._get_timestamp_nonce()
+        self._check_timestamp(timestamp)
+        self._check_nonce(consumer, token, nonce)
+        signature_method = self._get_signature_method(oauth_request)
+        try:
+            signature = oauth_request.get_parameter('oauth_signature')
+        except:
+            raise OAuthError('Missing signature.')
+        # Validate the signature.
+        valid_sig = signature_method.check_signature(oauth_request, consumer,
+            token, signature)
+        if not valid_sig:
+            key, base = signature_method.build_signature_base_string(
+                oauth_request, consumer, token)
+            raise OAuthError('Invalid signature. Expected signature base '
+                'string: %s' % base)
+        built = signature_method.build_signature(oauth_request, consumer, token)
+
+    def _check_timestamp(self, timestamp):
+        """Verify that timestamp is recentish."""
+        timestamp = int(timestamp)
+        now = int(time.time())
+        lapsed = abs(now - timestamp)
+        if lapsed > self.timestamp_threshold:
+            raise OAuthError('Expired timestamp: given %d and now %s has a '
+                'greater difference than threshold %d' %
+                (timestamp, now, self.timestamp_threshold))
+
+    def _check_nonce(self, consumer, token, nonce):
+        """Verify that the nonce is uniqueish."""
+        nonce = self.data_store.lookup_nonce(consumer, token, nonce)
+        if nonce:
+            raise OAuthError('Nonce already used: %s' % str(nonce))
+
+
+class OAuthClient(object):
+    """OAuthClient is a worker to attempt to execute a request."""
+    consumer = None
+    token = None
+
+    def __init__(self, oauth_consumer, oauth_token):
+        self.consumer = oauth_consumer
+        self.token = oauth_token
+
+    def get_consumer(self):
+        return self.consumer
+
+    def get_token(self):
+        return self.token
+
+    def fetch_request_token(self, oauth_request):
+        """-> OAuthToken."""
+        raise NotImplementedError
+
+    def fetch_access_token(self, oauth_request):
+        """-> OAuthToken."""
+        raise NotImplementedError
+
+    def access_resource(self, oauth_request):
+        """-> Some protected resource."""
+        raise NotImplementedError
+
+
+class OAuthDataStore(object):
+    """A database abstraction used to lookup consumers and tokens."""
+
+    def lookup_consumer(self, key):
+        """-> OAuthConsumer."""
+        raise NotImplementedError
+
+    def lookup_token(self, oauth_consumer, token_type, token_token):
+        """-> OAuthToken."""
+        raise NotImplementedError
+
+    def lookup_nonce(self, oauth_consumer, oauth_token, nonce):
+        """-> OAuthToken."""
+        raise NotImplementedError
+
+    def fetch_request_token(self, oauth_consumer, oauth_callback):
+        """-> OAuthToken."""
+        raise NotImplementedError
+
+    def fetch_access_token(self, oauth_consumer, oauth_token, oauth_verifier):
+        """-> OAuthToken."""
+        raise NotImplementedError
+
+    def authorize_request_token(self, oauth_token, user):
+        """-> OAuthToken."""
+        raise NotImplementedError
+
+
+class OAuthSignatureMethod(object):
+    """A strategy class that implements a signature method."""
+    def get_name(self):
+        """-> str."""
+        raise NotImplementedError
+
+    def build_signature_base_string(self, oauth_request, oauth_consumer, oauth_token):
+        """-> str key, str raw."""
+        raise NotImplementedError
+
+    def build_signature(self, oauth_request, oauth_consumer, oauth_token):
+        """-> str."""
+        raise NotImplementedError
+
+    def check_signature(self, oauth_request, consumer, token, signature):
+        built = self.build_signature(oauth_request, consumer, token)
+        return built == signature
+
+
+class OAuthSignatureMethod_HMAC_SHA1(OAuthSignatureMethod):
+
+    def get_name(self):
+        return 'HMAC-SHA1'
+        
+    def build_signature_base_string(self, oauth_request, consumer, token):
+        sig = (
+            escape(oauth_request.get_normalized_http_method()),
+            escape(oauth_request.get_normalized_http_url()),
+            escape(oauth_request.get_normalized_parameters()),
+        )
+
+        key = '%s&' % escape(consumer.secret)
+        if token:
+            key += escape(token.secret)
+        raw = '&'.join(sig)
+        return key, raw
+
+    def build_signature(self, oauth_request, consumer, token):
+        """Builds the base signature string."""
+        key, raw = self.build_signature_base_string(oauth_request, consumer,
+            token)
+
+        # HMAC object.
+        try:
+            import hashlib # 2.5
+            hashed = hmac.new(key, raw, hashlib.sha1)
+        except:
+            import sha # Deprecated
+            hashed = hmac.new(key, raw, sha)
+
+        # Calculate the digest base 64.
+        return binascii.b2a_base64(hashed.digest())[:-1]
+
+
+class OAuthSignatureMethod_PLAINTEXT(OAuthSignatureMethod):
+
+    def get_name(self):
+        return 'PLAINTEXT'
+
+    def build_signature_base_string(self, oauth_request, consumer, token):
+        """Concatenates the consumer key and secret."""
+        sig = '%s&' % escape(consumer.secret)
+        if token:
+            sig = sig + escape(token.secret)
+        return sig, sig
+
+    def build_signature(self, oauth_request, consumer, token):
+        key, raw = self.build_signature_base_string(oauth_request, consumer,
+            token)
+        return key
\ No newline at end of file
--- a/script/rest/search_enmi.py	Tue Jan 18 18:25:18 2011 +0100
+++ b/script/rest/search_enmi.py	Thu Jan 20 10:44:04 2011 +0100
@@ -18,7 +18,10 @@
     parser.add_option("-Q", dest="query",
                       help="query", metavar="QUERY")
     parser.add_option("-P", dest="rpp", metavar="RPP", default="50",
-                      help="Result per page", metavar="RPP")
+                      help="Result per page")
+    parser.add_option("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token",
+                      help="Token file name")
+    
 
     #add request token
     #add 
@@ -46,7 +49,7 @@
             results = twitter. search(q=options.query, rpp=options.rpp, page=page)
             for tweet in results["results"]:
                 print tweet
-                processor = utils.TwitterProcessor(tweet, None, session)
+                processor = utils.TwitterProcessor(tweet, None, session, options.token_filename)
                 processor.process()
                 session.flush()
             page += 1
--- a/script/stream/recorder_tweetstream.py	Tue Jan 18 18:25:18 2011 +0100
+++ b/script/stream/recorder_tweetstream.py	Thu Jan 20 10:44:04 2011 +0100
@@ -8,6 +8,7 @@
 import socket
 import sys
 import tweetstream
+import tweetstream.auth
 socket._fileobject.default_bufsize = 0
 
 
@@ -41,21 +42,22 @@
 
     """
 
-    def __init__(self, user, password, keywords, url="track", reconnects=3, error_cb=None, retry_wait=5, **kwargs):
+    def __init__(self, auth, keywords, url="track", reconnects=3, error_cb=None, retry_wait=5, **kwargs):
         self.max_reconnects = reconnects
         self.retry_wait = retry_wait
         self._reconnects = 0
         self._error_cb = error_cb
-        super(ReconnectingTweetStream,self).__init__(user, password, keywords, url, **kwargs)
+        super(ReconnectingTweetStream,self).__init__(auth, keywords, url, **kwargs)
 
     def next(self):
         while True:
             try:
                 return super(ReconnectingTweetStream,self).next()
             except tweetstream.ConnectionError, e:
+                logging.debug("connection error :" + str(e))
                 self._reconnects += 1
                 if self._reconnects > self.max_reconnects:
-                    raise ConnectionError("Too many retries")
+                    raise tweetstream.ConnectionError("Too many retries")
 
                 # Note: error_cb is not called on the last error since we
                 # raise a ConnectionError instead
@@ -68,24 +70,32 @@
 
 
 
-def process_tweet(tweet, session, debug):
+def process_tweet(tweet, session, debug, token_filename):
     
     logging.debug("Process_tweet :" + repr(tweet))
-    processor = utils.TwitterProcessor(tweet, None, session)
+    processor = utils.TwitterProcessor(tweet, None, session, token_filename)
     processor.process()
 
-def main(username, password, track, session, debug, reconnects):
+def main(username, password, track, session, debug, reconnects, token_filename):
 
-    username = username or raw_input('Twitter username: ')
-    password = password or getpass('Twitter password: ')
+    #username = username or raw_input('Twitter username: ')
+    #password = password or getpass('Twitter password: ')
 
     track_list = track or raw_input('Keywords to track (comma seperated): ').strip()
     track_list = [k for k in track_list.split(',')]
-
-    stream = ReconnectingTweetStream(username, password, track_list, reconnects=reconnects)
+    
+    if username and password:
+        auth = tweetstream.auth.BasicAuthHandler(username, password)        
+    else:
+        consumer_key = models.CONSUMER_KEY
+        consumer_secret = models.CONSUMER_SECRET
+        auth = tweetstream.auth.OAuthHandler(consumer_key, consumer_secret, secure=False)
+        auth.set_access_token(*(utils.get_oauth_token(token_filename)))
+    
+    stream = ReconnectingTweetStream(auth, track_list, reconnects=reconnects)
     try:
         for tweet in stream:
-            process_tweet(tweet, session, debug)
+            process_tweet(tweet, session, debug, token_filename)
             session.commit()
     finally:
         stream.close()
@@ -98,13 +108,15 @@
                       help="Twitter user", metavar="USER", default=None)
     parser.add_option("-w", "--password", dest="password",
                       help="Twitter password", metavar="PASSWORD", default=None)
-    parser.add_option("-t", "--track", dest="track",
+    parser.add_option("-T", "--track", dest="track",
                       help="Twitter track", metavar="TRACK")
     parser.add_option("-n", "--new", dest="new", action="store_true",
                       help="new database", default=False)
     parser.add_option("-r", "--reconnects", dest="reconnects",
                       help="Reconnects", metavar="RECONNECTS", default=10, type='int')
-    
+    parser.add_option("-t", dest="token_filename", metavar="TOKEN_FILENAME", default=".oauth_token",
+                      help="Token file name")
+
     utils.set_logging_options(parser)
 
     return parser.parse_args()
@@ -130,7 +142,7 @@
 
     try:
         try:
-            main(options.username, options.password, options.track, session, options.debug, options.reconnects)
+            main(options.username, options.password, options.track, session, options.debug, options.reconnects, options.token_filename)
         except KeyboardInterrupt:
             print '\nGoodbye!'
         session.commit()