diff -r 20c41a7e2173 -r 83b28fc0d731 web/ldt/test/client.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/web/ldt/test/client.py Mon Dec 13 23:55:19 2010 +0100 @@ -0,0 +1,629 @@ +#from django.test.client import Client as DClient +from django.conf import settings +from django.core.urlresolvers import reverse +from django.http import HttpResponse, SimpleCookie +from django.test.client import encode_multipart, encode_file, Client, BOUNDARY, \ + MULTIPART_CONTENT, CONTENT_TYPE_RE +from django.utils.encoding import smart_str +from django.utils.http import urlencode +from ldt.utils import Property +from oauth2 import Request, Consumer, Token, SignatureMethod_HMAC_SHA1, \ + generate_nonce, SignatureMethod_PLAINTEXT +from oauth_provider.consts import OUT_OF_BAND +from urlparse import urlsplit, urlunsplit, urlparse, urlunparse, parse_qs +import httplib2 +import logging +import re +try: + from cStringIO import StringIO +except ImportError: + from StringIO import StringIO + + +class WebClient(object): + """ + A class that can act as a client for testing purposes. + + It allows the user to compose GET and POST requests, and + obtain the response that the server gave to those requests. + The server Response objects are annotated with the details + of the contexts and templates that were rendered during the + process of serving the request. + + Client objects are stateful - they will retain cookie (and + thus session) details for the lifetime of the Client instance. + + This is not intended as a replacement for Twill/Selenium or + the like - it is here to allow testing against the + contexts and templates produced by a view, rather than the + HTML rendered to the end-user. + """ + def __init__(self, **defaults): + self.handler = httplib2.Http() + #self.defaults = defaults + self.cookies = SimpleCookie() + #self.exc_info = None + #self.errors = StringIO() + self.__baseurltuple = () + self.__login_url = None + + @Property + def baseurl(): + + def fget(self): + return self.__baseurltuple + + def fset(self, value): + if isinstance(value, tuple): + self.__baseurltuple = value + else: + self.__baseurltuple = urlsplit(unicode(value)) + + return locals() + + @Property + def login_url(): + + def fget(self): + return self.__login_url + + def fset(self, value): + self.__login_url = value + + return locals() + + + def _mergeurl(self, urltuple): + res = ["" for i in range(5)] + for i in range(min(len(self.baseurl), len(urltuple))): + res[i] = self.baseurl[i] or urltuple[i] + + return urlunsplit(res) + + def _process_response(self, response, content): + resp = HttpResponse(content=content, status=response.status, content_type=response['content-type']) + if 'set-cookie' in response: + self.cookies.load(response['set-cookie']) + resp.cookies.load(response['set-cookie']) + + resp.client = self + resp.raw_response = response + for key,value in response.items(): + resp[key] = value + + return resp + + def _handle_redirects(self, response): + + response.redirect_chain = [] + + r = response.raw_response.previous + while not r is None: + response.redirect_chain.append((r['content-location'],r.status)) + r = r.previous + + return response + + + def get(self, path, data={}, follow=False, **extra): + """ + Requests a response from the server using GET. + """ + parsed = list(urlsplit(path)) + parsed[3] = urlencode(data, doseq=True) or parsed[3] + + + fullpath = self._mergeurl(parsed) + self.handler.follow_redirects = follow + + headers = {} + if len(self.cookies) > 0: + headers['Cookie'] = self.cookies.output() + + if extra: + headers.update(extra) + + response, content = self.handler.request(fullpath, method="GET", headers=headers) + + resp = self._process_response(response, content) + + if follow: + resp = self._handle_redirects(resp) + return resp + + + def post(self, path, data={}, content_type="application/x-www-form-urlencoded", + follow=False, **extra): + """ + Requests a response from the server using POST. + """ + if content_type == MULTIPART_CONTENT: + post_data = encode_multipart(BOUNDARY, data) + elif content_type == "application/x-www-form-urlencoded": + post_data = urlencode(data) + else: + # Encode the content so that the byte representation is correct. + match = CONTENT_TYPE_RE.match(content_type) + if match: + charset = match.group(1) + else: + charset = settings.DEFAULT_CHARSET + post_data = smart_str(data, encoding=charset) + + parsed = list(urlsplit(path)) + fullpath = self._mergeurl(parsed) + self.handler.follow_redirects = follow + + headers = {} + headers['Content-type'] = content_type + if len(self.cookies) > 0: + headers['Cookie'] = self.cookies.output() + + if extra: + headers.update(extra) + + response,content = self.handler.request(fullpath, method="POST", headers=headers, body=post_data) + + resp = self._process_response(response, content) + + if follow: + resp = self._handle_redirects(response) + return resp + + def login(self, **credentials): + """ + Sets the Client to appear as if it has successfully logged into a site. + + Returns True if login is possible; False if the provided credentials + are incorrect, or the user is inactive, or if the sessions framework is + not available. + """ + resp = self.post(path=self.login_url, data=credentials, follow=False, **{"X-Requested-With" : "XMLHttpRequest"}) + return resp.status_code == 302 + +# +# def head(self, path, data={}, follow=False, **extra): +# """ +# Request a response from the server using HEAD. +# """ +# parsed = urlparse(path) +# r = { +# 'CONTENT_TYPE': 'text/html; charset=utf-8', +# 'PATH_INFO': urllib.unquote(parsed[2]), +# 'QUERY_STRING': urlencode(data, doseq=True) or parsed[4], +# 'REQUEST_METHOD': 'HEAD', +# 'wsgi.input': FakePayload('') +# } +# r.update(extra) +# +# response = self.request(**r) +# if follow: +# response = self._handle_redirects(response) +# return response +# +# def options(self, path, data={}, follow=False, **extra): +# """ +# Request a response from the server using OPTIONS. +# """ +# parsed = urlparse(path) +# +# r = { +# 'PATH_INFO': urllib.unquote(parsed[2]), +# 'QUERY_STRING': urlencode(data, doseq=True) or parsed[4], +# 'REQUEST_METHOD': 'OPTIONS', +# 'wsgi.input': FakePayload('') +# } +# r.update(extra) +# +# response = self.request(**r) +# if follow: +# response = self._handle_redirects(response) +# return response +# +# def put(self, path, data={}, content_type=MULTIPART_CONTENT, +# follow=False, **extra): +# """ +# Send a resource to the server using PUT. +# """ +# if content_type is MULTIPART_CONTENT: +# post_data = encode_multipart(BOUNDARY, data) +# else: +# post_data = data +# +# # Make `data` into a querystring only if it's not already a string. If +# # it is a string, we'll assume that the caller has already encoded it. +# query_string = None +# if not isinstance(data, basestring): +# query_string = urlencode(data, doseq=True) +# +# parsed = urlparse(path) +# r = { +# 'CONTENT_LENGTH': len(post_data), +# 'CONTENT_TYPE': content_type, +# 'PATH_INFO': urllib.unquote(parsed[2]), +# 'QUERY_STRING': query_string or parsed[4], +# 'REQUEST_METHOD': 'PUT', +# 'wsgi.input': FakePayload(post_data), +# } +# r.update(extra) +# +# response = self.request(**r) +# if follow: +# response = self._handle_redirects(response) +# return response +# +# def delete(self, path, data={}, follow=False, **extra): +# """ +# Send a DELETE request to the server. +# """ +# parsed = urlparse(path) +# r = { +# 'PATH_INFO': urllib.unquote(parsed[2]), +# 'QUERY_STRING': urlencode(data, doseq=True) or parsed[4], +# 'REQUEST_METHOD': 'DELETE', +# 'wsgi.input': FakePayload('') +# } +# r.update(extra) +# +# response = self.request(**r) +# if follow: +# response = self._handle_redirects(response) +# return response +# +# def login(self, **credentials): +# """ +# Sets the Client to appear as if it has successfully logged into a site. +# +# Returns True if login is possible; False if the provided credentials +# are incorrect, or the user is inactive, or if the sessions framework is +# not available. +# """ +# user = authenticate(**credentials) +# if user and user.is_active \ +# and 'django.contrib.sessions' in settings.INSTALLED_APPS: +# engine = import_module(settings.SESSION_ENGINE) +# +# # Create a fake request to store login details. +# request = HttpRequest() +# if self.session: +# request.session = self.session +# else: +# request.session = engine.SessionStore() +# login(request, user) +# +# # Save the session values. +# request.session.save() +# +# # Set the cookie to represent the session. +# session_cookie = settings.SESSION_COOKIE_NAME +# self.cookies[session_cookie] = request.session.session_key +# cookie_data = { +# 'max-age': None, +# 'path': '/', +# 'domain': settings.SESSION_COOKIE_DOMAIN, +# 'secure': settings.SESSION_COOKIE_SECURE or None, +# 'expires': None, +# } +# self.cookies[session_cookie].update(cookie_data) +# +# return True +# else: +# return False +# +# def logout(self): +# """ +# Removes the authenticated user's cookies and session object. +# +# Causes the authenticated user to be logged out. +# """ +# session = import_module(settings.SESSION_ENGINE).SessionStore() +# session_cookie = self.cookies.get(settings.SESSION_COOKIE_NAME) +# if session_cookie: +# session.delete(session_key=session_cookie.value) +# self.cookies = SimpleCookie() + + +class OAuthPayload(object): + + def __init__(self, servername="testserver"): + self._token = None + self._servername = servername + self._oauth_parameters = { + 'oauth_version': '1.0' + } + self._oauth_parameters_extra = { + 'oauth_callback': 'http://127.0.0.1/callback', + 'scope':'all' + } + self.errors = StringIO() + + def _get_signed_request(self, method, path, params): + + parameters = params.copy() + parameters.update(self._oauth_parameters) + oauth_request = Request.from_consumer_and_token(consumer=self._consumer, token=self._token, http_method=method, http_url=path, parameters=parameters) + oauth_request.sign_request(SignatureMethod_HMAC_SHA1(), consumer=self._consumer, token=self._token) + + return oauth_request + + + def set_consumer(self, key, secret): + self._consumer = Consumer(key, secret) + self._oauth_parameters['oauth_consumer_key'] = key + + def set_scope(self, value): + self._oauth_parameters_extra['scope'] = value + + def inject_oauth_data(self, path, method, data): + + path_parsed = urlparse(path) + + if method=='GET' and (data is None or len(data) == 0): + new_data = parse_qs(path_parsed[4]) + elif data is None: + new_data = {} + else: + new_data = data.copy() + + clean_path = ['']*6 + clean_path[0] = 'http' + clean_path[1] = self._servername + for i in range(0,4): + clean_path[i] = path_parsed[i] or clean_path[i] + path = urlunparse(clean_path) + + oauth_request = self._get_signed_request(method, path, new_data) + + new_data.update(oauth_request) + + return new_data + + def login(self, client, login_method, **credential): + + + self._oauth_parameters.update(self._oauth_parameters_extra) + #Obtaining a Request Token + resp = client.get(reverse('oauth_request_token'), follow=True) + if resp.status_code == 200: + self._token = Token.from_string(resp.content) + else: + self.errors.write("oauth_request_token response status code fail : " + repr(resp)) + return False + + #Requesting User Authorization + res = login_method(client, **credential) + if not res: + self.errors.write("login failed : " + repr(credential)) + return False + + resp = client.get(reverse('oauth_user_authorization')) + if resp.status_code != 200: + self.errors.write("oauth_user_authorization get response status code fail : " + repr(resp)) + return False + + #"X-Requested-With" : "XMLHttpRequest" + resp = client.post(reverse('oauth_user_authorization'), {'authorize_access':1}, **{"X-Requested-With" : "XMLHttpRequest"}) + if resp.status_code != 302: + self.errors.write("oauth_user_authorization post response status code fail : " + repr(resp)) + return False + + location_splitted = urlsplit(resp["Location"]) + location_query_dict = parse_qs(location_splitted[3]) + self._token.verifier = location_query_dict['oauth_verifier'] + + + #Obtaining an Access Token + resp = client.get(reverse('oauth_access_token')) + if resp.status_code == 200: + self._token = Token.from_string(resp.content) + for key in self._oauth_parameters_extra.keys(): + if key in self._oauth_parameters: + del(self._oauth_parameters[key]) + return True + else: + self.errors.write("oauth_access_token get response status code fail : " + repr(resp)) + return False + + def logout(self): + self._token = None + +METHOD_MAPPING = { + 'get' : 'GET', + 'post' : 'POST', + 'put' : 'POST', + 'head' : 'GET', + 'options' : 'GET', + 'delete' : 'GET' +} + +def _generate_request_wrapper(meth): + def request_wrapper(inst, *args, **kwargs): + path = args[0] if len(args) > 0 else kwargs.get('path','') + data = args[1] if len(args) > 1 else kwargs.get('data',{}) + args = args[2:] + if 'path' in kwargs: + del(kwargs['path']) + if 'data' in kwargs: + del(kwargs['data']) + data = inst._oauth_data.inject_oauth_data(path, METHOD_MAPPING[meth.__name__], data) + return meth(inst,path=path, data=data, *args, **kwargs) + return request_wrapper + +def _generate_login_wrapper(meth): + def login_wrapper(inst, **credential): + return inst._oauth_data.login(inst, meth, **credential) + return login_wrapper + +def _generate_logout_wrapper(meth): + def logout_wrapper(inst): + inst._oauth_data.logout() + meth(inst) + return logout_wrapper + +class OAuthMetaclass(type): + + def __new__(cls, name, bases, attrs): + newattrs = {} + def set_consumer(inst, key, secret): + inst._oauth_data.set_consumer(key,secret) + newattrs['set_consumer'] = set_consumer + def set_scope(inst, scope): + inst._oauth_data.set_scope(scope) + newattrs['set_scope'] = set_scope + + for attrname, attrvalue in attrs.iteritems(): + if attrname in ('get', 'post', 'head', 'options', 'put', 'delete'): + newattrs[attrname] = _generate_request_wrapper(attrvalue) + elif attrname == 'login': + newattrs[attrname] = _generate_login_wrapper(attrvalue) + elif attrname == 'logout': + newattrs[attrname] = _generate_logout_wrapper(attrvalue) + else: + newattrs[attrname] = attrvalue + + for klass in bases: + for attrname, attrvalue in klass.__dict__.iteritems(): + if attrname in newattrs: + continue + if attrname in ('get', 'post', 'head', 'options', 'put', 'delete'): + newattrs[attrname] = _generate_request_wrapper(attrvalue) + elif attrname == 'login': + newattrs[attrname] = _generate_login_wrapper(attrvalue) + elif attrname == 'logout': + newattrs[attrname] = _generate_logout_wrapper(attrvalue) + + init_method = newattrs.get("__init__", None) + + def new_init(inst, *args, **kwargs): + inst._oauth_data = OAuthPayload(attrs.get('servername','testserver')) + if init_method is not None: + init_method(*args,**kwargs) + else: + super(inst.__class__,inst).__init__(*args,**kwargs) + newattrs["__init__"] = new_init + + return super(OAuthMetaclass, cls).__new__(cls, name, bases, newattrs) + + + +class OAuthClient(Client): + __metaclass__ = OAuthMetaclass +# def __init__(self, **default): +# super(OAuthClient,self).__init__(**default) +# self.__token = None +# self.oauth_parameters = { +# 'oauth_version': '1.0', +# 'oauth_callback': 'http://127.0.0.1/callback', +# 'scope':'all' +# } +# +# def __get_signed_request(self, method, path): +# +# oauth_request = Request.from_consumer_and_token(consumer=self.__consumer, token=self.__token, http_method=method, http_url=path, parameters=self.oauth_parameters) +# oauth_request.sign_request(SignatureMethod_HMAC_SHA1(), consumer=self.__consumer, token=self.__token) +# +# return oauth_request +# +# +# def set_consumer(self, key, secret): +# self.__consumer = Consumer(key, secret) +# self.oauth_parameters['oauth_consumer_key'] = key +# +# def set_scope(self, value): +# self.oauth_parameters['scope'] = value +# +# def __inject_oauth_data(self, path, method, data): +# +# path_parsed = urlparse(path) +# +# if method=='GET' and len(data) == 0: +# data= parse_qs(path_parsed[4]) +# +# clean_path = ['']*6 +# clean_path[0] = 'http' +# clean_path[1] = 'testserver' +# for i in range(0,4): +# clean_path[i] = path_parsed[i] or clean_path[i] +# path = urlunparse(clean_path) +# +# oauth_request = self.__get_signed_request(method, path) +# +# data.update(oauth_request) +# +# +# def get(self, path, data={}, follow=False, **extra): +# +# self.__inject_oauth_data(path, 'GET', data) +# return super(OAuthClient, self).get(path, data, follow, **extra) +# +# +# def post(self, path, data={}, content_type=MULTIPART_CONTENT, +# follow=False, **extra): +# self.__inject_oauth_data(path, 'POST', data) +# return super(OAuthClient,self).post(path, data, content_type, follow, **extra) +# +# def head(self, path, data={}, follow=False, **extra): +# self.__inject_oauth_data(path, 'GET', data) +# return super(OAuthClient, self).head(path, data, follow, **extra) +# +# def options(self, path, data={}, follow=False, **extra): +# self.__inject_oauth_data(path, 'GET', data) +# return options(OAuthClient, self).options(path, data, follow, **extra) +# +# def put(self, path, data={}, content_type=MULTIPART_CONTENT, +# follow=False, **extra): +# self.__inject_oauth_data(path, 'POST', data) +# return super(OAuthClient,self).put(path, data, content_type, follow, **extra) +# +# def delete(self, path, data={}, follow=False, **extra): +# self.__inject_oauth_data(path, 'GET', data) +# return super(OAuthClient, self).delete(path, data, follow, **extra) +# +# ### TODO: better document errors +# def login(self, **credential): +# +# #Obtaining a Request Token +# resp = self.get(reverse('oauth_request_token'), follow=True) +# if resp.status_code == 200: +# self.__token = Token.from_string(resp.content) +# else: +# self.errors.write("oauth_request_token response status code fail : " + repr(resp)) +# return False +# +# #Requesting User Authorization +# res = super(OAuthClient, self).login(**credential) +# if not res: +# self.errors.write("login failed : " + repr(credential)) +# return False +# +# resp = self.get(reverse('oauth_user_authorization')) +# if resp.status_code != 200: +# self.errors.write("oauth_user_authorization get response status code fail : " + repr(resp)) +# return False +# +# resp = self.post(reverse('oauth_user_authorization'), {'authorize_access':1}) +# if resp.status_code != 302: +# self.errors.write("oauth_user_authorization post response status code fail : " + repr(resp)) +# return False +# +# location_splitted = urlsplit(resp["Location"]) +# location_query_dict = parse_qs(location_splitted[3]) +# self.__token.verifier = location_query_dict['oauth_verifier'] +# +# +# #Obtaining an Access Token +# resp = self.get(reverse('oauth_access_token')) +# if resp.status_code == 200: +# self.__token = Token.from_string(resp.content) +# return True +# else: +# self.errors.write("oauth_access_token get response status code fail : " + repr(resp)) +# return False +# +# def logout(self): +# super(OAuthClient,self).logout() +# self._token = None + +class OAuthWebClient(WebClient): + __metaclass__ = OAuthMetaclass + servername = '127.0.0.1:8000'