# HG changeset patch # User rougeronj # Date 1352732415 -3600 # Node ID d9dd9dbd22cac40978aea4654f87c50af0c22026 # Parent 61da23337d76fa181c0c0e2e071ad9ede6a0a983 add some unit test. Update content organization diff -r 61da23337d76 -r d9dd9dbd22ca src/ldt/ldt/ldt_utils/tests/content_tests.py --- a/src/ldt/ldt/ldt_utils/tests/content_tests.py Fri Nov 16 18:37:23 2012 +0100 +++ b/src/ldt/ldt/ldt_utils/tests/content_tests.py Mon Nov 12 16:00:15 2012 +0100 @@ -16,6 +16,7 @@ import unittest import uuid import logging +import os class ContentTest(TestCase): @@ -46,8 +47,8 @@ self.cont13.iri_id = "id113" self.cont13.save() - self.assertEqual(Content.objects.get(iri_id=self.cont13.iri_id), self.cont13) - + self.assertEqual(Content.objects.get(iri_id=self.cont13.iri_id), self.cont13) + #test the deletion of a content without media def test_del_content_v1(self): self.cont14 = Content(iriurl="id14/iriurl14", duration = 100) diff -r 61da23337d76 -r d9dd9dbd22ca src/ldt/ldt/ldt_utils/tests/ldt_tests.py --- a/src/ldt/ldt/ldt_utils/tests/ldt_tests.py Fri Nov 16 18:37:23 2012 +0100 +++ b/src/ldt/ldt/ldt_utils/tests/ldt_tests.py Mon Nov 12 16:00:15 2012 +0100 @@ -68,7 +68,6 @@ self.assertEqual(ldoc.xpath("/iri/medias/media")[1].get("id"), self.cont2.iri_id) f.close() - """ def test_generate_init(self): self.cont3 = Content(iriurl="id3/iriurl3", duration=111) self.cont3.iri_id = "id3" @@ -82,7 +81,6 @@ ldoc = self.LU.generate_init([], 'ldt.ldt_utils.views.search_ldt') self.assertEqual(ldoc.xpath("/iri/files/init")[0].tag, "init") self.assertEqual(ldoc.xpath("/iri/files/library")[0].tag, "library") - """ def test_create_ldt(self): self.cont5 = Content(iriurl="id5/iriurl5", duration=111) diff -r 61da23337d76 -r d9dd9dbd22ca src/ldt/ldt/ldt_utils/tests/project_tests.py --- a/src/ldt/ldt/ldt_utils/tests/project_tests.py Fri Nov 16 18:37:23 2012 +0100 +++ b/src/ldt/ldt/ldt_utils/tests/project_tests.py Mon Nov 12 16:00:15 2012 +0100 @@ -52,4 +52,26 @@ with self.assertRaises(Project.DoesNotExist): Project.objects.get(ldt_id=self.project3.ldt_id) - \ No newline at end of file + + #test deletion of project with annotations + def test_del_project_v2(self): + + self.project4 = Project(title="titleproj3", owner=self.user) + self.project4.ldt = ' CA: prof et admin <abstract/> <audio source=""/> <tags/> </element> <element id="s_0050F043-3AD2-0A7C-6699-D2A03A1EBA02" begin="5052858" dur="124407" author="" date="2010/09/02" color="10053375" src=""> <title>conseil de classe Reprise de la figure precedente TC: prof et admin Conseil de classe conseil de classe Reprise de la figure precedente Bout a bout 1 ' + self.project4.id = "444" + self.project4.ldt_id = str(uuid.uuid1()) + self.project4.description = "proj4description" + self.project4.save() + + create_ldt(self.project4, self.user) + + self.LA = LdtAnnotation(self.project4) + + self.LA.add("id11", "cutting_id", "cutting_title", "title", "text", ["tag1", "tag2"], "800", + "10000", "jdoe", "2011-09-10T09:12:58") + self.LA.save() + + self.project4.delete() + + with self.assertRaises(Project.DoesNotExist): + Project.objects.get(ldt_id=self.project4.ldt_id) \ No newline at end of file diff -r 61da23337d76 -r d9dd9dbd22ca src/ldt/ldt/ldt_utils/views/content.py --- a/src/ldt/ldt/ldt_utils/views/content.py Fri Nov 16 18:37:23 2012 +0100 +++ b/src/ldt/ldt/ldt_utils/views/content.py Mon Nov 12 16:00:15 2012 +0100 @@ -34,123 +34,6 @@ import urlparse #from django.core.files.temp import NamedTemporaryFile -def media_management(request, media_input_type, cleaned_data, content_form, media_form, form_status): - if media_input_type == "none": - media = None - elif media_input_type == "link": - media = content_form.cleaned_data["media_obj"] - created = False - elif media_input_type == "create": - del cleaned_data["media_file"] - if not cleaned_data['videopath']: - cleaned_data['videopath'] = settings.STREAM_URL - # if the source is already http:// or rtmp:// we don't have to add STREAM_URL - if cleaned_data['src'].startswith("rtmp://") or cleaned_data['src'].startswith("http://"): - cleaned_data['videopath'] = '' - - media, created = Media.objects.get_or_create(src=cleaned_data['src'], defaults=cleaned_data) #@UndefinedVariable - - elif media_input_type == "url" or media_input_type == "upload" : - # copy file - #complet src - destination_file = None - source_file = None - try: - if media_input_type == "url": - url = cleaned_data["external_src_url"] - source_file = urllib2.urlopen(url) - source_filename = source_file.info().get('Content-Disposition', None) - if not source_filename: - source_filename = urlparse.urlparse(url).path.rstrip("/").split('/')[-1] - elif media_input_type == "upload": - #source_file = request.FILES['media-media_file'] - # At this point the file has already be uploaded thanks to the upload view, and original file name is sent through a post var - source_filename = request.POST["media-local_file_name"] - - source_filename = ldt_utils_path.sanitize_filename(source_filename) - destination_filepath = os.path.join(settings.STREAM_PATH, source_filename) - base_source_filename = source_filename - extension = base_source_filename.split(".")[-1] - if extension == base_source_filename: - extension = "" - base_basename_filename = base_source_filename - else: - base_basename_filename = base_source_filename[:-1 * (len(extension) + 1)] - i = 0 - - while os.path.exists(destination_filepath): - base_source_filename = "%s.%d.%s" % (base_basename_filename, i, extension) - destination_filepath = os.path.join(settings.STREAM_PATH, base_source_filename) - i += 1 - - if media_input_type == "url": - # we upload the file if we are in url case - destination_file = open(destination_filepath, "wb") - chunck = source_file.read(2048) - while chunck: - destination_file.write(chunck) - chunck = source_file.read(2048) - - elif media_input_type == "upload": - # The media file has been uploaded in the session temp folder - # so we just have to move to the regular folder and rename it. - if os.path.exists(os.path.join(settings.STREAM_PATH, "tmp/" + request.COOKIES[settings.SESSION_COOKIE_NAME] + "/", source_filename)): - os.rename(os.path.join(settings.STREAM_PATH, "tmp/" + request.COOKIES[settings.SESSION_COOKIE_NAME] + "/", source_filename), os.path.join(settings.STREAM_PATH, base_source_filename)) - - - src_prefix = settings.STREAM_SRC_PREFIX.rstrip("/") - if len(src_prefix) > 0: - cleaned_data["src"] = src_prefix + "/" + base_source_filename - else: - cleaned_data["src"] = base_source_filename - - - except Exception as inst: - logging.debug("write_content_base : POST error when processing file:" + str(inst)) #@UndefinedVariable - form_status = "error" - #set error for form - if media_input_type == "url": - errors = media_form._errors.setdefault("external_src_url", ErrorList()) - errors.append(_("Problem when downloading file from url : ") + url) - elif media_input_type == "upload": - errors = media_form._errors.setdefault("media_file", ErrorList()) - errors.append(_("Problem when uploading file : ") + str(inst)) - finally: - if media_input_type == "url": - if destination_file: - destination_file.close() - if source_file: - source_file.close() - - - if form_status != "error": - del cleaned_data["media_file"] - if not cleaned_data['videopath']: - cleaned_data['videopath'] = settings.STREAM_URL - mimetype = cleaned_data.get('mimetype_field', None) - if not mimetype: - mimetype = mimetypes.guess_type(cleaned_data['src']) - cleaned_data['mimetype_field'] = mimetype - media, created = Media.safe_objects.get_or_create(src=cleaned_data['src'], defaults=cleaned_data) #@UndefinedVariable - cached_assign('view_media', request.user, media) - else: - media = None - - - if media and not created: - for attribute in ('external_id', 'external_permalink', 'external_publication_url', 'external_src_url', 'media_creation_date', 'videopath', 'duration', 'description', 'title', 'front_project'): - setattr(media, attribute, cleaned_data.get(attribute)) - mimetype = cleaned_data.get('mimetype_field', None) - if not mimetype: - mimetype = mimetypes.guess_type(media.src) - media.mimetype_field = mimetype - cached_assign('view_media', request.user, media) - cached_assign('change_media', request.user, media) - media.save() - - return media, form_status - - @transaction.commit_manually def write_content_base(request, iri_id=None): if iri_id: @@ -229,14 +112,8 @@ media_input_type = content_form.cleaned_data["media_input_type"] - - - media, form_status = media_management(request, media_input_type, cleaned_data, content_form, media_form, form_status) - - - if form_status != "error": content_defaults = {} content_defaults.update(content_form.cleaned_data) @@ -380,7 +257,7 @@ member_list, admin_list = get_userlist_model(content_temp, request.user) # Deleted is False if an error occurred during deletion - if (deleted == False) or (content_form == False and media_form == False and picture_form == False and form_status == False and current_front_project == False): + if (content_form == False and media_form == False and picture_form == False and form_status == False and current_front_project == False): message=_("An error occurred - Please try again or contact webmaster") title = _("Error") logging.error(e) @@ -410,6 +287,126 @@ 'elem_list':elem_list, 'member_list': member_list, 'admin_list': admin_list, 'iri_id': iri_id, 'session_key':session_key, 'cookie_name':cookie_name, 'img_container': img_container, 'profile_picture_form': picture_form, 'current_front_project':current_front_project}, context_instance=RequestContext(request)) + + +def media_management(request, media_input_type, cleaned_data, content_form, media_form, form_status): + if media_input_type == "none": + media = None + elif media_input_type == "link": + media = content_form.cleaned_data["media_obj"] + created = False + elif media_input_type == "create": + del cleaned_data["media_file"] + if not cleaned_data['videopath']: + cleaned_data['videopath'] = settings.STREAM_URL + # if the source is already http:// or rtmp:// we don't have to add STREAM_URL + if cleaned_data['src'].startswith("rtmp://") or cleaned_data['src'].startswith("http://"): + cleaned_data['videopath'] = '' + + media, created = Media.objects.get_or_create(src=cleaned_data['src'], defaults=cleaned_data) #@UndefinedVariable + + elif media_input_type == "url" or media_input_type == "upload" : + # copy file + #complet src + destination_file = None + source_file = None + try: + if media_input_type == "url": + url = cleaned_data["external_src_url"] + source_file = urllib2.urlopen(url) + source_filename = source_file.info().get('Content-Disposition', None) + if not source_filename: + source_filename = urlparse.urlparse(url).path.rstrip("/").split('/')[-1] + elif media_input_type == "upload": + #source_file = request.FILES['media-media_file'] + # At this point the file has already be uploaded thanks to the upload view, and original file name is sent through a post var + source_filename = request.POST["media-local_file_name"] + + source_filename = ldt_utils_path.sanitize_filename(source_filename) + destination_filepath = os.path.join(settings.STREAM_PATH, source_filename) + base_source_filename = source_filename + extension = base_source_filename.split(".")[-1] + if extension == base_source_filename: + extension = "" + base_basename_filename = base_source_filename + else: + base_basename_filename = base_source_filename[:-1 * (len(extension) + 1)] + i = 0 + + while os.path.exists(destination_filepath): + base_source_filename = "%s.%d.%s" % (base_basename_filename, i, extension) + destination_filepath = os.path.join(settings.STREAM_PATH, base_source_filename) + i += 1 + + if media_input_type == "url": + # we upload the file if we are in url case + destination_file = open(destination_filepath, "wb") + chunck = source_file.read(2048) + while chunck: + destination_file.write(chunck) + chunck = source_file.read(2048) + + elif media_input_type == "upload": + # The media file has been uploaded in the session temp folder + # so we just have to move to the regular folder and rename it. + if os.path.exists(os.path.join(settings.STREAM_PATH, "tmp/" + request.COOKIES[settings.SESSION_COOKIE_NAME] + "/", source_filename)): + os.rename(os.path.join(settings.STREAM_PATH, "tmp/" + request.COOKIES[settings.SESSION_COOKIE_NAME] + "/", source_filename), os.path.join(settings.STREAM_PATH, base_source_filename)) + + + src_prefix = settings.STREAM_SRC_PREFIX.rstrip("/") + if len(src_prefix) > 0: + cleaned_data["src"] = src_prefix + "/" + base_source_filename + else: + cleaned_data["src"] = base_source_filename + + + except Exception as inst: + logging.debug("write_content_base : POST error when processing file:" + str(inst)) #@UndefinedVariable + form_status = "error" + #set error for form + if media_input_type == "url": + errors = media_form._errors.setdefault("external_src_url", ErrorList()) + errors.append(_("Problem when downloading file from url : ") + url) + elif media_input_type == "upload": + errors = media_form._errors.setdefault("media_file", ErrorList()) + errors.append(_("Problem when uploading file : ") + str(inst)) + finally: + if media_input_type == "url": + if destination_file: + destination_file.close() + if source_file: + source_file.close() + + + if form_status != "error": + del cleaned_data["media_file"] + if not cleaned_data['videopath']: + cleaned_data['videopath'] = settings.STREAM_URL + mimetype = cleaned_data.get('mimetype_field', None) + if not mimetype: + mimetype = mimetypes.guess_type(cleaned_data['src']) + cleaned_data['mimetype_field'] = mimetype + media, created = Media.safe_objects.get_or_create(src=cleaned_data['src'], defaults=cleaned_data) #@UndefinedVariable + cached_assign('view_media', request.user, media) + else: + media = None + + + if media and not created: + for attribute in ('external_id', 'external_permalink', 'external_publication_url', 'external_src_url', 'media_creation_date', 'videopath', 'duration', 'description', 'title', 'front_project'): + setattr(media, attribute, cleaned_data.get(attribute)) + mimetype = cleaned_data.get('mimetype_field', None) + if not mimetype: + mimetype = mimetypes.guess_type(media.src) + media.mimetype_field = mimetype + cached_assign('view_media', request.user, media) + cached_assign('change_media', request.user, media) + media.save() + + return media, form_status + + + @login_required def prepare_delete_content(request, iri_id=None): errors = []