# HG changeset patch # User ymh # Date 1292280919 -3600 # Node ID 83b28fc0d731ed74f0054787efe44e08f3624d0f # Parent 20c41a7e2173aa1a80bcfdd94382a3f5ea0cae87 improve on ldt test framework start migration for text test diff -r 20c41a7e2173 -r 83b28fc0d731 .settings/org.eclipse.core.resources.prefs --- a/.settings/org.eclipse.core.resources.prefs Fri Nov 19 18:14:02 2010 +0100 +++ b/.settings/org.eclipse.core.resources.prefs Mon Dec 13 23:55:19 2010 +0100 @@ -1,4 +1,3 @@ -#Tue Nov 16 12:57:53 CET 2010 -eclipse.preferences.version=1 -encoding//web/ldt/ldt_utils/tests.py=UTF-8 -encoding//web/ldt/text/tests.py=UTF-8 +#Mon Dec 13 23:54:00 CET 2010 +eclipse.preferences.version=1 +encoding//web/ldt/ldt_utils/tests.py=UTF-8 diff -r 20c41a7e2173 -r 83b28fc0d731 .settings/org.eclipse.ltk.core.refactoring.prefs --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/.settings/org.eclipse.ltk.core.refactoring.prefs Mon Dec 13 23:55:19 2010 +0100 @@ -0,0 +1,3 @@ +#Mon Dec 06 14:41:14 CET 2010 +eclipse.preferences.version=1 +org.eclipse.ltk.core.refactoring.enable.project.refactoring.history=false diff -r 20c41a7e2173 -r 83b28fc0d731 sbin/virtualenv/create_python_env.py --- a/sbin/virtualenv/create_python_env.py Fri Nov 19 18:14:02 2010 +0100 +++ b/sbin/virtualenv/create_python_env.py Mon Dec 13 23:55:19 2010 +0100 @@ -49,10 +49,14 @@ EXTRA_TEXT += " 'DJANGO-EXTENSIONS' : { 'setup': 'django-extensions', 'url':'http://django-command-extensions.googlecode.com/files/django-extensions-0.4.1.tar.gz', 'local':'"+ os.path.abspath(os.path.join(src_base,"django-extensions-0.4.1.tar.gz"))+"' },\n" EXTRA_TEXT += " 'DJANGO-REGISTRATION' : { 'setup': 'django-registration', 'url':'http://bitbucket.org/ubernostrum/django-registration/get/tip.tar.gz', 'local':'"+ os.path.abspath(os.path.join(src_base,"django-registration.tar.gz"))+"' },\n" EXTRA_TEXT += " 'DJANGO-TAGGING' : { 'setup': 'django-tagging', 'url':'http://django-tagging.googlecode.com/files/django-tagging-0.3.1.tar.gz', 'local':'"+ os.path.abspath(os.path.join(src_base,"django-tagging-0.3.1.tar.gz"))+"' },\n" -EXTRA_TEXT += " 'DJANGO-OAUTH' : { 'setup': 'django-oauth', 'url':'http://code.welldev.org/django-oauth/get/549a34c81394.gz', 'local':'"+ os.path.abspath(os.path.join(src_base,"django-oauth.gz"))+"' },\n" +EXTRA_TEXT += " 'SETUPTOOLS-HG' : { 'setup': 'setuptools-hg', 'url':'http://bitbucket.org/jezdez/setuptools_hg/downloads/setuptools_hg-0.2.tar.gz', 'local':'"+ os.path.abspath(os.path.join(src_base,"setuptools_hg-0.2.tar.gz"))+"' },\n" +EXTRA_TEXT += " 'OAUTH2' : { 'setup': 'python-oauth2', 'url':'https://github.com/simplegeo/python-oauth2/tarball/1.2.1', 'local':'"+ os.path.abspath(os.path.join(src_base,"python-oauth2-1.2.1.tar.gz"))+"' },\n" +EXTRA_TEXT += " 'HTTPLIB2' : { 'setup': 'python-oauth2', 'url':'http://httplib2.googlecode.com/files/httplib2-0.6.0.tar.gz', 'local':'"+ os.path.abspath(os.path.join(src_base,"httplib2-0.6.0.tar.gz"))+"' },\n" +EXTRA_TEXT += " 'DJANGO-OAUTH-PLUS' : { 'setup': 'django-oauth-plus', 'url':'http://bitbucket.org/david/django-oauth-plus/get/f314f018e473.gz', 'local':'"+ os.path.abspath(os.path.join(src_base,"django-oauth-plus.gz"))+"' },\n" EXTRA_TEXT += " 'LXML' : { 'setup': 'lxml', 'url': '"+ os.path.abspath(os.path.join(src_base,"lxml_2.2.8.tar.gz"))+"', 'local': '"+ os.path.abspath(os.path.join(src_base,"lxml-2.2.8.tar.gz"))+"'},\n" EXTRA_TEXT += "}\n" + EXTRA_TEXT += "import sys\n" EXTRA_TEXT += "sys.path.append('"+lib_path+"')\n" @@ -212,7 +216,9 @@ ('DJANGO-EXTENSIONS', 'pip', None, None), ('DJANGO-REGISTRATION', 'easy_install', '-Z', None), ('DJANGO-TAGGING', 'pip', None, None), - ('DJANGO-OAUTH', 'pip', None, None), + ('HTTPLIB2', 'pip', None, None), + ('OAUTH2', 'pip', None, None), + ('DJANGO-OAUTH-PLUS', 'pip', None, None), ] if system_str == "Darwin": diff -r 20c41a7e2173 -r 83b28fc0d731 sbin/virtualenv/res/src/django-oauth-plus.gz Binary file sbin/virtualenv/res/src/django-oauth-plus.gz has changed diff -r 20c41a7e2173 -r 83b28fc0d731 sbin/virtualenv/res/src/django-oauth-plus.tar.gz Binary file sbin/virtualenv/res/src/django-oauth-plus.tar.gz has changed diff -r 20c41a7e2173 -r 83b28fc0d731 sbin/virtualenv/res/src/django-oauth.gz Binary file sbin/virtualenv/res/src/django-oauth.gz has changed diff -r 20c41a7e2173 -r 83b28fc0d731 sbin/virtualenv/res/src/django-oauth.tar.gz Binary file sbin/virtualenv/res/src/django-oauth.tar.gz has changed diff -r 20c41a7e2173 -r 83b28fc0d731 sbin/virtualenv/res/src/httplib2-0.6.0.tar.gz Binary file sbin/virtualenv/res/src/httplib2-0.6.0.tar.gz has changed diff -r 20c41a7e2173 -r 83b28fc0d731 sbin/virtualenv/res/src/python-oauth2-1.2.1.tar.gz Binary file sbin/virtualenv/res/src/python-oauth2-1.2.1.tar.gz has changed diff -r 20c41a7e2173 -r 83b28fc0d731 sbin/virtualenv/res/src/setuptools_git-0.3.4.tar.gz.gz Binary file sbin/virtualenv/res/src/setuptools_git-0.3.4.tar.gz.gz has changed diff -r 20c41a7e2173 -r 83b28fc0d731 sbin/virtualenv/res/src/setuptools_hg-0.2.tar.gz Binary file sbin/virtualenv/res/src/setuptools_hg-0.2.tar.gz has changed diff -r 20c41a7e2173 -r 83b28fc0d731 web/ldt/ldt_utils/contentindexer.py --- a/web/ldt/ldt_utils/contentindexer.py Fri Nov 19 18:14:02 2010 +0100 +++ b/web/ldt/ldt_utils/contentindexer.py Mon Dec 13 23:55:19 2010 +0100 @@ -4,7 +4,6 @@ import shutil from ldt.utils import zipfileext import urllib -# import ldt.utils.log import ldt.utils.xml from django.conf import settings from models import Content @@ -155,7 +154,6 @@ def index_project(self, project): - # pocketfilms.utils.log.debug("Indexing project : "+str(project.iri_id)) doc = lxml.etree.fromstring(project.ldt) self.__writer.deleteDocuments(lucene.Term("iri_id", project.iri_id)) @@ -168,7 +166,6 @@ ensembleId = "ens_perso" for decoupageNode in content.getchildren(): - # pocketfilms.utils.log.debug("Indexing content decoupage : "+ repr(decoupageNode.nodeType) + " in " + repr(self.decoupage_blacklist)) if decoupageNode.tag != "decoupage" or decoupageNode.get(None,"id") in self.decoupage_blacklist: continue diff -r 20c41a7e2173 -r 83b28fc0d731 web/ldt/ldt_utils/projectindexer.py --- a/web/ldt/ldt_utils/projectindexer.py Fri Nov 19 18:14:02 2010 +0100 +++ b/web/ldt/ldt_utils/projectindexer.py Mon Dec 13 23:55:19 2010 +0100 @@ -40,7 +40,6 @@ self.index_project(project) def index_project(self, project): - # ldt.utils.log.debug("Indexing project : "+str(project.ldt_id)) ldt=project.ldt doc = lxml.etree.fromstring(ldt.encode( "utf-8" )) diff -r 20c41a7e2173 -r 83b28fc0d731 web/ldt/management/commands/__init__.py diff -r 20c41a7e2173 -r 83b28fc0d731 web/ldt/management/commands/testrunserver.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/web/ldt/management/commands/testrunserver.py Mon Dec 13 23:55:19 2010 +0100 @@ -0,0 +1,79 @@ +from django.core.management.commands.runserver import Command as RunserverCommand +from django.core.servers.basehttp import WSGIServer, WSGIRequestHandler +import sys +import threading +from threading import Event +from django.conf import settings + +def run(addr, port, wsgi_handler, keep_running=None, ready_event=None): + server_address = (addr, port) + httpd = WSGIServer(server_address, WSGIRequestHandler) + httpd.set_app(wsgi_handler) + if keep_running is None: + if ready_event is not None: + ready_event.set() + httpd.serve_forever() + else: + if ready_event is not None: + ready_event.set() + while keep_running(): + httpd.handle_request() + + + +class Command(RunserverCommand): + def handle(self, addrport='', keep_running=None, ready_event=None, *args, **options): + import django + from django.core.servers.basehttp import AdminMediaHandler, WSGIServerException + from django.core.handlers.wsgi import WSGIHandler + if args: + raise CommandError('Usage is testrunserver %s' % self.args) + if not addrport: + addr = '' + port = '8000' + else: + try: + addr, port = addrport.split(':') + except ValueError: + addr, port = '', addrport + if not addr: + addr = '127.0.0.1' + + if not port.isdigit(): + raise CommandError("%r is not a valid port number." % port) + + admin_media_path = options.get('admin_media_path', '') + shutdown_message = options.get('shutdown_message', '') + quit_command = (sys.platform == 'win32') and 'CTRL-BREAK' or 'CONTROL-C' + + from django.conf import settings + from django.utils import translation + print "Validating models..." + self.validate(display_num_errors=True) + print "\nDjango version %s, using settings %r" % (django.get_version(), settings.SETTINGS_MODULE) + print "Development server is running at http://%s:%s/" % (addr, port) + print "Quit the server with %s." % quit_command + + translation.activate(settings.LANGUAGE_CODE) + + try: + handler = AdminMediaHandler(WSGIHandler(), admin_media_path) + run(addr, int(port), handler, keep_running, ready_event) + except WSGIServerException, e: + # Use helpful error messages instead of ugly tracebacks. + ERRORS = { + 13: "You don't have permission to access that port.", + 98: "That port is already in use.", + 99: "That IP address can't be assigned-to.", + } + try: + error_text = ERRORS[e.args[0].args[0]] + except (AttributeError, KeyError): + error_text = str(e) + sys.stderr.write(self.style.ERROR("Error: %s" % error_text) + '\n') + # Need to use an OS exit because sys.exit doesn't work in a thread + except KeyboardInterrupt: + if shutdown_message: + print shutdown_message + + \ No newline at end of file diff -r 20c41a7e2173 -r 83b28fc0d731 web/ldt/settings.py --- a/web/ldt/settings.py Fri Nov 19 18:14:02 2010 +0100 +++ b/web/ldt/settings.py Mon Dec 13 23:55:19 2010 +0100 @@ -1,7 +1,7 @@ +from django.conf import settings +import logging import os import os.path -import logging -from django.conf import settings # EMAIL_HOST='smtp.gmail.com' @@ -71,6 +71,8 @@ LOG_LEVEL = getattr(settings, 'LOG_LEVEL', logging.INFO) EMPTY_MEDIA_EXTERNALID = getattr(settings, 'EMPTY_MEDIA_EXTERNALID', None) +TEST_WEBSERVER_ADDRPORT = getattr(settings,'TEST_WEBSERVER_ADDRPORT', "127.0.0.1:8000") + GLOBAL_LOG_LEVEL = LOG_LEVEL GLOBAL_LOG_HANDLERS = [logging.FileHandler(LOG_FILE)] diff -r 20c41a7e2173 -r 83b28fc0d731 web/ldt/test/__init__.py diff -r 20c41a7e2173 -r 83b28fc0d731 web/ldt/test/client.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/web/ldt/test/client.py Mon Dec 13 23:55:19 2010 +0100 @@ -0,0 +1,629 @@ +#from django.test.client import Client as DClient +from django.conf import settings +from django.core.urlresolvers import reverse +from django.http import HttpResponse, SimpleCookie +from django.test.client import encode_multipart, encode_file, Client, BOUNDARY, \ + MULTIPART_CONTENT, CONTENT_TYPE_RE +from django.utils.encoding import smart_str +from django.utils.http import urlencode +from ldt.utils import Property +from oauth2 import Request, Consumer, Token, SignatureMethod_HMAC_SHA1, \ + generate_nonce, SignatureMethod_PLAINTEXT +from oauth_provider.consts import OUT_OF_BAND +from urlparse import urlsplit, urlunsplit, urlparse, urlunparse, parse_qs +import httplib2 +import logging +import re +try: + from cStringIO import StringIO +except ImportError: + from StringIO import StringIO + + +class WebClient(object): + """ + A class that can act as a client for testing purposes. + + It allows the user to compose GET and POST requests, and + obtain the response that the server gave to those requests. + The server Response objects are annotated with the details + of the contexts and templates that were rendered during the + process of serving the request. + + Client objects are stateful - they will retain cookie (and + thus session) details for the lifetime of the Client instance. + + This is not intended as a replacement for Twill/Selenium or + the like - it is here to allow testing against the + contexts and templates produced by a view, rather than the + HTML rendered to the end-user. + """ + def __init__(self, **defaults): + self.handler = httplib2.Http() + #self.defaults = defaults + self.cookies = SimpleCookie() + #self.exc_info = None + #self.errors = StringIO() + self.__baseurltuple = () + self.__login_url = None + + @Property + def baseurl(): + + def fget(self): + return self.__baseurltuple + + def fset(self, value): + if isinstance(value, tuple): + self.__baseurltuple = value + else: + self.__baseurltuple = urlsplit(unicode(value)) + + return locals() + + @Property + def login_url(): + + def fget(self): + return self.__login_url + + def fset(self, value): + self.__login_url = value + + return locals() + + + def _mergeurl(self, urltuple): + res = ["" for i in range(5)] + for i in range(min(len(self.baseurl), len(urltuple))): + res[i] = self.baseurl[i] or urltuple[i] + + return urlunsplit(res) + + def _process_response(self, response, content): + resp = HttpResponse(content=content, status=response.status, content_type=response['content-type']) + if 'set-cookie' in response: + self.cookies.load(response['set-cookie']) + resp.cookies.load(response['set-cookie']) + + resp.client = self + resp.raw_response = response + for key,value in response.items(): + resp[key] = value + + return resp + + def _handle_redirects(self, response): + + response.redirect_chain = [] + + r = response.raw_response.previous + while not r is None: + response.redirect_chain.append((r['content-location'],r.status)) + r = r.previous + + return response + + + def get(self, path, data={}, follow=False, **extra): + """ + Requests a response from the server using GET. + """ + parsed = list(urlsplit(path)) + parsed[3] = urlencode(data, doseq=True) or parsed[3] + + + fullpath = self._mergeurl(parsed) + self.handler.follow_redirects = follow + + headers = {} + if len(self.cookies) > 0: + headers['Cookie'] = self.cookies.output() + + if extra: + headers.update(extra) + + response, content = self.handler.request(fullpath, method="GET", headers=headers) + + resp = self._process_response(response, content) + + if follow: + resp = self._handle_redirects(resp) + return resp + + + def post(self, path, data={}, content_type="application/x-www-form-urlencoded", + follow=False, **extra): + """ + Requests a response from the server using POST. + """ + if content_type == MULTIPART_CONTENT: + post_data = encode_multipart(BOUNDARY, data) + elif content_type == "application/x-www-form-urlencoded": + post_data = urlencode(data) + else: + # Encode the content so that the byte representation is correct. + match = CONTENT_TYPE_RE.match(content_type) + if match: + charset = match.group(1) + else: + charset = settings.DEFAULT_CHARSET + post_data = smart_str(data, encoding=charset) + + parsed = list(urlsplit(path)) + fullpath = self._mergeurl(parsed) + self.handler.follow_redirects = follow + + headers = {} + headers['Content-type'] = content_type + if len(self.cookies) > 0: + headers['Cookie'] = self.cookies.output() + + if extra: + headers.update(extra) + + response,content = self.handler.request(fullpath, method="POST", headers=headers, body=post_data) + + resp = self._process_response(response, content) + + if follow: + resp = self._handle_redirects(response) + return resp + + def login(self, **credentials): + """ + Sets the Client to appear as if it has successfully logged into a site. + + Returns True if login is possible; False if the provided credentials + are incorrect, or the user is inactive, or if the sessions framework is + not available. + """ + resp = self.post(path=self.login_url, data=credentials, follow=False, **{"X-Requested-With" : "XMLHttpRequest"}) + return resp.status_code == 302 + +# +# def head(self, path, data={}, follow=False, **extra): +# """ +# Request a response from the server using HEAD. +# """ +# parsed = urlparse(path) +# r = { +# 'CONTENT_TYPE': 'text/html; charset=utf-8', +# 'PATH_INFO': urllib.unquote(parsed[2]), +# 'QUERY_STRING': urlencode(data, doseq=True) or parsed[4], +# 'REQUEST_METHOD': 'HEAD', +# 'wsgi.input': FakePayload('') +# } +# r.update(extra) +# +# response = self.request(**r) +# if follow: +# response = self._handle_redirects(response) +# return response +# +# def options(self, path, data={}, follow=False, **extra): +# """ +# Request a response from the server using OPTIONS. +# """ +# parsed = urlparse(path) +# +# r = { +# 'PATH_INFO': urllib.unquote(parsed[2]), +# 'QUERY_STRING': urlencode(data, doseq=True) or parsed[4], +# 'REQUEST_METHOD': 'OPTIONS', +# 'wsgi.input': FakePayload('') +# } +# r.update(extra) +# +# response = self.request(**r) +# if follow: +# response = self._handle_redirects(response) +# return response +# +# def put(self, path, data={}, content_type=MULTIPART_CONTENT, +# follow=False, **extra): +# """ +# Send a resource to the server using PUT. +# """ +# if content_type is MULTIPART_CONTENT: +# post_data = encode_multipart(BOUNDARY, data) +# else: +# post_data = data +# +# # Make `data` into a querystring only if it's not already a string. If +# # it is a string, we'll assume that the caller has already encoded it. +# query_string = None +# if not isinstance(data, basestring): +# query_string = urlencode(data, doseq=True) +# +# parsed = urlparse(path) +# r = { +# 'CONTENT_LENGTH': len(post_data), +# 'CONTENT_TYPE': content_type, +# 'PATH_INFO': urllib.unquote(parsed[2]), +# 'QUERY_STRING': query_string or parsed[4], +# 'REQUEST_METHOD': 'PUT', +# 'wsgi.input': FakePayload(post_data), +# } +# r.update(extra) +# +# response = self.request(**r) +# if follow: +# response = self._handle_redirects(response) +# return response +# +# def delete(self, path, data={}, follow=False, **extra): +# """ +# Send a DELETE request to the server. +# """ +# parsed = urlparse(path) +# r = { +# 'PATH_INFO': urllib.unquote(parsed[2]), +# 'QUERY_STRING': urlencode(data, doseq=True) or parsed[4], +# 'REQUEST_METHOD': 'DELETE', +# 'wsgi.input': FakePayload('') +# } +# r.update(extra) +# +# response = self.request(**r) +# if follow: +# response = self._handle_redirects(response) +# return response +# +# def login(self, **credentials): +# """ +# Sets the Client to appear as if it has successfully logged into a site. +# +# Returns True if login is possible; False if the provided credentials +# are incorrect, or the user is inactive, or if the sessions framework is +# not available. +# """ +# user = authenticate(**credentials) +# if user and user.is_active \ +# and 'django.contrib.sessions' in settings.INSTALLED_APPS: +# engine = import_module(settings.SESSION_ENGINE) +# +# # Create a fake request to store login details. +# request = HttpRequest() +# if self.session: +# request.session = self.session +# else: +# request.session = engine.SessionStore() +# login(request, user) +# +# # Save the session values. +# request.session.save() +# +# # Set the cookie to represent the session. +# session_cookie = settings.SESSION_COOKIE_NAME +# self.cookies[session_cookie] = request.session.session_key +# cookie_data = { +# 'max-age': None, +# 'path': '/', +# 'domain': settings.SESSION_COOKIE_DOMAIN, +# 'secure': settings.SESSION_COOKIE_SECURE or None, +# 'expires': None, +# } +# self.cookies[session_cookie].update(cookie_data) +# +# return True +# else: +# return False +# +# def logout(self): +# """ +# Removes the authenticated user's cookies and session object. +# +# Causes the authenticated user to be logged out. +# """ +# session = import_module(settings.SESSION_ENGINE).SessionStore() +# session_cookie = self.cookies.get(settings.SESSION_COOKIE_NAME) +# if session_cookie: +# session.delete(session_key=session_cookie.value) +# self.cookies = SimpleCookie() + + +class OAuthPayload(object): + + def __init__(self, servername="testserver"): + self._token = None + self._servername = servername + self._oauth_parameters = { + 'oauth_version': '1.0' + } + self._oauth_parameters_extra = { + 'oauth_callback': 'http://127.0.0.1/callback', + 'scope':'all' + } + self.errors = StringIO() + + def _get_signed_request(self, method, path, params): + + parameters = params.copy() + parameters.update(self._oauth_parameters) + oauth_request = Request.from_consumer_and_token(consumer=self._consumer, token=self._token, http_method=method, http_url=path, parameters=parameters) + oauth_request.sign_request(SignatureMethod_HMAC_SHA1(), consumer=self._consumer, token=self._token) + + return oauth_request + + + def set_consumer(self, key, secret): + self._consumer = Consumer(key, secret) + self._oauth_parameters['oauth_consumer_key'] = key + + def set_scope(self, value): + self._oauth_parameters_extra['scope'] = value + + def inject_oauth_data(self, path, method, data): + + path_parsed = urlparse(path) + + if method=='GET' and (data is None or len(data) == 0): + new_data = parse_qs(path_parsed[4]) + elif data is None: + new_data = {} + else: + new_data = data.copy() + + clean_path = ['']*6 + clean_path[0] = 'http' + clean_path[1] = self._servername + for i in range(0,4): + clean_path[i] = path_parsed[i] or clean_path[i] + path = urlunparse(clean_path) + + oauth_request = self._get_signed_request(method, path, new_data) + + new_data.update(oauth_request) + + return new_data + + def login(self, client, login_method, **credential): + + + self._oauth_parameters.update(self._oauth_parameters_extra) + #Obtaining a Request Token + resp = client.get(reverse('oauth_request_token'), follow=True) + if resp.status_code == 200: + self._token = Token.from_string(resp.content) + else: + self.errors.write("oauth_request_token response status code fail : " + repr(resp)) + return False + + #Requesting User Authorization + res = login_method(client, **credential) + if not res: + self.errors.write("login failed : " + repr(credential)) + return False + + resp = client.get(reverse('oauth_user_authorization')) + if resp.status_code != 200: + self.errors.write("oauth_user_authorization get response status code fail : " + repr(resp)) + return False + + #"X-Requested-With" : "XMLHttpRequest" + resp = client.post(reverse('oauth_user_authorization'), {'authorize_access':1}, **{"X-Requested-With" : "XMLHttpRequest"}) + if resp.status_code != 302: + self.errors.write("oauth_user_authorization post response status code fail : " + repr(resp)) + return False + + location_splitted = urlsplit(resp["Location"]) + location_query_dict = parse_qs(location_splitted[3]) + self._token.verifier = location_query_dict['oauth_verifier'] + + + #Obtaining an Access Token + resp = client.get(reverse('oauth_access_token')) + if resp.status_code == 200: + self._token = Token.from_string(resp.content) + for key in self._oauth_parameters_extra.keys(): + if key in self._oauth_parameters: + del(self._oauth_parameters[key]) + return True + else: + self.errors.write("oauth_access_token get response status code fail : " + repr(resp)) + return False + + def logout(self): + self._token = None + +METHOD_MAPPING = { + 'get' : 'GET', + 'post' : 'POST', + 'put' : 'POST', + 'head' : 'GET', + 'options' : 'GET', + 'delete' : 'GET' +} + +def _generate_request_wrapper(meth): + def request_wrapper(inst, *args, **kwargs): + path = args[0] if len(args) > 0 else kwargs.get('path','') + data = args[1] if len(args) > 1 else kwargs.get('data',{}) + args = args[2:] + if 'path' in kwargs: + del(kwargs['path']) + if 'data' in kwargs: + del(kwargs['data']) + data = inst._oauth_data.inject_oauth_data(path, METHOD_MAPPING[meth.__name__], data) + return meth(inst,path=path, data=data, *args, **kwargs) + return request_wrapper + +def _generate_login_wrapper(meth): + def login_wrapper(inst, **credential): + return inst._oauth_data.login(inst, meth, **credential) + return login_wrapper + +def _generate_logout_wrapper(meth): + def logout_wrapper(inst): + inst._oauth_data.logout() + meth(inst) + return logout_wrapper + +class OAuthMetaclass(type): + + def __new__(cls, name, bases, attrs): + newattrs = {} + def set_consumer(inst, key, secret): + inst._oauth_data.set_consumer(key,secret) + newattrs['set_consumer'] = set_consumer + def set_scope(inst, scope): + inst._oauth_data.set_scope(scope) + newattrs['set_scope'] = set_scope + + for attrname, attrvalue in attrs.iteritems(): + if attrname in ('get', 'post', 'head', 'options', 'put', 'delete'): + newattrs[attrname] = _generate_request_wrapper(attrvalue) + elif attrname == 'login': + newattrs[attrname] = _generate_login_wrapper(attrvalue) + elif attrname == 'logout': + newattrs[attrname] = _generate_logout_wrapper(attrvalue) + else: + newattrs[attrname] = attrvalue + + for klass in bases: + for attrname, attrvalue in klass.__dict__.iteritems(): + if attrname in newattrs: + continue + if attrname in ('get', 'post', 'head', 'options', 'put', 'delete'): + newattrs[attrname] = _generate_request_wrapper(attrvalue) + elif attrname == 'login': + newattrs[attrname] = _generate_login_wrapper(attrvalue) + elif attrname == 'logout': + newattrs[attrname] = _generate_logout_wrapper(attrvalue) + + init_method = newattrs.get("__init__", None) + + def new_init(inst, *args, **kwargs): + inst._oauth_data = OAuthPayload(attrs.get('servername','testserver')) + if init_method is not None: + init_method(*args,**kwargs) + else: + super(inst.__class__,inst).__init__(*args,**kwargs) + newattrs["__init__"] = new_init + + return super(OAuthMetaclass, cls).__new__(cls, name, bases, newattrs) + + + +class OAuthClient(Client): + __metaclass__ = OAuthMetaclass +# def __init__(self, **default): +# super(OAuthClient,self).__init__(**default) +# self.__token = None +# self.oauth_parameters = { +# 'oauth_version': '1.0', +# 'oauth_callback': 'http://127.0.0.1/callback', +# 'scope':'all' +# } +# +# def __get_signed_request(self, method, path): +# +# oauth_request = Request.from_consumer_and_token(consumer=self.__consumer, token=self.__token, http_method=method, http_url=path, parameters=self.oauth_parameters) +# oauth_request.sign_request(SignatureMethod_HMAC_SHA1(), consumer=self.__consumer, token=self.__token) +# +# return oauth_request +# +# +# def set_consumer(self, key, secret): +# self.__consumer = Consumer(key, secret) +# self.oauth_parameters['oauth_consumer_key'] = key +# +# def set_scope(self, value): +# self.oauth_parameters['scope'] = value +# +# def __inject_oauth_data(self, path, method, data): +# +# path_parsed = urlparse(path) +# +# if method=='GET' and len(data) == 0: +# data= parse_qs(path_parsed[4]) +# +# clean_path = ['']*6 +# clean_path[0] = 'http' +# clean_path[1] = 'testserver' +# for i in range(0,4): +# clean_path[i] = path_parsed[i] or clean_path[i] +# path = urlunparse(clean_path) +# +# oauth_request = self.__get_signed_request(method, path) +# +# data.update(oauth_request) +# +# +# def get(self, path, data={}, follow=False, **extra): +# +# self.__inject_oauth_data(path, 'GET', data) +# return super(OAuthClient, self).get(path, data, follow, **extra) +# +# +# def post(self, path, data={}, content_type=MULTIPART_CONTENT, +# follow=False, **extra): +# self.__inject_oauth_data(path, 'POST', data) +# return super(OAuthClient,self).post(path, data, content_type, follow, **extra) +# +# def head(self, path, data={}, follow=False, **extra): +# self.__inject_oauth_data(path, 'GET', data) +# return super(OAuthClient, self).head(path, data, follow, **extra) +# +# def options(self, path, data={}, follow=False, **extra): +# self.__inject_oauth_data(path, 'GET', data) +# return options(OAuthClient, self).options(path, data, follow, **extra) +# +# def put(self, path, data={}, content_type=MULTIPART_CONTENT, +# follow=False, **extra): +# self.__inject_oauth_data(path, 'POST', data) +# return super(OAuthClient,self).put(path, data, content_type, follow, **extra) +# +# def delete(self, path, data={}, follow=False, **extra): +# self.__inject_oauth_data(path, 'GET', data) +# return super(OAuthClient, self).delete(path, data, follow, **extra) +# +# ### TODO: better document errors +# def login(self, **credential): +# +# #Obtaining a Request Token +# resp = self.get(reverse('oauth_request_token'), follow=True) +# if resp.status_code == 200: +# self.__token = Token.from_string(resp.content) +# else: +# self.errors.write("oauth_request_token response status code fail : " + repr(resp)) +# return False +# +# #Requesting User Authorization +# res = super(OAuthClient, self).login(**credential) +# if not res: +# self.errors.write("login failed : " + repr(credential)) +# return False +# +# resp = self.get(reverse('oauth_user_authorization')) +# if resp.status_code != 200: +# self.errors.write("oauth_user_authorization get response status code fail : " + repr(resp)) +# return False +# +# resp = self.post(reverse('oauth_user_authorization'), {'authorize_access':1}) +# if resp.status_code != 302: +# self.errors.write("oauth_user_authorization post response status code fail : " + repr(resp)) +# return False +# +# location_splitted = urlsplit(resp["Location"]) +# location_query_dict = parse_qs(location_splitted[3]) +# self.__token.verifier = location_query_dict['oauth_verifier'] +# +# +# #Obtaining an Access Token +# resp = self.get(reverse('oauth_access_token')) +# if resp.status_code == 200: +# self.__token = Token.from_string(resp.content) +# return True +# else: +# self.errors.write("oauth_access_token get response status code fail : " + repr(resp)) +# return False +# +# def logout(self): +# super(OAuthClient,self).logout() +# self._token = None + +class OAuthWebClient(WebClient): + __metaclass__ = OAuthMetaclass + servername = '127.0.0.1:8000' diff -r 20c41a7e2173 -r 83b28fc0d731 web/ldt/test/testcases.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/web/ldt/test/testcases.py Mon Dec 13 23:55:19 2010 +0100 @@ -0,0 +1,82 @@ +from django.conf import settings +from django.core.management import call_command +from django.test.testcases import TestCase, TransactionTestCase +import django +import threading +import time +import httplib +from ldt.test.client import WebClient, OAuthClient, OAuthWebClient + + +def launch_webserver(addrport='', keep_running = None, ready_event = None): + call_command('testrunserver',addrport, keep_running, ready_event) + +class WebTestCase(TransactionTestCase): + +# def __init__(self, clientKlass = None): +# super(WebTestCase,self).__init__() +# if clientKlass is not None: +# self.client = clientKlass(self.client) + + def set_login_url(self, value): + self.client.login_url = value + + def _pre_setup(self): + super(WebTestCase,self)._pre_setup() + self._keep_webserver = True + self._lock = threading.Lock() + self._ready_event = threading.Event() + + self.baseurl = "http://"+settings.TEST_WEBSERVER_ADDRPORT + self.client = WebClient() + self.client.baseurl = self.baseurl + login_url = '/' + settings.LOGIN_URL[len(settings.BASE_URL):].lstrip('/') + self.client.login_url = login_url + + def keep_runningserver(): + with self._lock: + return self._keep_webserver + + def launch_server(): + launch_webserver(settings.TEST_WEBSERVER_ADDRPORT, keep_running=keep_runningserver, ready_event = self._ready_event) + + #launch_server() + self._t = threading.Thread(target=launch_server) + self._t.start() + self._ready_event.wait() + time.sleep(0.1) + + + def _post_teardown(self): + with self._lock: + self._keep_webserver = False + conn = httplib.HTTPConnection(settings.TEST_WEBSERVER_ADDRPORT) + conn.request("HEAD", "/") + conn.getresponse() + self._t.join() + super(WebTestCase,self)._post_teardown() + +class OAuthTestCase(TestCase): + + def set_consumer(self, key, secret): + self.client.set_consumer(key, secret) + + def _pre_setup(self): + super(OAuthTestCase,self)._pre_setup() + self.client = OAuthClient() + +class OAuthWebTestCase(WebTestCase): + + def set_consumer(self, key, secret): + self.client.set_consumer(key, secret) + + def _pre_setup(self): + super(OAuthWebTestCase,self)._pre_setup() + self.client = OAuthWebClient() + self.client.baseurl = self.baseurl + login_url = '/' + settings.LOGIN_URL[len(settings.BASE_URL):].lstrip('/') + self.client.login_url = login_url + + + + \ No newline at end of file diff -r 20c41a7e2173 -r 83b28fc0d731 web/ldt/text/fixtures/test_data.json --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/web/ldt/text/fixtures/test_data.json Mon Dec 13 23:55:19 2010 +0100 @@ -0,0 +1,1 @@ +[{"pk": 2, "model": "auth.user", "fields": {"username": "jane", "first_name": "", "last_name": "", "is_active": true, "is_superuser": false, "is_staff": false, "last_login": "2010-12-12 00:04:07", "groups": [], "user_permissions": [], "password": "sha1$45b4e$a4990018063ad9d7aeafaffa349ae61f2b087ed8", "email": "jane@example.com", "date_joined": "2010-12-12 00:04:07"}}, {"pk": 1, "model": "oauth_provider.resource", "fields": {"url": "/api/1.0/text/delete/", "name": "all", "is_readonly": true}}, {"pk": 2, "model": "oauth_provider.resource", "fields": {"url": "/api/1.0/text/delete/", "name": "delete", "is_readonly": true}}, {"pk": 1, "model": "oauth_provider.consumer", "fields": {"status": 1, "name": "", "secret": "kd94hf93k423kf44", "user": 2, "key": "dpf43f3p2l4k3l03", "description": ""}}] diff -r 20c41a7e2173 -r 83b28fc0d731 web/ldt/text/models.py --- a/web/ldt/text/models.py Fri Nov 19 18:14:02 2010 +0100 +++ b/web/ldt/text/models.py Mon Dec 13 23:55:19 2010 +0100 @@ -36,8 +36,8 @@ def fset(self, value): values = None - if type(value) == type([]): - values = value + if isinstance(value, (list,tuple)): + values = list(value) elif value is not None: values = [v.lower().strip() for v in unicode(value).split(",")] diff -r 20c41a7e2173 -r 83b28fc0d731 web/ldt/text/tests.py --- a/web/ldt/text/tests.py Fri Nov 19 18:14:02 2010 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,353 +0,0 @@ -#encoding:UTF-8 - -""" Run these tests with 'python manage.py test text' """ - -from django.test import TestCase -import unittest -import lxml.etree -from ldt.text.models import * -from ldt.core.models import Owner -from views import * -import urllib -import uuid -import tempfile -import datetime -from django.contrib.auth.models import * -from django.conf import settings -from django.test.client import Client -from ldt.text import VERSION_STR -from django.db import transaction -from django.contrib.auth.models import User -from oauth_provider.models import Resource, Consumer -import time -from oauth_provider.models import Token -from oauth.oauth import OAuthRequest, OAuthSignatureMethod_HMAC_SHA1 -from django.contrib.auth.models import User -from oauth_provider.models import Resource, Consumer, Token, Nonce -import time -from oauth_provider.consts import OUT_OF_BAND -from oauth.oauth import OAuthRequest, OAuthSignatureMethod_PLAINTEXT, generate_nonce - - -# This test creates an annotation and checks that: -# 1. the annotation was created in the database (by trying to access it through a 'get') -# 2. the returned xml contains correct data -class CreateTest(unittest.TestCase): - def setUp(self): - self.content = str('f2c1d1fa-629d-4520-a3d2-955b4f2582c0http://www.leezam.com/pub/epub/123456!/OPS/chapter2.xhtml#pos=56,168tag1tag2#AAAAAA<![CDATA[titre de l\'annotation]]>oaubert79cd0532-1dda-4130-b351-6a181130a7c92010-09-06 12:33:53.417550oaubert79cd0532-1dda-4130-b351-6a181130a7c92010-09-06 12:33:53.420459') - self.c = Client() - self.annot = Annotation(external_id=u'd2c1d1fa-629d-4520-a3d2-955b4f2582c0', uri=u'http://iri.blabla', tags=u"tag1,tag2", title=u'montitre', description=u'madesc', text=u'letexteselectionne', color=u'#AAAAAA', creator=u'wakimd', contributor=u'wakimd', creation_date=u'2010-09-06 12:33:53.417550', update_date=u'2010-09-06 12:33:53.417550') - self.annot.save() - def tearDown(self): - transaction.rollback() - annotlist=Annotation.objects.all() - for annot in annotlist: - annot.delete() - - def test_create_annotation(self): - response = self.c.post('/api/'+ VERSION_STR +'/text/create/', {'content':self.content}) - #self.assertEqual(response.content, " ") - self.annot1 = lxml.etree.fromstring(response.content) - self.assertEqual(self.annot1.xpath("/iri/text-annotation/id/text()")[0],"f2c1d1fa-629d-4520-a3d2-955b4f2582c0") - self.assertEqual(self.annot1.xpath("/iri/text-annotation/content")[0].tag,"content") - self.assertEqual(self.annot1.xpath("/iri/text-annotation/tags/tag/text()")[0],u"tag1") - self.assertEqual(self.annot1.xpath("/iri/text-annotation/content/text/text()")[0],u"texte selectionne lors de la creation de l\'annotation") - #self.assertEqual(self.annot1.xpath("/iri/text-annotation/meta/created/text()")[0],"2010-09-06 12:33:53.417550") - response2 = self.c.get('/api/'+ VERSION_STR +'/text/get/', {'id':'f2c1d1fa-629d-4520-a3d2-955b4f2582c0'}) - annot2 = lxml.etree.fromstring(response.content) - self.assertEqual(annot2.xpath("/iri/text-annotation/uri/text()")[0], "http://www.leezam.com/pub/epub/123456!/OPS/chapter2.xhtml#pos=56,168") - - def test_error_create(self): - content = 'd2c1d1fa-629d-4520-a3d2-955b4f2582c0http://www.leezam.com/pub/epub/123456!/OPS/chapter2.xhtml#pos=56,168tag1tag2#AAAAAA<![CDATA[titre de l\'annotation]]>oaubert79cd0532-1dda-4130-b351-6a181130a7c92010-09-06 12:33:53.417550oaubert79cd0532-1dda-4130-b351-6a181130a7c92010-09-06 12:33:53.420459' - response = self.c.post('/api/'+ VERSION_STR +'/text/create/', {'content':content}) - self.assertEqual(response.status_code, 409) - - -# This test creates an annotation, then gets it, and checks that the returned xml contains correct data -class GetTest(unittest.TestCase): - def setUp(self): - self.annotation = Annotation(external_id="d2c1d1fa-629d-4520-a3d2-955b4f2582c0", tags=u"tag1 ,tag2 , tag3", title="titre de l\'annotation",text="texte selectionne lors de la creation de l\'annotation",color="#AAAAAA", creation_date="2010-09-06 12:33:53.417550", update_date="2010-09-06 12:33:53.420459") - self.annotation.save() - self.c = Client() - def tearDown(self): - annotlist=Annotation.objects.all() - for annot in annotlist: - annot.delete() - - def test_get_annotation(self): - response = self.c.get('/api/'+ VERSION_STR +'/text/get/', {'id':'d2c1d1fa-629d-4520-a3d2-955b4f2582c0'}) - print response - self.annot1 = lxml.etree.fromstring(response.content) - self.assertEqual(self.annot1.xpath("/iri/text-annotation/id/text()")[0],self.annotation.external_id) - self.assertEqual(self.annot1.xpath("/iri/text-annotation/tags/tag/text()")[1], "tag2") - self.assertEqual(self.annot1.xpath("/iri/text-annotation/content/color/text()")[0],self.annotation.color) - self.assertEqual(self.annot1.xpath("/iri/text-annotation/meta/created/text()")[0], str(self.annotation.creation_date)) - - def test_error_get(self): - response = self.c.get('/api/'+ VERSION_STR +'/text/get/', {'id':'2'}) - self.assertEqual(response.status_code,404) - - -class FilterTest(unittest.TestCase): - def setUp(self): - self.annotation = Annotation(external_id="k2c1d1fa-629d-4520-a3d2-955b4f2582c0",title="titre de l\'annotation",text="texte selectionne lors de la creation de l\'annotation",color="#AAAAAA", uri="http://www.leezam.com/pub/epub/123456!/OPS/chapter2.xhtml#pos=56,168", creator="wakimd") - self.annotation.save() - self.annotation2 = Annotation(external_id="l2c1d1fa-629d-4520-a3d2-955b4f2582c0",title="titre de l\'annotation2",text="texte selectionne lors de la creation de l\'annotation2",color="#BBBBBB", uri="http://www.leezam.com/pub/epub/123456!/OPS/chapter2.xhtml#pos=56,168", creator="wakimd") - self.annotation2.save() - self.annotation3 = Annotation(external_id="m2c1d1fa-629d-4520-a3d2-955b4f2582c0", title="titre3", text="texte3", color="#CCCCCC", uri="http://blabla", creator="wakimd") - self.annotation3.save() - self.c = Client() - def tearDown(self): - annotlist=Annotation.objects.all() - for annot in annotlist: - annot.delete() - - def test_filter_annotation_creator_limit(self): - user = 'wakimd' - uri = "http://www.leezam.com/pub/epub/123456!/OPS/chapter2.xhtml#pos=56,168" - limit= 1 - response = self.c.get('/api/'+ VERSION_STR +'/text/filter/', {'uri':uri,'creator':user,'limit':limit}) - doc = lxml.etree.fromstring(response.content) - cpt = 0 - for elem in doc.xpath("/iri/text-annotation"): - cpt = cpt + 1 - if limit is not None: - self.assertEqual(cpt,limit) - for elem in doc.xpath("/iri/text-annotation/meta/creator/text()"): - self.assertEqual(elem,user) - for elem in doc.xpath("/iri/text-annotation/uri/text()"): - self.assertEqual(elem[:57],"http://www.leezam.com/pub/epub/123456!/OPS/chapter2.xhtml") - - def test_filter_annotation_uri(self): - uri = "http://www.leezam.com/pub/epub/123456!/OPS/chapter2.xhtml#pos=56,168" - response = self.c.get('/api/'+ VERSION_STR +'/text/filter/', {'uri':uri}) - doc = lxml.etree.fromstring(response.content) - for elem in doc.xpath("/iri/text-annotation/uri/text()"): - self.assertEqual(elem[:57],"http://www.leezam.com/pub/epub/123456!/OPS/chapter2.xhtml") - - def test_filter_annotation_filter(self): - uri = "http://www.leezam.com/pub/epub/123456!/OPS/chapter2.xhtml#pos=56,168" - filter = 'lors' - limit = None - response = self.c.get('/api/'+ VERSION_STR +'/text/filter/', {'uri':uri,'filter':'lors'}) - doc = lxml.etree.fromstring(response.content) - for elem in doc.xpath("/iri/text-annotation/content/text/text()"): - self.assertTrue('lors' in elem) - #for elem in doc.xpath("/iri/text-annotation/meta/creator/text()"): - # self.assertEqual(elem,user) - - -# This test creates an annotation, then deletes it, and checks that: -# 1. the annotation doesn't exist anymore in the database (by trying to access it through a 'get') -# 2. the returned xml contains no data -class DeleteTest(unittest.TestCase): - def setUp(self): - self.annotation = Annotation(external_id="d2c1d1fa-629d-4520-a3d2-955b4f2582c0",title="titre de l\'annotation",text="texte selectionne lors de la creation de l\'annotation",color="#AAAAAA", creation_date="2010-09-06T12:33:53.417550", update_date="2010-09-06T12:33:53.420459") - self.annotation.save() - self.c = Client() - def tearDown(self): - annotlist=Annotation.objects.all() - for annot in annotlist: - annot.delete() - - def test_delete_annotation(self): - id = urllib.urlencode({'id':'d2c1d1fa-629d-4520-a3d2-955b4f2582c0'}) - f = urllib.urlopen("http://127.0.0.1:8000/api/1.0/text/delete/", id) - response = self.c.post('/api/'+ VERSION_STR +'/text/delete/', {'id':'d2c1d1fa-629d-4520-a3d2-955b4f2582c0'}) - response2 = self.c.get('/ldt/get/', {'id':'d2c1d1fa-629d-4520-a3d2-955b4f2582c0'}) - doc = lxml.etree.fromstring(response.content) - self.assertEqual(doc.xpath("/iri/text-annotation/id/text()"),[]) - self.assertEqual(doc.xpath("/iri/text-annotation/tags/tag/text()"), []) - self.assertEqual(doc.xpath("/iri/text-annotation/content/color/text()"),[]) - self.assertEqual(doc.xpath("/iri/text-annotation/meta/creator/text()"),[]) - self.assertEqual(response2.status_code, 404) - - def test_error_delete(self): - response = self.c.post('/api/'+ VERSION_STR +'/text/ldt/delete/', {'id':'1'}) - self.assertEqual(response.status_code,404) - - -# This test creates an annotation, then updates it with new content, and checks that the returned xml contains the updated data -class UpdateTest(unittest.TestCase): - def setUp(self): - self.annotation = Annotation(external_id="d2c1d1fa-629d-4520-a3d2-955b4f2582c0", tags=u"tag1, mytag",title="titre de l\'annotation",text="texte selectionne lors de la creation de l\'annotation",color="#AAAAAA", creation_date="2010-09-06T12:33:53.417550", update_date="2010-09-06T12:33:53.420459") - self.annotation.save() - self.c = Client() - def tearDown(self): - annotlist=Annotation.objects.all() - for annot in annotlist: - annot.delete() - - def test_update_annotation(self): - content = 'tag1tag2newtag3#DDDDDDoaubert80cd0532-1dda-4130-b351-6a181130a7c92010-11-06 12:33:53.420459' - response = self.c.post('/api/'+ VERSION_STR +'/text/update/', {'content':content,'id':'d2c1d1fa-629d-4520-a3d2-955b4f2582c0'}) - doc = lxml.etree.fromstring(response.content) - #self.assertEqual(lxml.etree.tostring(doc), " ") - self.assertEqual(doc.xpath("/iri/text-annotation/id/text()")[0],"d2c1d1fa-629d-4520-a3d2-955b4f2582c0") - self.assertEqual(doc.xpath("/iri/text-annotation/tags/tag/text()")[1], "tag2new") - self.assertEqual(doc.xpath("/iri/text-annotation/content/color/text()")[0],"#DDDDDD") - - def test_error_update(self): - content = 'd2c1d1fa-629d-4520-a3d2-955b4f2582c0http://www.leezam.com/pub/epub/123456!/OPS/chapter2.xhtml#pos=56,168tag1tag2#AAAAAA<![CDATA[titre de l\'annotation]]>oaubert79cd0532-1dda-4130-b351-6a181130a7c92010-09-06 12:33:53.417550oaubert79cd0532-1dda-4130-b351-6a181130a7c92010-09-06 12:33:53.420459' - response = self.c.post('/api/'+ VERSION_STR +'/text/update/', {'content':content,'id':'1'}) - self.assertEqual(response.status_code,404) - - -class OnServerGlobalTest(unittest.TestCase): - def setUp(self): - self.content = urllib.urlencode({'content':'mypersonnalidhttp://www.leezam.com/pub/epub/123456!/OPS/chapter2.xhtml#pos=56,168tag1tag2#AAAAAA<![CDATA[titre de l\'annotation]]>oaubert79cd0532-1dda-4130-b351-6a181130a7c92010-09-06 12:33:53.417550oaubert79cd0532-1dda-4130-b351-6a181130a7c92010-09-06 12:33:53.420459'}) - self.content2 = urllib.urlencode({'content':'mypersonnalid2http://www.leezam.com/pub/epub/123456!/OPS/chapter2.xhtml#pos=56,168tag1tag2#BBBBBB<![CDATA[titre de l\'annotation2]]>wakimd79cd0532-1dda-4130-b351-6a181130a7c92010-09-06 12:33:53.417550oaubert79cd0532-1dda-4130-b351-6a181130a7c92010-09-06 12:33:53.420459'}) - self.id = urllib.urlencode({"id":"mypersonnalid"}) - self.id2 = urllib.urlencode({"id":"mypersonnalid2"}) - self.uri = urllib.urlencode({"uri":"http://www.leezam.com/pub/epub/123456!/OPS/chapter2.xhtml#pos=56,168"}) - self.filt1 = urllib.urlencode({"uri":"http://www.leezam.com/pub/epub/123456!/OPS/chapter2.xhtml#pos=56,168", "creator":"","limit":"","filter":""}) - self.filt2 = urllib.urlencode({"uri":"http://www.leezam.com/pub/epub/123456!/OPS/chapter2.xhtml#pos=56,168","creator":"wakimd","limit":"","filter":""}) - self.up = urllib.urlencode({'content':'tag1tag2newtag3#DDDDDDoaubert80cd0532-1dda-4130-b351-6a181130a7c92010-11-06 12:33:53.420459','id':'mypersonnalid'}) - - def test_everything(self): - creation = urllib.urlopen("http://127.0.0.1:8000/api/"+VERSION_STR+"/text/create/", self.content) - creation2 = urllib.urlopen("http://127.0.0.1:8000/api/"+VERSION_STR+"/text/create/", self.content2) - - get = urllib.urlopen("http://127.0.0.1:8000/api/"+VERSION_STR+"/text/get/?%s" % self.id) - - update = urllib.urlopen("http://127.0.0.1:8000/api/"+VERSION_STR+"/text/update/", self.up) - - filt1 = urllib.urlopen("http://127.0.0.1:8000/api/"+VERSION_STR+"/text/filter/?%s", self.uri) - filt2 = urllib.urlopen("http://127.0.0.1:8000/api/"+VERSION_STR+"/text/filter/?uri=http://www.leezam.com/pub/epub/123456!/OPS/chapter2.xhtml#pos=56,168?creator=wakimd") - tmp = open('debug.html','r+') - tmp.write(filt2.read()) - - delete = urllib.urlopen("http://127.0.0.1:8000/api/"+VERSION_STR+"/text/delete/", self.id) - delete = urllib.urlopen("http://127.0.0.1:8000/api/"+VERSION_STR+"/text/delete/", self.id2) - - -class OauthTestDelete(unittest.TestCase): - def setUp(self): - #create a user - self.jane = User.objects.create_user('jane', 'jane@example.com', 'toto') - - resource = Resource(name='delete', url='/api/1.0/text/delete/') - resource.save() - - self.CONSUMER_KEY = 'dpf43f3p2l4k3l03' - self.CONSUMER_SECRET = 'kd94hf93k423kf44' - self.consumer = Consumer(key=self.CONSUMER_KEY, secret=self.CONSUMER_SECRET, name='printer.example.com', user=self.jane) - self.consumer.save() - - self.nonce = generate_nonce(8) - - #auth parameters - self.parameters = { - 'oauth_consumer_key': self.CONSUMER_KEY, - 'oauth_signature_method': 'PLAINTEXT', - 'oauth_signature': '%s&' % self.CONSUMER_SECRET, - 'oauth_timestamp': str(int(time.time())), - 'oauth_nonce': self.nonce, - 'oauth_version': '1.0', - 'oauth_callback': 'http://printer.example.com/request_token_ready', - 'scope':'delete' - } - - #test client - self.c = Client() - - self.annotation = Annotation(external_id="d2c1d1fa-629d-4520-a3d2-955b4f2582c0",title="titre de l\'annotation",text="texte selectionne lors de la creation de l\'annotation",color="#AAAAAA", creation_date="2010-09-06T12:33:53.417550", update_date="2010-09-06T12:33:53.420459") - self.annotation.save() - - def tearDown(self): - Token.objects.all().delete() - Resource.objects.all().delete() - Consumer.objects.all().delete() - Nonce.objects.all().delete() - User.objects.all().delete() - - - def test_auth_access_delete(self): - ## REQUEST TOKEN - - response = self.c.get("/oauth/request_token/", self.parameters) - #self.assertEqual(response.content," ") - self.assertEqual(response.status_code,200) - token = list(Token.objects.all())[-1] - self.assertTrue(token.key in response.content) - self.assertTrue(token.secret in response.content) - self.assertEqual(token.callback, u'http://printer.example.com/request_token_ready'), - self.assertTrue(token.callback_confirmed) - -# token.callback = OUT_OF_BAND -# token.save() -# - ## USER AUTHORIZATION - - parameters = { - 'oauth_token': token.key, - } - - response = self.c.get("/oauth/authorize/", parameters) - self.assertEqual(response.status_code,302) - self.assertTrue(token.key in response['Location']) - - self.c.login(username='jane', password='toto') - - response = self.c.get("/oauth/authorize/", parameters) - self.assertEqual(response.status_code,200) - self.assertEqual(response.content,'Fake authorize view for printer.example.com.') - -# parameters['authorize_access'] = 0 -# response = self.c.post("/oauth/authorize/", parameters) -# self.assertEqual(response.content, "Fake callback view.") - - # fake authorization by the user - parameters['authorize_access'] = 1 - response = self.c.post("/oauth/authorize/", parameters) - self.assertEqual(response.status_code,302) - token = list(Token.objects.all())[-1] - self.assertTrue(token.key in response['Location']) - self.assertTrue(token.is_approved) - - ## ACCESS TOKEN - - parameters = { - 'oauth_consumer_key': self.CONSUMER_KEY, - 'oauth_token': token.key, - 'oauth_signature_method': 'PLAINTEXT', - 'oauth_signature': '%s&%s' % (self.CONSUMER_SECRET, token.secret), - 'oauth_timestamp': str(int(time.time())), - 'oauth_nonce': self.nonce, - 'oauth_version': '1.0', - 'oauth_verifier': token.verifier, - } - response = self.c.get("/oauth/access_token/", parameters) - - access_token = list(Token.objects.filter(token_type=Token.ACCESS))[-1] - self.assertTrue(access_token.key in response.content) - self.assertTrue(access_token.secret in response.content) - self.assertEqual(access_token.user.username, u'jane') - - ## ACCESSING PROTECTED VIEW - - parameters = { - 'oauth_consumer_key': self.CONSUMER_KEY, - 'oauth_token': access_token.key, - 'oauth_signature_method': 'HMAC-SHA1', - 'oauth_timestamp': str(int(time.time())), - 'oauth_nonce': self.nonce, - 'oauth_version': '1.0', - } - - oauth_request = OAuthRequest.from_token_and_callback(access_token, http_url='/api/1.0/text/delete/', parameters=parameters) - signature_method = OAuthSignatureMethod_HMAC_SHA1() - signature = signature_method.build_signature(oauth_request, self.consumer, access_token) - - parameters['oauth_signature'] = signature - #self.assertEqual(signature, " ") - parameters['id'] = 'd2c1d1fa-629d-4520-a3d2-955b4f2582c0' - response = self.c.post("/api/1.0/text/delete/", parameters) - self.assertEqual(response.content, " ") - self.assertEqual(response.status_code,200) - - self.c.logout() - access_token.delete() -#/api/1.0/text/delete/ -#/api/1.0/text/update/ -#/api/1.0/text/create/ \ No newline at end of file diff -r 20c41a7e2173 -r 83b28fc0d731 web/ldt/text/tests/__init__.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/web/ldt/text/tests/__init__.py Mon Dec 13 23:55:19 2010 +0100 @@ -0,0 +1,2 @@ +from server_tests import * +from oauth_tests import * diff -r 20c41a7e2173 -r 83b28fc0d731 web/ldt/text/tests/base_tests.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/web/ldt/text/tests/base_tests.py Mon Dec 13 23:55:19 2010 +0100 @@ -0,0 +1,180 @@ +#encoding:UTF-8 + +""" Run these tests with 'python manage.py test text' """ + +from django.test import TestCase +import unittest +import lxml.etree +from ldt.text.models import * +from ldt.core.models import Owner +from ldt.text.views import * +import urllib +import uuid +import tempfile +import datetime +from django.contrib.auth.models import * +from django.conf import settings +from django.test.client import Client +from ldt.text import VERSION_STR +from django.db import transaction +import time + + +# This test creates an annotation and checks that: +# 1. the annotation was created in the database (by trying to access it through a 'get') +# 2. the returned xml contains correct data +class CreateTest(TestCase): + def setUp(self): + self.content = str('f2c1d1fa-629d-4520-a3d2-955b4f2582c0http://www.leezam.com/pub/epub/123456!/OPS/chapter2.xhtml#pos=56,168tag1tag2#AAAAAA<![CDATA[titre de l\'annotation]]>oaubert79cd0532-1dda-4130-b351-6a181130a7c92010-09-06 12:33:53.417550oaubert79cd0532-1dda-4130-b351-6a181130a7c92010-09-06 12:33:53.420459') + self.annot = Annotation(external_id=u'd2c1d1fa-629d-4520-a3d2-955b4f2582c0', uri=u'http://iri.blabla', tags=u"tag1,tag2", title=u'montitre', description=u'madesc', text=u'letexteselectionne', color=u'#AAAAAA', creator=u'wakimd', contributor=u'wakimd', creation_date=u'2010-09-06 12:33:53.417550', update_date=u'2010-09-06 12:33:53.417550') + self.annot.save() + def tearDown(self): + transaction.rollback() + annotlist=Annotation.objects.all() + for annot in annotlist: + annot.delete() + + def test_create_annotation(self): + response = self.client.post('/api/'+ VERSION_STR +'/text/create/', {'content':self.content}) + #self.assertEqual(response.content, " ") + self.annot1 = lxml.etree.fromstring(response.content) + self.assertEqual(self.annot1.xpath("/iri/text-annotation/id/text()")[0],"f2c1d1fa-629d-4520-a3d2-955b4f2582c0") + self.assertEqual(self.annot1.xpath("/iri/text-annotation/content")[0].tag,"content") + self.assertEqual(self.annot1.xpath("/iri/text-annotation/tags/tag/text()")[0],u"tag1") + self.assertEqual(self.annot1.xpath("/iri/text-annotation/content/text/text()")[0],u"texte selectionne lors de la creation de l\'annotation") + #self.assertEqual(self.annot1.xpath("/iri/text-annotation/meta/created/text()")[0],"2010-09-06 12:33:53.417550") + response2 = self.client.get('/api/'+ VERSION_STR +'/text/get/', {'id':'f2c1d1fa-629d-4520-a3d2-955b4f2582c0'}) + annot2 = lxml.etree.fromstring(response.content) + self.assertEqual(annot2.xpath("/iri/text-annotation/uri/text()")[0], "http://www.leezam.com/pub/epub/123456!/OPS/chapter2.xhtml#pos=56,168") + + def test_error_create(self): + content = 'd2c1d1fa-629d-4520-a3d2-955b4f2582c0http://www.leezam.com/pub/epub/123456!/OPS/chapter2.xhtml#pos=56,168tag1tag2#AAAAAA<![CDATA[titre de l\'annotation]]>oaubert79cd0532-1dda-4130-b351-6a181130a7c92010-09-06 12:33:53.417550oaubert79cd0532-1dda-4130-b351-6a181130a7c92010-09-06 12:33:53.420459' + response = self.client.post('/api/'+ VERSION_STR +'/text/create/', {'content':content}) + self.assertEqual(response.status_code, 409) + + +# This test creates an annotation, then gets it, and checks that the returned xml contains correct data +class GetTest(unittest.TestCase): + def setUp(self): + self.annotation = Annotation(external_id="d2c1d1fa-629d-4520-a3d2-955b4f2582c0", tags=u"tag1 ,tag2 , tag3", title="titre de l\'annotation",text="texte selectionne lors de la creation de l\'annotation",color="#AAAAAA", creation_date="2010-09-06 12:33:53.417550", update_date="2010-09-06 12:33:53.420459") + self.annotation.save() + def tearDown(self): + annotlist=Annotation.objects.all() + for annot in annotlist: + annot.delete() + + def test_get_annotation(self): + response = self.client.get('/api/'+ VERSION_STR +'/text/get/', {'id':'d2c1d1fa-629d-4520-a3d2-955b4f2582c0'}) + print response + self.annot1 = lxml.etree.fromstring(response.content) + self.assertEqual(self.annot1.xpath("/iri/text-annotation/id/text()")[0],self.annotation.external_id) + self.assertEqual(self.annot1.xpath("/iri/text-annotation/tags/tag/text()")[1], "tag2") + self.assertEqual(self.annot1.xpath("/iri/text-annotation/content/color/text()")[0],self.annotation.color) + self.assertEqual(self.annot1.xpath("/iri/text-annotation/meta/created/text()")[0], str(self.annotation.creation_date)) + + def test_error_get(self): + response = self.client.get('/api/'+ VERSION_STR +'/text/get/', {'id':'2'}) + self.assertEqual(response.status_code,404) + + +class FilterTest(unittest.TestCase): + def setUp(self): + self.annotation = Annotation(external_id="k2c1d1fa-629d-4520-a3d2-955b4f2582c0",title="titre de l\'annotation",text="texte selectionne lors de la creation de l\'annotation",color="#AAAAAA", uri="http://www.leezam.com/pub/epub/123456!/OPS/chapter2.xhtml#pos=56,168", creator="wakimd") + self.annotation.save() + self.annotation2 = Annotation(external_id="l2c1d1fa-629d-4520-a3d2-955b4f2582c0",title="titre de l\'annotation2",text="texte selectionne lors de la creation de l\'annotation2",color="#BBBBBB", uri="http://www.leezam.com/pub/epub/123456!/OPS/chapter2.xhtml#pos=56,168", creator="wakimd") + self.annotation2.save() + self.annotation3 = Annotation(external_id="m2c1d1fa-629d-4520-a3d2-955b4f2582c0", title="titre3", text="texte3", color="#CCCCCC", uri="http://blabla", creator="wakimd") + self.annotation3.save() + def tearDown(self): + annotlist=Annotation.objects.all() + for annot in annotlist: + annot.delete() + + def test_filter_annotation_creator_limit(self): + user = 'wakimd' + uri = "http://www.leezam.com/pub/epub/123456!/OPS/chapter2.xhtml#pos=56,168" + limit= 1 + response = self.client.get('/api/'+ VERSION_STR +'/text/filter/', {'uri':uri,'creator':user,'limit':limit}) + doc = lxml.etree.fromstring(response.content) + cpt = 0 + for elem in doc.xpath("/iri/text-annotation"): + cpt = cpt + 1 + if limit is not None: + self.assertEqual(cpt,limit) + for elem in doc.xpath("/iri/text-annotation/meta/creator/text()"): + self.assertEqual(elem,user) + for elem in doc.xpath("/iri/text-annotation/uri/text()"): + self.assertEqual(elem[:57],"http://www.leezam.com/pub/epub/123456!/OPS/chapter2.xhtml") + + def test_filter_annotation_uri(self): + uri = "http://www.leezam.com/pub/epub/123456!/OPS/chapter2.xhtml#pos=56,168" + response = self.client.get('/api/'+ VERSION_STR +'/text/filter/', {'uri':uri}) + doc = lxml.etree.fromstring(response.content) + for elem in doc.xpath("/iri/text-annotation/uri/text()"): + self.assertEqual(elem[:57],"http://www.leezam.com/pub/epub/123456!/OPS/chapter2.xhtml") + + def test_filter_annotation_filter(self): + uri = "http://www.leezam.com/pub/epub/123456!/OPS/chapter2.xhtml#pos=56,168" + filter = 'lors' + limit = None + response = self.client.get('/api/'+ VERSION_STR +'/text/filter/', {'uri':uri,'filter':'lors'}) + doc = lxml.etree.fromstring(response.content) + for elem in doc.xpath("/iri/text-annotation/content/text/text()"): + self.assertTrue('lors' in elem) + #for elem in doc.xpath("/iri/text-annotation/meta/creator/text()"): + # self.assertEqual(elem,user) + + +# This test creates an annotation, then deletes it, and checks that: +# 1. the annotation doesn't exist anymore in the database (by trying to access it through a 'get') +# 2. the returned xml contains no data +class DeleteTest(unittest.TestCase): + def setUp(self): + self.annotation = Annotation(external_id="d2c1d1fa-629d-4520-a3d2-955b4f2582c0",title="titre de l\'annotation",text="texte selectionne lors de la creation de l\'annotation",color="#AAAAAA", creation_date="2010-09-06T12:33:53.417550", update_date="2010-09-06T12:33:53.420459") + self.annotation.save() + def tearDown(self): + annotlist=Annotation.objects.all() + for annot in annotlist: + annot.delete() + + def test_delete_annotation(self): + id = urllib.urlencode({'id':'d2c1d1fa-629d-4520-a3d2-955b4f2582c0'}) + f = urllib.urlopen("http://127.0.0.1:8000/api/1.0/text/delete/", id) + response = self.client.post('/api/'+ VERSION_STR +'/text/delete/', {'id':'d2c1d1fa-629d-4520-a3d2-955b4f2582c0'}) + response2 = self.client.get('/ldt/get/', {'id':'d2c1d1fa-629d-4520-a3d2-955b4f2582c0'}) + doc = lxml.etree.fromstring(response.content) + self.assertEqual(doc.xpath("/iri/text-annotation/id/text()"),[]) + self.assertEqual(doc.xpath("/iri/text-annotation/tags/tag/text()"), []) + self.assertEqual(doc.xpath("/iri/text-annotation/content/color/text()"),[]) + self.assertEqual(doc.xpath("/iri/text-annotation/meta/creator/text()"),[]) + self.assertEqual(response2.status_code, 404) + + def test_error_delete(self): + response = self.client.post('/api/'+ VERSION_STR +'/text/ldt/delete/', {'id':'1'}) + self.assertEqual(response.status_code,404) + + +# This test creates an annotation, then updates it with new content, and checks that the returned xml contains the updated data +class UpdateTest(unittest.TestCase): + def setUp(self): + self.annotation = Annotation(external_id="d2c1d1fa-629d-4520-a3d2-955b4f2582c0", tags=u"tag1, mytag",title="titre de l\'annotation",text="texte selectionne lors de la creation de l\'annotation",color="#AAAAAA", creation_date="2010-09-06T12:33:53.417550", update_date="2010-09-06T12:33:53.420459") + self.annotation.save() + def tearDown(self): + annotlist=Annotation.objects.all() + for annot in annotlist: + annot.delete() + + def test_update_annotation(self): + content = 'tag1tag2newtag3#DDDDDDoaubert80cd0532-1dda-4130-b351-6a181130a7c92010-11-06 12:33:53.420459' + response = self.client.post('/api/'+ VERSION_STR +'/text/update/', {'content':content,'id':'d2c1d1fa-629d-4520-a3d2-955b4f2582c0'}) + doc = lxml.etree.fromstring(response.content) + #self.assertEqual(lxml.etree.tostring(doc), " ") + self.assertEqual(doc.xpath("/iri/text-annotation/id/text()")[0],"d2c1d1fa-629d-4520-a3d2-955b4f2582c0") + self.assertEqual(doc.xpath("/iri/text-annotation/tags/tag/text()")[1], "tag2new") + self.assertEqual(doc.xpath("/iri/text-annotation/content/color/text()")[0],"#DDDDDD") + + def test_error_update(self): + content = 'd2c1d1fa-629d-4520-a3d2-955b4f2582c0http://www.leezam.com/pub/epub/123456!/OPS/chapter2.xhtml#pos=56,168tag1tag2#AAAAAA<![CDATA[titre de l\'annotation]]>oaubert79cd0532-1dda-4130-b351-6a181130a7c92010-09-06 12:33:53.417550oaubert79cd0532-1dda-4130-b351-6a181130a7c92010-09-06 12:33:53.420459' + response = self.client.post('/api/'+ VERSION_STR +'/text/update/', {'content':content,'id':'1'}) + self.assertEqual(response.status_code,404) + diff -r 20c41a7e2173 -r 83b28fc0d731 web/ldt/text/tests/oauth_tests.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/web/ldt/text/tests/oauth_tests.py Mon Dec 13 23:55:19 2010 +0100 @@ -0,0 +1,178 @@ +#encoding:UTF-8 + +""" Run these tests with 'python manage.py test text' """ + +from django.conf import settings, settings +from django.contrib.auth.models import * +from django.db import transaction +from django.test import TestCase +from django.test.client import Client +from ldt.test.testcases import OAuthTestCase +from ldt.text import VERSION_STR +from ldt.text.models import Annotation +from ldt.text.views import * +from oauth2 import Request, SignatureMethod_HMAC_SHA1, SignatureMethod_PLAINTEXT, \ + generate_nonce +from oauth_provider.consts import OUT_OF_BAND +from oauth_provider.models import Resource, Consumer, Token, Nonce +import logging +import time +import urlparse + +class OAuthTestDelete(TestCase): + def setUp(self): + #create a user + self.jane = User.objects.create_user('jane', 'jane@example.com', 'toto') + + resource = Resource(name='all', url='/api/'+VERSION_STR+'/text/delete/') + resource.save() + + resource = Resource(name='delete', url='/api/'+VERSION_STR+'/text/delete/') + resource.save() + + self.CONSUMER_KEY = 'dpf43f3p2l4k3l03' + self.CONSUMER_SECRET = 'kd94hf93k423kf44' + self.consumer = Consumer(key=self.CONSUMER_KEY, secret=self.CONSUMER_SECRET, name='printer.example.com', user=self.jane) + self.consumer.save() + + self.nonce = generate_nonce(8) + + #auth parameters + self.parameters = { + 'oauth_consumer_key': self.CONSUMER_KEY, + 'oauth_signature_method': 'PLAINTEXT', + 'oauth_signature': '%s&' % self.CONSUMER_SECRET, + 'oauth_timestamp': str(int(time.time())), + 'oauth_nonce': self.nonce, + 'oauth_version': '1.0', + 'oauth_callback': OUT_OF_BAND, + 'scope':'delete' + } + + self.annotation = Annotation(external_id="d2c1d1fa-629d-4520-a3d2-955b4f2582c0",title="titre de l\'annotation",text="texte selectionne lors de la creation de l\'annotation",color="#AAAAAA", creation_date="2010-09-06T12:33:53.417550", update_date="2010-09-06T12:33:53.420459") + self.annotation.save() + + + def test_auth_access_delete(self): + ## REQUEST TOKEN + + response = self.client.get("/oauth/request_token/", self.parameters) + #self.assertEqual(response.content," ") + self.assertEqual(response.status_code,200) + token = list(Token.objects.all())[-1] + logging.debug(response.content) + data = urlparse.parse_qs(response.content) + self.assertEqual(token.key, data["oauth_token"][0]) + self.assertEqual(token.secret, data['oauth_token_secret'][0]) + self.assertTrue(data['oauth_callback_confirmed'][0]) + self.assertEqual(token.callback, None), + +# token.callback = OUT_OF_BAND +# token.save() +# + ## USER AUTHORIZATION + + parameters = { + 'oauth_token': token.key, + } + + response = self.client.get("/oauth/authorize/", parameters) + self.assertEqual(response.status_code,302) + self.assertTrue(token.key in response['Location']) + logging.debug(repr(response['location'])) + + self.client.login(username='jane', password='toto') + + response = self.client.get("/oauth/authorize/", parameters) + self.assertEqual(response.status_code,200) + self.assertEqual(response.content,'Fake authorize view for printer.example.com.') + +# parameters['authorize_access'] = 0 +# response = self.c.post("/oauth/authorize/", parameters) +# self.assertEqual(response.content, "Fake callback view.") + + # fake authorization by the user + parameters['authorize_access'] = 1 + response = self.client.post("/oauth/authorize/", parameters) + self.assertEqual(response.status_code,200) + token = list(Token.objects.all())[-1] + #self.assertTrue(token.key in response['Location']) + self.assertTrue(token.is_approved) + + ## ACCESS TOKEN + + parameters = { + 'oauth_consumer_key': self.CONSUMER_KEY, + 'oauth_token': token.key, + 'oauth_signature_method': 'PLAINTEXT', + 'oauth_signature': '%s&%s' % (self.CONSUMER_SECRET, token.secret), + 'oauth_timestamp': str(int(time.time())), + 'oauth_nonce': self.nonce, + 'oauth_version': '1.0', + 'oauth_verifier': token.verifier, + } + response = self.client.get("/oauth/access_token/", parameters) + + access_token = list(Token.objects.filter(token_type=Token.ACCESS))[-1] + self.assertTrue(access_token.key in response.content) + self.assertTrue(access_token.secret in response.content) + self.assertEqual(access_token.user.username, u'jane') + + ## ACCESSING PROTECTED VIEW + + parameters = { + 'oauth_consumer_key': self.CONSUMER_KEY, + 'oauth_token': access_token.key, + 'oauth_signature_method': 'HMAC-SHA1', + 'oauth_timestamp': str(int(time.time())), + 'oauth_nonce': self.nonce, + 'oauth_version': '1.0', + 'id':'d2c1d1fa-629d-4520-a3d2-955b4f2582c0' + } + + oauth_request = Request.from_token_and_callback(access_token, http_url='http://testserver/api/'+VERSION_STR+'/text/delete/', parameters=parameters, http_method="POST") + signature_method = SignatureMethod_HMAC_SHA1() + signature = signature_method.sign(oauth_request, self.consumer, access_token) + + parameters['oauth_signature'] = signature + #self.assertEqual(signature, " ") + response = self.client.post("/api/"+VERSION_STR+"/text/delete/", parameters) + self.assertEqual(response.content, "") + self.assertEqual(response.status_code,200) + + self.client.logout() + access_token.delete() + + +class OAuthTestDeleteClient(OAuthTestCase): + def setUp(self): + #create a user + self.jane = User.objects.create_user('jane', 'jane@example.com', 'toto') + + resource = Resource(name='all', url='/api/'+VERSION_STR+'/text/delete/') + resource.save() + + resource = Resource(name='delete', url='/api/'+VERSION_STR+'/text/delete/') + resource.save() + + self.CONSUMER_KEY = 'dpf43f3p2l4k3l03' + self.CONSUMER_SECRET = 'kd94hf93k423kf44' + + self.set_consumer(self.CONSUMER_KEY, self.CONSUMER_SECRET) + + self.consumer = Consumer(key=self.CONSUMER_KEY, secret=self.CONSUMER_SECRET, name='printer.example.com', user=self.jane) + self.consumer.save() + + self.annotation = Annotation(external_id="d2c1d1fa-629d-4520-a3d2-955b4f2582c0",title="titre de l\'annotation",text="texte selectionne lors de la creation de l\'annotation",color="#AAAAAA", creation_date="2010-09-06T12:33:53.417550", update_date="2010-09-06T12:33:53.420459") + self.annotation.save() + + + def test_auth_access_delete(self): + + res = self.client.login(username='jane', password='toto') + self.assertTrue(res) + + parameters = { 'id' : 'd2c1d1fa-629d-4520-a3d2-955b4f2582c0' } + response = self.client.post(path="/api/"+VERSION_STR+"/text/delete/", data=parameters) + self.assertEqual(response.content, "") + self.assertEqual(response.status_code,200) diff -r 20c41a7e2173 -r 83b28fc0d731 web/ldt/text/tests/server_tests.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/web/ldt/text/tests/server_tests.py Mon Dec 13 23:55:19 2010 +0100 @@ -0,0 +1,68 @@ +#encoding:UTF-8 + +""" Run these tests with 'python manage.py test text' """ + +from django.conf import settings +from django.contrib.auth.models import * +from django.test import TestCase +from ldt.core.models import Owner +from ldt.test.testcases import OAuthWebTestCase +from ldt.text import VERSION_STR +from ldt.text.models import * +from ldt.text.views import * +from oauth_provider.models import Resource, Consumer, Token, Nonce +import datetime +import logging +import lxml.etree +import tempfile +import time +import unittest +import urllib +import uuid + +class OnServerGlobalTest(OAuthWebTestCase): + + fixtures = ['test_data'] + + def setUp(self): + + self.id = "mypersonnalid" + self.id2 = "mypersonnalid2" + + self.CONSUMER_KEY = 'dpf43f3p2l4k3l03' + self.CONSUMER_SECRET = 'kd94hf93k423kf44' + + self.set_consumer(self.CONSUMER_KEY, self.CONSUMER_SECRET) + + self.set_login_url("/accounts/login/") + + + def test_everything(self): + + res = self.client.login(username='jane', password='toto') + res = self.assertTrue(res) + + creation = self.client.post("/api/"+VERSION_STR+"/text/create/", data={'content':''+self.id+'http://www.leezam.com/pub/epub/123456!/OPS/chapter2.xhtml#pos=56,168tag1tag2#AAAAAA<![CDATA[titre de l\'annotation]]>oaubert79cd0532-1dda-4130-b351-6a181130a7c92010-09-06 12:33:53.417550oaubert79cd0532-1dda-4130-b351-6a181130a7c92010-09-06 12:33:53.420459'}) + logging.debug("OnServerGlobalTest.test_everything creation : " + repr(creation.content)) + + creation2 = self.client.post("/api/"+VERSION_STR+"/text/create/", data={'content':''+self.id2+'http://www.leezam.com/pub/epub/123456!/OPS/chapter2.xhtml#pos=56,168tag1tag2#BBBBBB<![CDATA[titre de l\'annotation2]]>wakimd79cd0532-1dda-4130-b351-6a181130a7c92010-09-06 12:33:53.417550oaubert79cd0532-1dda-4130-b351-6a181130a7c92010-09-06 12:33:53.420459'}) + logging.debug("OnServerGlobalTest.test_everything creation2 : " + repr(creation2.content)) + + get = self.client.get("/api/"+VERSION_STR+"/text/get/", {"id":self.id}) + logging.debug("OnServerGlobalTest.test_everything get : " + repr(get.content)) + + update = self.client.post("/api/"+VERSION_STR+"/text/update/", data={'content':'tag1tag2newtag3#DDDDDDoaubert80cd0532-1dda-4130-b351-6a181130a7c92010-11-06 12:33:53.420459','id':self.id}) + logging.debug("OnServerGlobalTest.test_everything update : " + repr(update.content)) + + filt1 = self.client.get("/api/"+VERSION_STR+"/text/filter/", data={"uri":"http://www.leezam.com/pub/epub/123456!/OPS/chapter2.xhtml#pos=56,168"}) + logging.debug("OnServerGlobalTest.test_everything filt1 : " + repr(filt1.content)) + + filt2 = self.client.get("/api/"+VERSION_STR+"/text/filter/", data={"uri":"http://www.leezam.com/pub/epub/123456!/OPS/chapter2.xhtml#pos=56,168","creator":"wakimd"}) + + logging.debug("OnServerGlobalTest.test_everything filt2 : " +repr(filt2.content)) + + delete = self.client.post("/api/"+VERSION_STR+"/text/delete/", {"id":self.id}) + logging.debug("OnServerGlobalTest.test_everything delete1 : " + repr(delete.content)) + delete = self.client.post("/api/"+VERSION_STR+"/text/delete/", {"id":self.id2}) + logging.debug("OnServerGlobalTest.test_everything delete2 : " + repr(delete.content)) + diff -r 20c41a7e2173 -r 83b28fc0d731 web/ldt/text/tests/utils.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/web/ldt/text/tests/utils.py Mon Dec 13 23:55:19 2010 +0100 @@ -0,0 +1,1 @@ + diff -r 20c41a7e2173 -r 83b28fc0d731 web/ldt/text/utils.py --- a/web/ldt/text/utils.py Fri Nov 19 18:14:02 2010 +0100 +++ b/web/ldt/text/utils.py Mon Dec 13 23:55:19 2010 +0100 @@ -1,11 +1,9 @@ -import uuid +from django.conf import settings +import base64 import django.core.urlresolvers -from django.conf import settings -from ldt.text.models import * +import lxml.etree import urllib -import datetime -import lxml.etree -import base64 +import uuid __BOOLEAN_DICT = { 'false':False, @@ -38,27 +36,3 @@ return taglist - -def create_empty_annotation(): - iri = lxml.etree.Element('iri') - doc = lxml.etree.ElementTree(iri) - - textannotation = lxml.etree.SubElement(iri, 'text-annotation') - id = lxml.etree.SubElement(textannotation,'id') - uri = lxml.etree.SubElement(textannotation,'uri') - tags = lxml.etree.SubElement(textannotation,'tags') - - content = lxml.etree.SubElement(textannotation,'content') - color = lxml.etree.SubElement(content,'color') - description = lxml.etree.SubElement(content,'description') - title = lxml.etree.SubElement(content,'title') - text = lxml.etree.SubElement(content,'text') - - meta = lxml.etree.SubElement(textannotation,'meta') - contributor = lxml.etree.SubElement(meta, "contributor") - creator = lxml.etree.SubElement(meta, "creator") - creationdate = lxml.etree.SubElement(meta, "created") - updatedate = lxml.etree.SubElement(meta, "modified") - - return doc - diff -r 20c41a7e2173 -r 83b28fc0d731 web/ldt/text/views.py --- a/web/ldt/text/views.py Fri Nov 19 18:14:02 2010 +0100 +++ b/web/ldt/text/views.py Mon Dec 13 23:55:19 2010 +0100 @@ -154,8 +154,7 @@ except Annotation.DoesNotExist: raise Http404 - doc=create_empty_annotation() - return HttpResponse(lxml.etree.tostring(doc, pretty_print=True), mimetype="text/xml;charset=utf-8") + return HttpResponse("") ## Updates the content of an annotation diff -r 20c41a7e2173 -r 83b28fc0d731 web/ldt/utils/__init__.py --- a/web/ldt/utils/__init__.py Fri Nov 19 18:14:02 2010 +0100 +++ b/web/ldt/utils/__init__.py Mon Dec 13 23:55:19 2010 +0100 @@ -1,2 +1,4 @@ +def Property(func): + return property(**func()) \ No newline at end of file diff -r 20c41a7e2173 -r 83b28fc0d731 web/ldt/utils/threading.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/web/ldt/utils/threading.py Mon Dec 13 23:55:19 2010 +0100 @@ -0,0 +1,44 @@ +import threading +import inspect +import ctypes + +def _async_raise(tid, exctype): + """raises the exception, performs cleanup if needed""" + if not inspect.isclass(exctype): + raise TypeError("Only types can be raised (not instances)") + res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype)) + if res == 0: + raise ValueError("invalid thread id") + elif res != 1: + # """if it returns a number greater than one, you're in trouble, + # and you should call it again with exc=NULL to revert the effect""" + ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, 0) + raise SystemError("PyThreadState_SetAsyncExc failed") + + +class Thread(threading.Thread): + def _get_my_tid(self): + """determines this (self's) thread id""" + if not self.isAlive(): + raise threading.ThreadError("the thread is not active") + + # do we have it cached? + if hasattr(self, "_thread_id"): + return self._thread_id + + # no, look for it in the _active dict + for tid, tobj in threading._active.items(): + if tobj is self: + self._thread_id = tid + return tid + + raise AssertionError("could not determine the thread's id") + + def raise_exc(self, exctype): + """raises the given exception type in the context of this thread""" + _async_raise(self._get_my_tid(), exctype) + + def terminate(self): + """raises SystemExit in the context of the given thread, which should + cause the thread to exit silently (unless caught)""" + self.raise_exc(SystemExit) diff -r 20c41a7e2173 -r 83b28fc0d731 web/leezam/settings.py --- a/web/leezam/settings.py Fri Nov 19 18:14:02 2010 +0100 +++ b/web/leezam/settings.py Mon Dec 13 23:55:19 2010 +0100 @@ -142,9 +142,13 @@ LDT_MAX_SEARCH_NUMBER = 50 LDT_JSON_DEFAULT_INDENT = 2 +OAUTH_PROVIDER_KEY_SIZE = 32 +OAUTH_PROVIDER_SECRET_SIZE = 32 +OAUTH_PROVIDER_VERIFIER_SIZE = 10 +OAUTH_PROVIDER_CONSUMER_KEY_SIZE = 256 OAUTH_AUTHORIZE_VIEW = 'oauth_provider.views.fake_authorize_view' OAUTH_CALLBACK_VIEW = 'oauth_provider.views.fake_callback_view' - +TEST_WEBSERVER_ADDRPORT = "127.0.0.1:8000" from config import *