# HG changeset patch # User rougeronj # Date 1350655722 -7200 # Node ID 9520fb2810e2f7c3a4544f3e1f4833379d0beb9d # Parent 214e220dfab9899316862f01d94f0c978f14bbda Add of unit test for ldt_utils diff -r 214e220dfab9 -r 9520fb2810e2 src/ldt/ldt/ldt_utils/fixtures/base_data.json --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/ldt/ldt/ldt_utils/fixtures/base_data.json Fri Oct 19 16:08:42 2012 +0200 @@ -0,0 +1,36 @@ +[ + { + "pk": 6, + "model": "text.annotation", + "fields": { + "update_date": "2010-11-16 17:30:50", + "description": "texte de description", + "title": "titre de l'annotation", + "color": "#DDDDDD", + "text": "texte selectionne lors de la creation de l'annotation", + "creator": "wakimd", + "uri": "", + "creation_date": "2010-11-16 17:01:41", + "contributor": "oaubert", + "tags_field": "tag3,tag1,", + "external_id": "z2c1d1fa-629d-4520-a3d2-955b4f2582c0" + } + }, + { + "pk": 7, + "model": "text.annotation", + "fields": { + "update_date": "2010-11-16 17:30:50", + "description": "texte de description", + "title": "titre de l'annotation", + "color": "#DDDDDD", + "text": "texte selectionne lors de la creation de l'annotation", + "creator": "wakimd", + "uri": "http://www.leezam.com/pub/epub/123456!/OPS/chapter2.xhtml#pos=56,168", + "creation_date": "2010-11-16 17:01:41", + "contributor": "oaubert", + "tags_field": "tag3,tag1,", + "external_id": "mypersonnalid2" + } + } +] diff -r 214e220dfab9 -r 9520fb2810e2 src/ldt/ldt/ldt_utils/fixtures/user_data.json --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/ldt/ldt/ldt_utils/fixtures/user_data.json Fri Oct 19 16:08:42 2012 +0200 @@ -0,0 +1,67 @@ +[ + { + "pk": 2, + "model": "auth.user", + "fields": { + "username": "admin", + "first_name": "", + "last_name": "", + "is_active": true, + "is_superuser": true, + "is_staff": false, + "last_login": "2010-12-12 00:04:07", + "groups": [], "user_permissions": [], + "password": "", + "email": "admin@example.com", + "date_joined": "2010-12-12 00:04:07" + } + }, + { + "pk": 1, + "model": "oauth_provider.resource", + "fields": { + "url": "/api/1.0/text/delete/", + "name": "delete", + "is_readonly": true + } + }, + { + "pk": 2, + "model": "oauth_provider.resource", + "fields": { + "url": "/api/1.0/text/create/", + "name": "create", + "is_readonly": true + } + }, + { + "pk": 3, + "model": "oauth_provider.resource", + "fields": { + "url": "/api/1.0/text/update/", + "name": "update", + "is_readonly": true + } + }, + { + "pk": 4, + "model": "oauth_provider.resource", + "fields": { + "url": "", + "name": "all", + "is_readonly": true + } + }, + { + "pk": 1, + "model": "oauth_provider.consumer", + "fields": { + "status": 1, + "name": "example.com", + "secret": "kd94hf93k423kf44", + "user": 2, + "key": "dpf43f3p2l4k3l03", + "description": "" + } + } +] \ No newline at end of file diff -r 214e220dfab9 -r 9520fb2810e2 src/ldt/ldt/ldt_utils/tests.py --- a/src/ldt/ldt/ldt_utils/tests.py Tue Oct 16 17:26:49 2012 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,187 +0,0 @@ -""" -This file demonstrates two different styles of tests (one doctest and one -unittest). These will both pass when you run "manage.py test". - -Replace these with more appropriate tests for your application. -""" - -from django.conf import settings -from django.test import TestCase -from ldt.ldt_utils.models import User -from models import Project, Content -from utils import LdtUtils, LdtAnnotation, create_ldt, create_empty_iri, copy_ldt -import lxml.etree -import tempfile -import unittest -import uuid - -class SimpleTest(TestCase): - def test_basic_addition(self): - """ - Tests that 1 + 1 always equals 2. - """ - self.failUnlessEqual(1 + 1, 2) - -__test__ = {"doctest": """ -Another way to test that 1 + 1 is equal to 2. - ->>> 1 + 1 == 2 -True -"""} - -class UtilsTest(unittest.TestCase): - def setUp(self): - self.user = User() - self.user.username = "toto" - self.LU = LdtUtils() - - self.project = Project(title="titleproj1", owner=self.user) - self.project.ldt = ' CA: prof et admin <abstract/> <audio source=""/> <tags/> </element> <element id="s_0050F043-3AD2-0A7C-6699-D2A03A1EBA02" begin="5052858" dur="124407" author="" date="2010/09/02" color="10053375" src=""> <title>conseil de classe Reprise de la figure precedente TC: prof et admin Conseil de classe conseil de classe Reprise de la figure precedente Bout a bout 1 ' - self.project.id = "11" - self.project.ldt_id = str(uuid.uuid1()) - self.project.description = "proj1description" - self.project.save() - - self.projectcopy = Project(title="the2ndproject") - self.projectcopy.id = "22" - - def tearDown(self): - self.project.delete() - self.projectcopy.delete() - #self.cont1.delete() - #self.cont2.delete() - - def test_generate_ldt(self): - self.cont1 = Content(iriurl="id1/iriurl1") - self.cont1.iri_id = "id1" - self.cont1.save() - - self.cont2 = Content(iriurl="id2/iriurl2") - self.cont2.iri_id = "id2" - self.cont2.save() - - self.project.contents.add(self.cont1, self.cont2) - - f = tempfile.TemporaryFile(mode='r+') - doc = self.LU.generate_ldt(Content.objects.all()) - doc.write(f, pretty_print=True) - f.seek(0) - ldoc = lxml.etree.parse(f) - self.assertEqual(ldoc.xpath("/iri/displays/display/content")[10].get("id"), self.cont2.iri_id) - self.assertEqual(ldoc.xpath("/iri/medias/media")[9].get("id"), self.cont1.iri_id) - f.close() - - def test_generate_init(self): - self.cont3 = Content(iriurl="id3/iriurl1") - self.cont3.iri_id = "id3" - self.cont3.save() - - self.cont4 = Content(iriurl="id4/iriurl2") - self.cont4.iri_id = "id4" - self.cont4.save() - - self.project.contents.add(self.cont3, self.cont4) - ldoc = self.LU.generate_init(['all', 'foo'], 'ldt.ldt_utils.views.search_ldt') - self.assertEqual(ldoc.xpath("/iri/files/init")[0].tag, "init") - self.assertEqual(ldoc.xpath("/iri/files/library")[0].tag, "library") - - def test_create_ldt(self): - self.cont5 = Content(iriurl="id5/iriurl1") - self.cont5.iri_id = "id5" - self.cont5.save() - - self.cont6 = Content(iriurl="id6/iriurl2") - self.cont6.iri_id = "id6" - self.cont6.save() - - self.project.contents.add(self.cont5, self.cont6) - self.project.ldt = "" - create_ldt(self.project, self.user) - ldt = lxml.etree.fromstring(self.project.ldt_encoded) - self.assertEqual(ldt.xpath("/iri")[0].tag, "iri") - self.assertEqual(ldt.xpath("/iri/project")[0].get("title"), self.project.title) - self.assertEqual(ldt.xpath("/iri/medias/media")[0].get("src"), self.cont5.iri_url()) - self.assertEqual(ldt.xpath("/iri/medias/media")[1].get("id"), self.cont6.iri_id) - - def test_copy_ldt(self): - self.cont7 = Content(iriurl="id7/iriurl1") - self.cont7.iri_id = "id7" - self.cont7.save() - - self.cont8 = Content(iriurl="id8/iriurl2") - self.cont8.iri_id = "id8" - self.cont8.save() - - self.project.contents.add(self.cont7, self.cont8) - copy_ldt(self.project, self.projectcopy, self.user) - ldt1 = lxml.etree.fromstring(self.project.ldt_encoded) - ldt2 = lxml.etree.fromstring(self.projectcopy.ldt_encoded) - self.assertTrue(ldt1.xpath("/iri/project")[0].get("id") != ldt2.xpath("/iri/project")[0].get("id")) - self.assertEqual(ldt1.xpath("/iri/medias/media")[0].get("id"), ldt2.xpath("/iri/medias/media")[0].get("id")) - self.assertEqual(ldt1.xpath("/iri/annotations/content/ensemble")[0].get("title"), ldt2.xpath("/iri/annotations/content/ensemble")[0].get("title")) - self.assertEqual(ldt1.xpath("/iri/annotations/content/ensemble/decoupage")[0].get("id"), ldt2.xpath("/iri/annotations/content/ensemble/decoupage")[0].get("id")) - self.assertEqual(ldt1.xpath("/iri/annotations/content/ensemble/decoupage/title")[1].text, ldt2.xpath("/iri/annotations/content/ensemble/decoupage/title")[1].text.strip("\n\t")) - - def test_create_empty_iri(self): - self.cont9 = Content(iriurl="id9/iriurl1") - self.cont9.iri_id = "id9" - self.cont9.save() - - self.cont10 = Content(iriurl="id10/iriurl2") - self.cont10.iri_id = "id10" - self.cont10.save() - - self.project.contents.add(self.cont9, self.cont10) - tmp = tempfile.TemporaryFile(mode='r+') - create_empty_iri(tmp, self.cont9, "admin") - tmp.seek(0) - ldoc = lxml.etree.parse(tmp) - self.assertEqual(ldoc.xpath("/iri/head/meta")[0].get("content"), self.cont9.iri_id) - self.assertEqual(ldoc.xpath("/iri/body/medias/media/video")[0].get("id"), self.cont9.iri_id) - tmp.close() - - def test_add_annotation(self): - - self.cont11 = Content(iriurl="id11/iriurl1") - self.cont11.iri_id = "id11" - self.cont11.save() - - self.project.contents.add(self.cont11) - self.project.ldt = "" - create_ldt(self.project, self.user) - - self.LA = LdtAnnotation(self.project) - - self.LA.add("id11", "cutting", "title", "text", ["tag1", "tag2"], "800", - "10000", "jdoe", "2011-09-10T09:12:58") - self.LA.save() - ldt = lxml.etree.fromstring(self.project.ldt) - ann = ldt.xpath('/iri/annotations/content[@id="id11"]/ensemble/decoupage/elements/element')[0] - title = ann.xpath('title')[0].text - abstract = ann.xpath('abstract')[0].text - - self.assertEqual(ann.get("author"), "jdoe") - self.assertEqual(title, "title") - self.assertEqual(abstract, "text") - - -class ViewsTest(unittest.TestCase): - def setUp(self): - self.project = Project() - self.project.id = "121" - self.project.save() - self.project.ldt = ' CA: prof et admin <abstract/> <audio source=""/> <tags/> </element> <element id="s_0050F043-3AD2-0A7C-6699-D2A03A1EBA02" begin="5052858" dur="124407" author="" date="2010/09/02" color="10053375" src=""> <title>conseil de classe Reprise de la figure precedente TC: prof et admin Conseil de classe conseil de classe Reprise de la figure precedente Bout a bout 1 ' - - self.cont1 = Content(iriurl="/laurentcantet_entrelesmurs/iriurl1") - self.cont1.iri_id = 'laurentcantet_entrelesmurs' - self.cont1.save() - - self.cont2 = Content(iriurl="/content_notinldt/iriurl2") - self.cont2.iri_id = 'content_notinldt' - self.cont2.save() - - self.project.contents.add(self.cont1, self.cont2) - - def tearDown(self): - self.project.delete() - diff -r 214e220dfab9 -r 9520fb2810e2 src/ldt/ldt/ldt_utils/tests/__init__.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/ldt/ldt/ldt_utils/tests/__init__.py Fri Oct 19 16:08:42 2012 +0200 @@ -0,0 +1,1 @@ +from tests import * diff -r 214e220dfab9 -r 9520fb2810e2 src/ldt/ldt/ldt_utils/tests/content_tests.py diff -r 214e220dfab9 -r 9520fb2810e2 src/ldt/ldt/ldt_utils/tests/ldt_tests.py diff -r 214e220dfab9 -r 9520fb2810e2 src/ldt/ldt/ldt_utils/tests/media_tests.py diff -r 214e220dfab9 -r 9520fb2810e2 src/ldt/ldt/ldt_utils/tests/project_tests.py diff -r 214e220dfab9 -r 9520fb2810e2 src/ldt/ldt/ldt_utils/tests/tests.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/ldt/ldt/ldt_utils/tests/tests.py Fri Oct 19 16:08:42 2012 +0200 @@ -0,0 +1,215 @@ +""" +This file demonstrates two different styles of tests (one doctest and one +unittest). These will both pass when you run "manage.py test". + +Replace these with more appropriate tests for your application. +""" + +from ldt.test.testcases import OAuthTestCase, TestCase +from django.conf import settings +from django.test import TestCase +from ldt.ldt_utils.models import User, Project, Content, Media +from ldt.ldt_utils.utils import LdtUtils, LdtAnnotation, create_ldt, create_empty_iri, copy_ldt +from ldt.test.client import WebClient +import lxml.etree +import tempfile +import unittest +import uuid +import logging + +CONSUMER_KEY = 'dpf43f3p2l4k3l03' +CONSUMER_SECRET = 'kd94hf93k423kf44' + +class UtilsTest(OAuthTestCase): + + fixtures = ['base_data.json', 'user_data.json'] + + def setUp(self): + logging.disable(logging.CRITICAL) + self.user = User() + self.user.username = 'admin' + #self.set_login_url("/auth_accounts/login/") + self.set_consumer(CONSUMER_KEY, CONSUMER_SECRET) + self.client.login(username='admin', password='') + + self.LU = LdtUtils() + self.project = Project(title="titleproj1", owner=self.user) + self.project.ldt = ' CA: prof et admin <abstract/> <audio source=""/> <tags/> </element> <element id="s_0050F043-3AD2-0A7C-6699-D2A03A1EBA02" begin="5052858" dur="124407" author="" date="2010/09/02" color="10053375" src=""> <title>conseil de classe Reprise de la figure precedente TC: prof et admin Conseil de classe conseil de classe Reprise de la figure precedente Bout a bout 1 ' + self.project.id = "11" + self.project.ldt_id = str(uuid.uuid1()) + self.project.description = "proj1description" + self.project.save() + + self.projectcopy = Project(title="the2ndproject") + self.projectcopy.id = "22" + + def tearDown(self): + self.project.delete() + self.projectcopy.delete() + #self.cont1.delete() + #self.cont2.delete() + + def test_generate_ldt(self): + self.cont1 = Content(iriurl="id1/iriurl1") + self.cont1.iri_id = "id1" + self.cont1.save() + + self.cont2 = Content(iriurl="id2/iriurl2") + self.cont2.iri_id = "id2" + self.cont2.save() + + self.project.contents.add(self.cont1, self.cont2) + + f = tempfile.TemporaryFile(mode='r+') + doc = self.LU.generate_ldt(Content.objects.all()) + doc.write(f, pretty_print=True) + f.seek(0) + ldoc = lxml.etree.parse(f) + self.assertEqual(ldoc.xpath("/iri/displays/display/content")[10].get("id"), self.cont2.iri_id) + self.assertEqual(ldoc.xpath("/iri/medias/media")[9].get("id"), self.cont1.iri_id) + f.close() + + def test_generate_init(self): + self.cont3 = Content(iriurl="id3/iriurl1") + self.cont3.iri_id = "id3" + self.cont3.save() + + self.cont4 = Content(iriurl="id4/iriurl2") + self.cont4.iri_id = "id4" + self.cont4.save() + + self.project.contents.add(self.cont3, self.cont4) + ldoc = self.LU.generate_init(['all', 'foo'], 'ldt.ldt_utils.views.search_ldt') + self.assertEqual(ldoc.xpath("/iri/files/init")[0].tag, "init") + self.assertEqual(ldoc.xpath("/iri/files/library")[0].tag, "library") + + def test_create_ldt(self): + self.cont5 = Content(iriurl="id5/iriurl1") + self.cont5.iri_id = "id5" + self.cont5.save() + + self.cont6 = Content(iriurl="id6/iriurl2") + self.cont6.iri_id = "id6" + self.cont6.save() + + self.project.contents.add(self.cont5, self.cont6) + self.project.ldt = "" + create_ldt(self.project, self.user) + ldt = lxml.etree.fromstring(self.project.ldt_encoded) + self.assertEqual(ldt.xpath("/iri")[0].tag, "iri") + self.assertEqual(ldt.xpath("/iri/project")[0].get("title"), self.project.title) + self.assertEqual(ldt.xpath("/iri/medias/media")[0].get("src"), self.cont5.iri_url()) + self.assertEqual(ldt.xpath("/iri/medias/media")[1].get("id"), self.cont6.iri_id) + + def test_copy_ldt(self): + self.cont7 = Content(iriurl="id7/iriurl1") + self.cont7.iri_id = "id7" + self.cont7.save() + + self.cont8 = Content(iriurl="id8/iriurl2") + self.cont8.iri_id = "id8" + self.cont8.save() + + self.project.contents.add(self.cont7, self.cont8) + copy_ldt(self.project, self.projectcopy, self.user) + ldt1 = lxml.etree.fromstring(self.project.ldt_encoded) + ldt2 = lxml.etree.fromstring(self.projectcopy.ldt_encoded) + self.assertTrue(ldt1.xpath("/iri/project")[0].get("id") != ldt2.xpath("/iri/project")[0].get("id")) + self.assertEqual(ldt1.xpath("/iri/medias/media")[0].get("id"), ldt2.xpath("/iri/medias/media")[0].get("id")) + self.assertEqual(ldt1.xpath("/iri/annotations/content/ensemble")[0].get("title"), ldt2.xpath("/iri/annotations/content/ensemble")[0].get("title")) + self.assertEqual(ldt1.xpath("/iri/annotations/content/ensemble/decoupage")[0].get("id"), ldt2.xpath("/iri/annotations/content/ensemble/decoupage")[0].get("id")) + self.assertEqual(ldt1.xpath("/iri/annotations/content/ensemble/decoupage/title")[1].text, ldt2.xpath("/iri/annotations/content/ensemble/decoupage/title")[1].text.strip("\n\t")) + + def test_create_empty_iri(self): + self.cont9 = Content(iriurl="id9/iriurl1") + self.cont9.iri_id = "id9" + self.cont9.save() + + self.cont10 = Content(iriurl="id10/iriurl2") + self.cont10.iri_id = "id10" + self.cont10.save() + + self.project.contents.add(self.cont9, self.cont10) + tmp = tempfile.TemporaryFile(mode='r+') + create_empty_iri(tmp, self.cont9, "admin") + tmp.seek(0) + ldoc = lxml.etree.parse(tmp) + self.assertEqual(ldoc.xpath("/iri/head/meta")[0].get("content"), self.cont9.iri_id) + self.assertEqual(ldoc.xpath("/iri/body/medias/media/video")[0].get("id"), self.cont9.iri_id) + tmp.close() + + def test_add_annotation(self): + + self.cont11 = Content(iriurl="id11/iriurl1") + self.cont11.iri_id = "id11" + self.cont11.save() + + self.project.contents.add(self.cont11) + self.project.ldt = "" + create_ldt(self.project, self.user) + + self.LA = LdtAnnotation(self.project) + + self.LA.add("id11", "cutting", "title", "text", ["tag1", "tag2"], "800", + "10000", "jdoe", "2011-09-10T09:12:58") + self.LA.save() + ldt = lxml.etree.fromstring(self.project.ldt) + ann = ldt.xpath('/iri/annotations/content[@id="id11"]/ensemble/decoupage/elements/element')[0] + title = ann.xpath('title')[0].text + abstract = ann.xpath('abstract')[0].text + + self.assertEqual(ann.get("author"), "jdoe") + self.assertEqual(title, "title") + self.assertEqual(abstract, "text") + + def test_create_content(self): + self.cont12 = Content(iriurl="id12/iriurl1") + self.cont12.iri_id = "id112" + self.cont12.save() + + """ + def test_create_media(self): + self.media = Media() + + def test_create_project(self): + self.project = Project() + + def test_del_content(self): + self.cont13 = Content(iriurl="id13/iriurl1") + self.cont13.iri_id = "id113" + self.cont13.save() + + self.cont13.delete() + + def test_del_media(self): + self.media2 = Media() + + seld.media2.delete() + + def test_del_project(self): + self.project = Project() + + self.project.delete() + """ + + +class ViewsTest(unittest.TestCase): + def setUp(self): + self.project = Project() + self.project.id = "121" + self.project.save() + self.project.ldt = ' CA: prof et admin <abstract/> <audio source=""/> <tags/> </element> <element id="s_0050F043-3AD2-0A7C-6699-D2A03A1EBA02" begin="5052858" dur="124407" author="" date="2010/09/02" color="10053375" src=""> <title>conseil de classe Reprise de la figure precedente TC: prof et admin Conseil de classe conseil de classe Reprise de la figure precedente Bout a bout 1 ' + + self.cont1 = Content(iriurl="/laurentcantet_entrelesmurs/iriurl1") + self.cont1.iri_id = 'laurentcantet_entrelesmurs' + self.cont1.save() + + self.cont2 = Content(iriurl="/content_notinldt/iriurl2") + self.cont2.iri_id = 'content_notinldt' + self.cont2.save() + + self.project.contents.add(self.cont1, self.cont2) + + def tearDown(self): + self.project.delete() + diff -r 214e220dfab9 -r 9520fb2810e2 src/ldt/ldt/ldt_utils/views/content.py --- a/src/ldt/ldt/ldt_utils/views/content.py Tue Oct 16 17:26:49 2012 +0200 +++ b/src/ldt/ldt/ldt_utils/views/content.py Fri Oct 19 16:08:42 2012 +0200 @@ -47,159 +47,160 @@ current_front_project = instance_content.front_project form_status = 'none' errors_transaction = [] + transaction_succeed = True - # catch error from creating content, project, media - try: - if request.method == "POST": - - if instance_content is not None: - content_instance_val = model_to_dict(instance_content, exclude=ContentForm.Meta.exclude) - else: - content_instance_val = {} - - if instance_media is not None: - media_instance_val = model_to_dict(instance_media, exclude=MediaForm.Meta.exclude) - else: - media_instance_val = {} - #add prefix - - def add_prefix(_dict, prefix): - return dict([('%s-%s' % (prefix, key), value) for key,value in _dict.items()]) - - content_instance_val = add_prefix(content_instance_val, "content") - media_instance_val= add_prefix(media_instance_val, "media") - - for k in request.POST.keys(): - value = request.POST.get(k) - content_instance_val[k] = value - media_instance_val[k] = value - - content_instance_val['read_list'] = request.POST.getlist('read_list') - content_instance_val['write_list'] = request.POST.getlist('write_list') - content_instance_val['share'] = request.POST.get('share', True) - - content_form = ContentForm(content_instance_val, prefix="content", instance=instance_content) - media_form = MediaForm(media_instance_val, request.FILES, prefix="media", instance=instance_media) - picture_form = PictureForm(None, request.POST, request.FILES) - - if request.user.is_staff: - content_form.fields['front_project'].queryset = Project.objects.filter(contents__in=[instance_content]) - - - media_valid = media_form.is_valid() - content_valid = content_form.is_valid() - picture_valid = picture_form.is_valid() - if 'image' in request.POST.keys(): - image_link = request.POST.get('url_image') - if picture_valid and image_link!='' : - try : - r = requests.get(image_link) - img_temp = tempfile.NamedTemporaryFile(suffix='.png') - if img_temp: - img_temp.write(r.content) - img_temp.flush() - picture_form.cleaned_data["image"]=File(img_temp) - except Exception as inst: - logging.debug("couldn't download video thumbnail from image_link : "+str(image_link)) - logging.debug("write_content_base : valid form: for instance : " + repr(instance_media) + " -> media " + str(media_valid) + " content : for instance : " + repr(instance_content) + " : " + str(content_valid) + "picture : valid :" +str(picture_valid)+" loulou") #@UndefinedVariable - - if media_valid and content_valid and picture_valid: - - # see if media must be created - cleaned_data = {} - cleaned_data.update(media_form.cleaned_data) - cleaned_data.pop("media_public") - - media_input_type = content_form.cleaned_data["media_input_type"] + if request.method == "POST": + + if instance_content is not None: + content_instance_val = model_to_dict(instance_content, exclude=ContentForm.Meta.exclude) + else: + content_instance_val = {} + + if instance_media is not None: + media_instance_val = model_to_dict(instance_media, exclude=MediaForm.Meta.exclude) + else: + media_instance_val = {} + #add prefix + + def add_prefix(_dict, prefix): + return dict([('%s-%s' % (prefix, key), value) for key,value in _dict.items()]) + + content_instance_val = add_prefix(content_instance_val, "content") + media_instance_val= add_prefix(media_instance_val, "media") - if media_input_type == "none": - media = None - elif media_input_type == "link": - media = content_form.cleaned_data["media_obj"] - created = False - elif media_input_type == "create": - del cleaned_data["media_file"] - if not cleaned_data['videopath']: - cleaned_data['videopath'] = settings.STREAM_URL - # if the source is already http:// or rtmp:// we don't have to add STREAM_URL - if cleaned_data['src'].startswith("rtmp://") or cleaned_data['src'].startswith("http://"): - cleaned_data['videopath'] = '' + for k in request.POST.keys(): + value = request.POST.get(k) + content_instance_val[k] = value + media_instance_val[k] = value + + content_instance_val['read_list'] = request.POST.getlist('read_list') + content_instance_val['write_list'] = request.POST.getlist('write_list') + content_instance_val['share'] = request.POST.get('share', True) + + content_form = ContentForm(content_instance_val, prefix="content", instance=instance_content) + media_form = MediaForm(media_instance_val, request.FILES, prefix="media", instance=instance_media) + picture_form = PictureForm(None, request.POST, request.FILES) + + if request.user.is_staff: + content_form.fields['front_project'].queryset = Project.objects.filter(contents__in=[instance_content]) + + media_valid = media_form.is_valid() + content_valid = content_form.is_valid() + picture_valid = picture_form.is_valid() + if 'image' in request.POST.keys(): + image_link = request.POST.get('url_image') + if picture_valid and image_link!='' : + try : + r = requests.get(image_link) + img_temp = tempfile.NamedTemporaryFile(suffix='.png') + if img_temp: + img_temp.write(r.content) + img_temp.flush() + picture_form.cleaned_data["image"]=File(img_temp) + except Exception as inst: + logging.debug("couldn't download video thumbnail from image_link : "+str(image_link)) + logging.debug("write_content_base : valid form: for instance : " + repr(instance_media) + " -> media " + str(media_valid) + " content : for instance : " + repr(instance_content) + " : " + str(content_valid) + "picture : valid :" +str(picture_valid)) #@UndefinedVariable + + if media_valid and content_valid and picture_valid: + + # see if media must be created + cleaned_data = {} + cleaned_data.update(media_form.cleaned_data) + cleaned_data.pop("media_public") + + media_input_type = content_form.cleaned_data["media_input_type"] + + if media_input_type == "none": + media = None + elif media_input_type == "link": + media = content_form.cleaned_data["media_obj"] + created = False + elif media_input_type == "create": + del cleaned_data["media_file"] + if not cleaned_data['videopath']: + cleaned_data['videopath'] = settings.STREAM_URL + # if the source is already http:// or rtmp:// we don't have to add STREAM_URL + if cleaned_data['src'].startswith("rtmp://") or cleaned_data['src'].startswith("http://"): + cleaned_data['videopath'] = '' + try: media, created = Media.objects.get_or_create(src=cleaned_data['src'], defaults=cleaned_data) #@UndefinedVariable - elif media_input_type == "url" or media_input_type == "upload" : - # copy file - #complet src - destination_file = None - source_file = None - try: - if media_input_type == "url": - url = cleaned_data["external_src_url"] - source_file = urllib2.urlopen(url) - source_filename = source_file.info().get('Content-Disposition', None) - if not source_filename: - source_filename = urlparse.urlparse(url).path.rstrip("/").split('/')[-1] - elif media_input_type == "upload": - #source_file = request.FILES['media-media_file'] - # At this point the file has already be uploaded thanks to the upload view, and original file name is sent through a post var - source_filename = request.POST["media-local_file_name"] - - source_filename = ldt_utils_path.sanitize_filename(source_filename) - destination_filepath = os.path.join(settings.STREAM_PATH, source_filename) - base_source_filename = source_filename - extension = base_source_filename.split(".")[-1] - if extension == base_source_filename: - extension = "" - base_basename_filename = base_source_filename - else: - base_basename_filename = base_source_filename[:-1 * (len(extension) + 1)] - i = 0 + except: + transaction_succeed = False + elif media_input_type == "url" or media_input_type == "upload" : + # copy file + #complet src + destination_file = None + source_file = None + try: + if media_input_type == "url": + url = cleaned_data["external_src_url"] + source_file = urllib2.urlopen(url) + source_filename = source_file.info().get('Content-Disposition', None) + if not source_filename: + source_filename = urlparse.urlparse(url).path.rstrip("/").split('/')[-1] + elif media_input_type == "upload": + #source_file = request.FILES['media-media_file'] + # At this point the file has already be uploaded thanks to the upload view, and original file name is sent through a post var + source_filename = request.POST["media-local_file_name"] + + source_filename = ldt_utils_path.sanitize_filename(source_filename) + destination_filepath = os.path.join(settings.STREAM_PATH, source_filename) + base_source_filename = source_filename + extension = base_source_filename.split(".")[-1] + if extension == base_source_filename: + extension = "" + base_basename_filename = base_source_filename + else: + base_basename_filename = base_source_filename[:-1 * (len(extension) + 1)] + i = 0 + + while os.path.exists(destination_filepath): + base_source_filename = "%s.%d.%s" % (base_basename_filename, i, extension) + destination_filepath = os.path.join(settings.STREAM_PATH, base_source_filename) + i += 1 + + if media_input_type == "url": + # we upload the file if we are in url case + destination_file = open(destination_filepath, "wb") + chunck = source_file.read(2048) + while chunck: + destination_file.write(chunck) + chunck = source_file.read(2048) - while os.path.exists(destination_filepath): - base_source_filename = "%s.%d.%s" % (base_basename_filename, i, extension) - destination_filepath = os.path.join(settings.STREAM_PATH, base_source_filename) - i += 1 - - if media_input_type == "url": - # we upload the file if we are in url case - destination_file = open(destination_filepath, "wb") - chunck = source_file.read(2048) - while chunck: - destination_file.write(chunck) - chunck = source_file.read(2048) - - elif media_input_type == "upload": - # The media file has been uploaded in the session temp folder - # so we just have to move to the regular folder and rename it. - if os.path.exists(os.path.join(settings.STREAM_PATH, "tmp/" + request.COOKIES[settings.SESSION_COOKIE_NAME] + "/", source_filename)): - os.rename(os.path.join(settings.STREAM_PATH, "tmp/" + request.COOKIES[settings.SESSION_COOKIE_NAME] + "/", source_filename), os.path.join(settings.STREAM_PATH, base_source_filename)) - - - src_prefix = settings.STREAM_SRC_PREFIX.rstrip("/") - if len(src_prefix) > 0: - cleaned_data["src"] = src_prefix + "/" + base_source_filename - else: - cleaned_data["src"] = base_source_filename - - - except Exception as inst: - logging.debug("write_content_base : POST error when processing file:" + str(inst)) #@UndefinedVariable - form_status = "error" - #set error for form - if media_input_type == "url": - errors = media_form._errors.setdefault("external_src_url", ErrorList()) - errors.append(_("Problem when downloading file from url : ") + url) - elif media_input_type == "upload": - errors = media_form._errors.setdefault("media_file", ErrorList()) - errors.append(_("Problem when uploading file : ") + str(inst)) - finally: - if media_input_type == "url": - if destination_file: - destination_file.close() - if source_file: - source_file.close() + elif media_input_type == "upload": + # The media file has been uploaded in the session temp folder + # so we just have to move to the regular folder and rename it. + if os.path.exists(os.path.join(settings.STREAM_PATH, "tmp/" + request.COOKIES[settings.SESSION_COOKIE_NAME] + "/", source_filename)): + os.rename(os.path.join(settings.STREAM_PATH, "tmp/" + request.COOKIES[settings.SESSION_COOKIE_NAME] + "/", source_filename), os.path.join(settings.STREAM_PATH, base_source_filename)) - if form_status != "error": - #try: + src_prefix = settings.STREAM_SRC_PREFIX.rstrip("/") + if len(src_prefix) > 0: + cleaned_data["src"] = src_prefix + "/" + base_source_filename + else: + cleaned_data["src"] = base_source_filename + + + except Exception as inst: + logging.debug("write_content_base : POST error when processing file:" + str(inst)) #@UndefinedVariable + form_status = "error" + #set error for form + if media_input_type == "url": + errors = media_form._errors.setdefault("external_src_url", ErrorList()) + errors.append(_("Problem when downloading file from url : ") + url) + elif media_input_type == "upload": + errors = media_form._errors.setdefault("media_file", ErrorList()) + errors.append(_("Problem when uploading file : ") + str(inst)) + finally: + if media_input_type == "url": + if destination_file: + destination_file.close() + if source_file: + source_file.close() + + + if form_status != "error": + try: del cleaned_data["media_file"] if not cleaned_data['videopath']: cleaned_data['videopath'] = settings.STREAM_URL @@ -209,59 +210,65 @@ cleaned_data['mimetype_field'] = mimetype media, created = Media.safe_objects.get_or_create(src=cleaned_data['src'], defaults=cleaned_data) #@UndefinedVariable cached_assign('view_media', request.user, media) - else: - media = None - - - if media and not created: - for attribute in ('external_id', 'external_permalink', 'external_publication_url', 'external_src_url', 'media_creation_date', 'videopath', 'duration', 'description', 'title', 'front_project'): - setattr(media, attribute, cleaned_data.get(attribute)) - mimetype = cleaned_data.get('mimetype_field', None) - if not mimetype: - mimetype = mimetypes.guess_type(media.src) - media.mimetype_field = mimetype - cached_assign('view_media', request.user, media) - cached_assign('change_media', request.user, media) - media.save() + except: + transaction_succeed = False + else: + media = None + + + if media and not created: + for attribute in ('external_id', 'external_permalink', 'external_publication_url', 'external_src_url', 'media_creation_date', 'videopath', 'duration', 'description', 'title', 'front_project'): + setattr(media, attribute, cleaned_data.get(attribute)) + mimetype = cleaned_data.get('mimetype_field', None) + if not mimetype: + mimetype = mimetypes.guess_type(media.src) + media.mimetype_field = mimetype + cached_assign('view_media', request.user, media) + cached_assign('change_media', request.user, media) + media.save() + + if form_status != "error": + content_defaults = {} + content_defaults.update(content_form.cleaned_data) + content_defaults['media_obj'] = media + + for key in ["media_input_type", "groups", "is_public", "read_list", "write_list", "share" ]: + del content_defaults[key] - if form_status != "error": - content_defaults = {} - content_defaults.update(content_form.cleaned_data) - content_defaults['media_obj'] = media - - for key in ["media_input_type", "groups", "is_public", "read_list", "write_list", "share" ]: - del content_defaults[key] - + try: content, created = Content.safe_objects.get_or_create(iri_id=content_form.cleaned_data['iri_id'], defaults=content_defaults) #@UndefinedVariable + except: + transaction_succeed = False - if not created and not request.user.has_perm('ldt_utils.change_content', content): - raise AttributeError("%s is not allowed to change content %s" % (request.user, content)) - - cached_assign('change_content', request.user, content) - cached_assign('view_content', request.user, content) - everyone = Group.objects.get(name=settings.PUBLIC_GROUP_NAME) - - if media_form.cleaned_data['media_public']: + if not created and not request.user.has_perm('ldt_utils.change_content', content): + raise AttributeError("%s is not allowed to change content %s" % (request.user, content)) + + cached_assign('change_content', request.user, content) + cached_assign('view_content', request.user, content) + everyone = Group.objects.get(name=settings.PUBLIC_GROUP_NAME) + + if media_form.cleaned_data['media_public']: + cached_assign('view_content', everyone, content) + if media: + cached_assign('view_media', everyone, media) + else: + remove_perm('ldt_utils.view_media', everyone, media) + assign_perm_to_obj(content, content_form.cleaned_data['read_list'], content_form.cleaned_data['write_list'], request.user) + if media: + assign_perm_to_obj(media, content_form.cleaned_data['read_list'], content_form.cleaned_data['write_list'], request.user) + if content_form.cleaned_data['is_public']: cached_assign('view_content', everyone, content) - if media: - cached_assign('view_media', everyone, media) else: - remove_perm('ldt_utils.view_media', everyone, media) - if media: - assign_perm_to_obj(media, content_form.cleaned_data['read_list'], content_form.cleaned_data['write_list'], request.user) - assign_perm_to_obj(content, content_form.cleaned_data['read_list'], content_form.cleaned_data['write_list'], request.user) - if content_form.cleaned_data['is_public']: - cached_assign('view_content', everyone, content) - else: - remove_perm('ldt_utils.view_content', everyone, content) - - if not created: - for attribute in ('iriurl', 'title', 'description', 'duration', 'content_creation_date', 'tags', 'media_obj'): - setattr(content, attribute, content_defaults[attribute]) - - if request.user.is_staff and content_defaults.has_key('front_project'): - content.front_project = content_defaults['front_project'] + remove_perm('ldt_utils.view_content', everyone, content) + + if not created: + for attribute in ('iriurl', 'title', 'description', 'duration', 'content_creation_date', 'tags', 'media_obj'): + setattr(content, attribute, content_defaults[attribute]) + if request.user.is_staff and content_defaults.has_key('front_project'): + content.front_project = content_defaults['front_project'] + + try: content.save() picture_form.model = content picture_form.save() @@ -269,34 +276,33 @@ media_form = MediaForm(instance=media, prefix="media") content_form = ContentForm(instance=content, prefix="content") picture_form = PictureForm() - else: - form_status = 'error' + except: + transaction_succeed = False + else: + form_status = 'error' + else: + form_status = 'empty' + initial_c = { 'media_input_type':"link"} + initial_m = {} + if instance_media: + initial_m['media_public'] = instance_media.is_public else: - form_status = 'empty' - initial_c = { 'media_input_type':"link"} - initial_m = {} - if instance_media: - initial_m['media_public'] = instance_media.is_public - else: - initial_m['media_public'] = True - if instance_content: - initial_c['is_public'] = instance_content.is_public - else: - initial_c['is_public'] = True - content_form = ContentForm(prefix="content", instance=instance_content, initial=initial_c) - media_form = MediaForm(prefix="media", instance=instance_media, initial=initial_m) - picture_form = PictureForm() - - if instance_content is not None: - content_form.media_input_type = "link" - - if request.user.is_staff: - content_form.fields['front_project'].queryset = Project.objects.filter(contents__in=[instance_content]) - except: - transaction.rollback() - errors_transaction.append(_("Content creation failure")) - return False, False, False, False, False, errors_transaction - else: + initial_m['media_public'] = True + if instance_content: + initial_c['is_public'] = instance_content.is_public + else: + initial_c['is_public'] = True + content_form = ContentForm(prefix="content", instance=instance_content, initial=initial_c) + media_form = MediaForm(prefix="media", instance=instance_media, initial=initial_m) + picture_form = PictureForm() + + if instance_content is not None: + content_form.media_input_type = "link" + + if request.user.is_staff: + content_form.fields['front_project'].queryset = Project.objects.filter(contents__in=[instance_content]) + + if transaction_succeed: #Try to make sure the commit succeeded or not. try: transaction.commit() @@ -304,6 +310,11 @@ transaction.rollback() errors_transaction.append(_("Commit of the content creation failed")) return False, False, False, False, False, errors_transaction + else: + transaction.rollback() + errors_transaction.append(_("Content creation failure")) + return False, False, False, False, False, errors_transaction + return content_form, media_form, picture_form, form_status, current_front_project, errors_transaction @login_required