script/lib/tweetstream/tweetstream/auth.py
author Yves-Marie Haussonne <1218002+ymph@users.noreply.github.com>
Wed, 16 Mar 2011 11:02:10 +0100
changeset 74 5b67d2d5ca37
parent 15 5d552b6a0e55
permissions -rw-r--r--
Merge with bc9ecc2a336ab97b122598603e3d038d1ac30d91

# Tweepy
# Copyright 2009-2010 Joshua Roesslein
# See LICENSE for details.

from tweetstream import TweetStreamError
from urllib2 import Request, urlopen
import base64
import tweetstream.oauth



class AuthHandler(object):

    def apply_auth(self, url, method, headers, parameters):
        """Apply authentication headers to request"""
        raise NotImplementedError

    def get_username(self):
        """Return the username of the authenticated user"""
        raise NotImplementedError


class BasicAuthHandler(AuthHandler):

    def __init__(self, username, password):
        self.username = username
        self._b64up = base64.b64encode('%s:%s' % (username, password))

    def apply_auth(self, url, method, headers, parameters):
        headers['Authorization'] = 'Basic %s' % self._b64up

    def get_username(self):
        return self.username


class OAuthHandler(AuthHandler):
    """OAuth authentication handler"""

    OAUTH_HOST = 'api.twitter.com'
    OAUTH_ROOT = '/oauth/'

    def __init__(self, consumer_key, consumer_secret, callback=None, secure=False):
        self._consumer = tweetstream.oauth.OAuthConsumer(consumer_key, consumer_secret)
        self._sigmethod = tweetstream.oauth.OAuthSignatureMethod_HMAC_SHA1()
        self.request_token = None
        self.access_token = None
        self.callback = callback
        self.username = None
        self.secure = secure

    def _get_oauth_url(self, endpoint, secure=False):
        if self.secure or secure:
            prefix = 'https://'
        else:
            prefix = 'http://'

        return prefix + self.OAUTH_HOST + self.OAUTH_ROOT + endpoint

    def apply_auth(self, url, method, headers, parameters):
        request = tweetstream.oauth.OAuthRequest.from_consumer_and_token(
            self._consumer, http_url=url, http_method=method,
            token=self.access_token, parameters=parameters
        )
        request.sign_request(self._sigmethod, self._consumer, self.access_token)
        headers.update(request.to_header())

    def _get_request_token(self):
        try:
            url = self._get_oauth_url('request_token')
            request = tweetstream.oauth.OAuthRequest.from_consumer_and_token(
                self._consumer, http_url=url, callback=self.callback
            )
            request.sign_request(self._sigmethod, self._consumer, None)
            resp = urlopen(Request(url, headers=request.to_header()))
            return tweetstream.oauth.OAuthToken.from_string(resp.read())
        except Exception, e:
            raise TweetStreamError(e)

    def set_request_token(self, key, secret):
        self.request_token = tweetstream.oauth.OAuthToken(key, secret)

    def set_access_token(self, key, secret):
        self.access_token = tweetstream.oauth.OAuthToken(key, secret)

    def get_authorization_url(self, signin_with_twitter=False):
        """Get the authorization URL to redirect the user"""
        try:
            # get the request token
            self.request_token = self._get_request_token()

            # build auth request and return as url
            if signin_with_twitter:
                url = self._get_oauth_url('authenticate')
            else:
                url = self._get_oauth_url('authorize')
            request = tweetstream.oauth.OAuthRequest.from_token_and_callback(
                token=self.request_token, http_url=url
            )

            return request.to_url()
        except Exception, e:
            raise TweetStreamError(e)

    def get_access_token(self, verifier=None):
        """
        After user has authorized the request token, get access token
        with user supplied verifier.
        """
        try:
            url = self._get_oauth_url('access_token')

            # build request
            request = tweetstream.oauth.OAuthRequest.from_consumer_and_token(
                self._consumer,
                token=self.request_token, http_url=url,
                verifier=str(verifier)
            )
            request.sign_request(self._sigmethod, self._consumer, self.request_token)

            # send request
            resp = urlopen(Request(url, headers=request.to_header()))
            self.access_token = tweetstream.oauth.OAuthToken.from_string(resp.read())
            return self.access_token
        except Exception, e:
            raise TweetStreamError(e)

    def get_xauth_access_token(self, username, password):
        """
        Get an access token from an username and password combination.
        In order to get this working you need to create an app at
        http://twitter.com/apps, after that send a mail to api@twitter.com
        and request activation of xAuth for it.
        """
        try:
            url = self._get_oauth_url('access_token', secure=True) # must use HTTPS
            request = tweetstream.oauth.OAuthRequest.from_consumer_and_token(
                oauth_consumer=self._consumer,
                http_method='POST', http_url=url,
                parameters = {
		            'x_auth_mode': 'client_auth',
		            'x_auth_username': username,
		            'x_auth_password': password
                }
            )
            request.sign_request(self._sigmethod, self._consumer, None)

            resp = urlopen(Request(url, data=request.to_postdata()))
            self.access_token = tweetstream.oauth.OAuthToken.from_string(resp.read())
            return self.access_token
        except Exception, e:
            raise TweetStreamError(e)

    def get_username(self):
        if self.username is None:
            api = API(self)
            user = api.verify_credentials()
            if user:
                self.username = user.screen_name
            else:
                raise TweetStreamError("Unable to get username, invalid oauth token!")
        return self.username