creation of a function in order to avoid using settings.WEB_URL + reverse in the views. Function to get the web_url using site domain
replacement in the templates of WEB_URL + templatetag url by the templatetag absurl
from django.conf import settings
from django.contrib.auth.models import User, Group
from django.db import models
from django.utils.translation import ugettext_lazy as _
from guardian.shortcuts import assign, remove_perm, get_perms
from ldt.core.models import Document
from ldt.security import (get_current_user_or_admin, set_current_user,
get_current_user)
from ldt.security.manager import SafeManager
from ldt.security.models import SafeModel
from ldt.utils import url as url_utils
from sorl.thumbnail import ImageField
from tagging.models import Tag
from utils import (create_ldt, copy_ldt, create_empty_iri, update_iri,
generate_uuid)
from ldt.utils import generate_hash
from ldt.utils.url import get_web_url
import datetime
import lxml.etree #@UnresolvedImport
import mimetypes
import os.path
import re
import tagging.fields
import uuid
from shutil import move
from django.core.files.storage import default_storage
#from ldt.core.models import Document, Owner
class Author(SafeModel):
handle = models.CharField(max_length=255, unique=True, blank=True, null=True)
email = models.EmailField(unique=False, blank=True, null=True)
firstname = models.CharField(max_length=512, blank=True, null=True)
lastname = models.CharField(max_length=512, blank=True, null=True)
def __unicode__(self):
return unicode(self.id) + " - " + self.handle + ", " + self.email + ", " + self.firstname + " " + self.lastname
class Meta:
permissions = (
('view_author', 'Can view author'),
)
class Media(SafeModel):
external_id = models.CharField(max_length=1024, null=True, blank=True, verbose_name=_('media.external_id'))
external_permalink = models.URLField(max_length=1024, verify_exists=False, null=True, blank=True, verbose_name=_('media.external_permalink'))
external_publication_url = models.URLField(max_length=1024, verify_exists=True, null=True, blank=True, verbose_name=_('media.external_publication_url'))
external_src_url = models.CharField(max_length=1024, null=True, blank=True, verbose_name=_('media.external_src_url'))
creation_date = models.DateTimeField(auto_now_add=True, verbose_name=_('media.creation_date'))
media_creation_date = models.DateTimeField(null=True, blank=True, verbose_name=_('media.media_creation_date'))
update_date = models.DateTimeField(auto_now=True, verbose_name=_('media.update_date'))
videopath = models.CharField(max_length=1024, null=True, blank=True, verbose_name=_('media.videopath'))
duration = models.IntegerField(null=True, blank=True, verbose_name=_('media.duration'))
creator = models.ForeignKey(User, blank=True, null=True, verbose_name=_('media.creator'))
description = models.TextField(null=True, blank=True, verbose_name=_('description'))
title = models.CharField(max_length=1024, null=True, blank=True, verbose_name=_('title'))
src = models.CharField(max_length=1024, verbose_name=_('media.src'))
src_hash = models.CharField(max_length=128, unique=True, verbose_name=_('media.src_hash'), blank=True)
mimetype_field = models.CharField(max_length=512, null=True, blank=True, verbose_name=_('media.mimetype'))
class Meta:
permissions = (
('view_media', 'Can view media'),
)
def mimetype(): #@NoSelf
def fget(self):
if self.mimetype_field :
return self.mimetype_field
elif self.src:
return mimetypes.guess_type(self.src.rstrip())[0]
else:
return None
def fset(self, value):
self.mimetype_field = value
return locals()
mimetype = property(**mimetype())
def stream_src(): #@NoSelf
def fget(self):
res_src = self.src.rstrip()
if self.videopath and self.videopath.startswith("rtmp://") and "mp3:" not in res_src and "mp4:" not in res_src:
extension = res_src.split(".")[-1]
res_src = {
'flv': lambda s: s,
'mp3': lambda s: "%s:%s" % ("mp3", res_src[:-4]),
'mp4': lambda s: "%s:%s" % ("mp4", res_src),
'f4v': lambda s: "%s:%s" % ("mp4", res_src),
'mov': lambda s: "%s:%s" % ("mp4", res_src),
}.get(extension, lambda s:s)(res_src)
return res_src
return locals()
stream_src = property(**stream_src())
def is_public(): #@NoSelf
def fget(self):
if self.pk:
everyone = Group.objects.get(name=settings.PUBLIC_GROUP_NAME)
if 'view_media' in get_perms(everyone, self):
return True
return False
def fset(self, value):
if self.pk:
everyone = Group.objects.get(name=settings.PUBLIC_GROUP_NAME)
if value:
assign('ldt_utils.view_media', everyone, self.media_obj)
else:
remove_perm('ldt_utils.view_media', everyone, self.media_obj)
return locals()
is_public = property(**is_public())
def save(self, *args, **kwargs):
self.src_hash = generate_hash(self.src)
super(Media, self).save(*args, **kwargs)
for content in self.content_set.all():
content.sync_iri_file()
def __unicode__(self):
strings = []
if self.title:
strings.append(unicode(self.title))
else:
strings.append(unicode(self.src))
if self.external_id:
strings.append(unicode(self.external_id))
return "|".join(strings)
class ContentManager(SafeManager):
def __init__(self):
super(ContentManager, self).__init__(check_perm=False)
def get_by_natural_key(self, iri_id):
return self.get(iri_id=iri_id)
class Content(SafeModel):
objects = ContentManager()
iri_id = models.CharField(max_length=255, unique=True, default=generate_uuid, verbose_name=_('content.iri_id'))
iriurl = models.CharField(max_length=1024, verbose_name=_('content.iriurl'))
creation_date = models.DateTimeField(auto_now_add=True, verbose_name=_('content.creation_date'))
update_date = models.DateTimeField(auto_now=True, verbose_name=_('content.update_date'))
title = models.CharField(max_length=1024, null=True, blank=True, verbose_name=_('content.title'))
description = models.TextField(null=True, blank=True, verbose_name=_('content.description'))
authors = models.ManyToManyField(Author, blank=True, verbose_name=_('content.authors'))
duration = models.IntegerField(null=True, blank=True, verbose_name=_('content.duration'))
content_creation_date = models.DateTimeField(null=True, blank=True, verbose_name=_('content.content_creation_date'))
tags = tagging.fields.TagField(max_length=2048, null=True, blank=True)
media_obj = models.ForeignKey('Media', blank=True, null=True)
image = ImageField(upload_to="thumbnails/contents/", default=settings.DEFAULT_CONTENT_ICON, max_length=200)
front_project = models.ForeignKey('Project', null=True, blank=True)
class Meta:
ordering = ["title"]
permissions = (
('view_content', 'Can view content'),
)
def __init__(self, *args, **kwargs):
super(Content, self).__init__(*args, **kwargs)
if not hasattr(Content, 'pol_positive'):
self.__add_polemic_attributes()
def delete(self):
super(Content, self).delete()
iri_file_path = self.iri_file_path()
thumbnail = os.path.join(settings.MEDIA_ROOT, unicode(self.image))
if os.path.exists(iri_file_path):
iri_dir = os.path.dirname(iri_file_path)
temp = os.path.join(os.path.join(os.path.dirname(iri_dir), "temp"), self.iri_id)
try:
move(iri_dir, temp)
except Exception, e:
raise e
if os.path.exists(thumbnail):
if os.path.basename(thumbnail) != os.path.basename(settings.DEFAULT_CONTENT_ICON):
temp_thumbnail = os.path.join(os.path.dirname(thumbnail), "temp")
try:
if not os.path.exists(temp_thumbnail):
os.makedirs(temp_thumbnail)
move(thumbnail, os.path.join(temp_thumbnail, os.path.basename(thumbnail)))
except Exception, e:
raise e
#del .iri, and .png from temp directory
def commit(self):
iri_file_path=self.iri_file_path()
iri_dir = os.path.dirname(iri_file_path)
temp = os.path.join(os.path.join(os.path.dirname(iri_dir), "temp"), self.iri_id)
thumbnail = os.path.join(settings.MEDIA_ROOT, unicode(self.image))
temp_thumbnail = os.path.join(os.path.dirname(thumbnail), "temp")
if os.path.exists(temp):
default_storage.delete(os.path.join(temp, os.path.basename(self.iriurl)))
os.rmdir(temp)
if os.path.exists(temp_thumbnail):
default_storage.delete(os.path.join(temp_thumbnail, os.path.basename(thumbnail)))
os.rmdir(temp_thumbnail)
#move .iri, and .png to there original directory
def rollback(self):
iri_file_path=self.iri_file_path()
iri_dir = os.path.dirname(iri_file_path)
temp = os.path.join(os.path.join(os.path.dirname(iri_dir), "temp"), self.iri_id)
thumbnail = os.path.join(settings.MEDIA_ROOT, unicode(self.image))
temp_thumbnail = os.path.join(os.path.dirname(thumbnail), "temp")
if os.path.exists(temp):
move(temp, iri_dir)
os.rmdir(os.path.dirname(temp))
if os.path.exists(temp_thumbnail):
move(os.path.join(temp_thumbnail, os.path.basename(thumbnail)), os.path.dirname(thumbnail))
os.rmdir(temp_thumbnail)
def natural_key(self):
return self.iri_id
# added for import
def get_by_natural_key(self, iri_id):
return self.get(iri_id=iri_id)
def get_duration(self):
if self.duration is None:
doc = lxml.etree.parse(self.iri_file_path())
res = doc.xpath("/iri/body/medias/media[@id='video']/video")
if len(res) > 0:
try:
self.duration = int(res[0].get(u'dur', 0) or 0)
except:
self.duration = 0
else:
self.duration = 0
self.save()
return self.duration
def mimetype(): #@NoSelf
def fget(self):
if self.media_obj:
return self.media_obj.mimetype
else:
return None
return locals()
mimetype = property(**mimetype())
def sync_iri_file(self):
# create iri file if needed
created = False
try:
iri_file_path = self.iri_file_path()
if not os.path.exists(iri_file_path):
iri_dir = os.path.dirname(iri_file_path)
if not os.path.exists(iri_dir):
os.makedirs(iri_dir)
created = True
iri_file = open(iri_file_path, "w")
create_empty_iri(iri_file, self, "IRI")
else:
created = False
update_iri(iri_file_path, self, "IRI")
except Exception, e:
if created:
if os.path.exists(iri_file_path):
os.remove(iri_file_path)
raise e
#TODO: better manage the change in .iri name and error scenario (save in temp file + rename
def save(self, *args, **kwargs):
create_front_project = False
# update it
self.sync_iri_file()
if not self.pk:
create_front_project = True
super(Content, self).save(*args, **kwargs)
if create_front_project:
# We need a primary key for self in create_project, so
# save() has to be called first
self.create_front_project()
assign('ldt_utils.change_content', get_current_user(), self)
def __unicode__(self):
return str(self.id) + ":" + self.iri_id + ":" + self.title.replace("\n", " ")
def iri_url(self, web_url=get_web_url()):
if url_utils.is_absolute(self.iriurl):
return self.iriurl
else:
res_url = unicode(settings.MEDIA_URL) + u"ldt/" + unicode(self.iriurl)
if not url_utils.is_absolute(res_url):
res_url = unicode(web_url) + res_url
return res_url
def iri_file_path(self):
return os.path.join(os.path.join(os.path.join(settings.MEDIA_ROOT, "ldt"), self.iri_id), os.path.basename(self.iriurl))
def iri_url_template(self):
return "${web_url}${media_url}ldt/" + unicode(self.iri_id) + "/" + os.path.basename(self.iriurl)
def __get_empty_media(self):
if settings.EMPTY_MEDIA_EXTERNALID:
empty_media = Media.objects.get(external_id=settings.EMPTY_MEDIA_EXTERNALID)
return empty_media
else:
return None
def stream_src(): #@NoSelf
def fget(self):
if self.media_obj is not None:
return self.media_obj.stream_src
else:
empty_media = self.__get_empty_media()
if empty_media:
return empty_media.stream_src
else:
return ""
return locals()
stream_src = property(**stream_src())
def videopath(): #@NoSelf
doc = """simulate videopath""" #@UnusedVariable
def fget(self):
if self.media_obj is None:
empty_media = self.__get_empty_media()
if empty_media:
return empty_media.videopath
else:
return None
else:
return self.media_obj.videopath
def fset(self, value):
if self.media_obj is not None:
self.media_obj.videopath = value
return locals()
videopath = property(**videopath())
def src(): #@NoSelf
doc = """simulate videopath""" #@UnusedVariable
def fget(self):
if self.media_obj is None:
empty_media = self.__get_empty_media()
if empty_media:
return empty_media.src
else:
return None
else:
return self.media_obj.src
def fset(self, value):
if self.media_obj is None or self.media_obj.src != value:
media, created = Media.objects.get_or_create(src=value, defaults={'src':value}) #@UnusedVariable
self.media_obj = media
self.save()
return locals()
src = property(**src())
def external_id(): #@NoSelf
doc = """simulate externalid""" #@UnusedVariable
def fget(self):
if self.media_obj is None:
empty_media = self.__get_empty_media()
if empty_media:
return empty_media.external_id
else:
return None
else:
return self.media_obj.external_id
def fset(self, value):
if self.media_obj is not None:
self.media_obj.external_id = value
return locals()
external_id = property(**external_id())
def is_public(): #@NoSelf
def fget(self):
if self.pk:
everyone = Group.objects.get(name=settings.PUBLIC_GROUP_NAME)
if 'view_content' in get_perms(everyone, self):
return True
return False
def fset(self, value):
if self.pk:
everyone = Group.objects.get(name=settings.PUBLIC_GROUP_NAME)
if value:
assign('ldt_utils.view_content', everyone, self)
else:
remove_perm('ldt_utils.view_content', everyone, self)
return locals()
is_public = property(**is_public())
def create_front_project(self):
old_user = get_current_user_or_admin()
if old_user.is_superuser:
admin = old_user
else:
admin = User.objects.filter(is_superuser=True)[0]
set_current_user(admin)
self.front_project = Project.create_project(admin, 'front project : %s' % self.title, [self], cuttings=['chapitrage', 'contributions'] )
self.front_project.publish(allow_write=True)
self.save()
set_current_user(old_user)
def get_or_create_front_project(self):
front_proj = self.front_project
if front_proj:
proj = front_proj
else:
# The main project for the content
proj = Project.safe_objects.filter(contents__in=[self], state=2)
if not proj:
self.create_front_project()
proj = self.front_project
else:
proj = proj[0]
return proj
# Tag management
def get_tags(self):
return Tag.objects.get_for_object(self)
# add polemic attributes and polemic attribute rates to class Content
def __add_polemic_attributes(self):
for element in POL_INDICES.keys():
if element.startswith('pol_'):
Content.add_to_class(element, property(self.__make_getter(element)))
Content.add_to_class("%s_rate" % element, property(self.__make_rate(element)))
def __make_getter(self, i):
def inner_getter(self):
if self.stat_annotation is None:
return 0;
else:
l = self.stat_annotation.polemics_volume
return l[POL_INDICES[i]]
return inner_getter
def __make_rate(self, i):
def inner_rate(self):
if self.stat_annotation is None or self.stat_annotation.nb_annotations <= 0:
return 0
return int(getattr(self, i) / float(self.stat_annotation.nb_annotations) * 100 )
return inner_rate
def annotation_volume(): #@NoSelf
def fget(self):
if self.stat_annotation is None:
return [0]*settings.DIVISIONS_FOR_STAT_ANNOTATION
else:
return self.stat_annotation.annotation_volume
return locals()
annotation_volume = property(**annotation_volume())
def nb_annotations(): #@NoSelf
def fget(self):
if self.stat_annotation is None:
return 0
else:
return self.stat_annotation.nb_annotations
return locals()
nb_annotations = property(**nb_annotations())
POL_INDICES = {
'pol_positive' : 0,
'pol_negative' : 1,
'pol_reference' : 2,
'pol_question' : 3,
}
class ContentStat(models.Model):
def __init__(self, *args, **kwargs):
super(ContentStat, self).__init__(*args, **kwargs)
if self.annotation_volume_str is None and self.polemics_volume_str is None:
self.__init_empty_stat()
content = models.OneToOneField(Content, blank=False, null=False, related_name='stat_annotation', verbose_name=_("content_stat.content"), unique=True, db_index=True)
annotation_volume_str = models.CommaSeparatedIntegerField(max_length=1024, null=True, blank=True, verbose_name=_("content_stat.annotations_volume"))
polemics_volume_str = models.CommaSeparatedIntegerField(max_length=1024, null=True, blank=True, verbose_name=_("content_stat.polemics_volume"))
nb_annotations = models.IntegerField(null=False, blank=False, verbose_name=_('content.nb_annotation'), default=0, db_index=True)
last_annotated = models.DateTimeField(default=datetime.datetime.now, verbose_name=_('content.last_annotated'), blank=True, null=True) #@UndefinedVariable
def __init_empty_stat(self):
self.annotation_volume_str = ','.join(['0']*settings.DIVISIONS_FOR_STAT_ANNOTATION)
self.polemics_volume_str = ','.join(['0']*len(settings.SYNTAX.keys()))
self.nb_annotations = 0
self.last_annotated = None
def __list2str(self, l):
return ','.join([str(c) for c in l])
def __str2list(self, s):
return [int(x) for x in s.split(',')]
def annotation_volume(): #@NoSelf
def fget(self):
return self.__str2list(self.annotation_volume_str)
def fset(self, value):
self.annotation_volume_str = self.__list2str(value)
return locals()
annotation_volume = property(**annotation_volume())
def polemics_volume(): #@NoSelf
def fget(self):
return self.__str2list(self.polemics_volume_str)
def fset(self, value):
self.polemics_volume_str = self.__list2str(value)
return locals()
polemics_volume = property(**polemics_volume())
class Project(Document, SafeModel):
STATE_CHOICES = (
(1, 'edition'),
(2, 'published'),
(3, 'moderated'),
(4, 'rejected'),
(5, 'deleted')
)
ldt_id = models.CharField(max_length=255, unique=True)
ldt = models.TextField(null=True)
title = models.CharField(max_length=1024)
contents = models.ManyToManyField(Content)
creation_date = models.DateTimeField(auto_now_add=True)
modification_date = models.DateTimeField(auto_now=True)
created_by = models.CharField(_("created by"), max_length=70)
changed_by = models.CharField(_("changed by"), max_length=70)
state = models.IntegerField(choices=STATE_CHOICES, default=1)
description = models.TextField(null=True, blank=True)
image = ImageField(upload_to="thumbnails/projects/", default=settings.DEFAULT_PROJECT_ICON, max_length=200)
class Meta:
ordering = ["title"]
permissions = (
('view_project', 'Can view project'),
)
def __setattr__(self, name, value):
super(Project, self).__setattr__(name,value)
if name == "ldt" and hasattr(self, "__ldt_encoded"):
del self.__ldt_encoded
def __unicode__(self):
return unicode(self.id) + u"::" + unicode(self.ldt_id) + u"::" + unicode(self.title)
def get_description(self, doc=None):
if doc is None:
doc = lxml.etree.fromstring(self.ldt)
res = doc.xpath("/iri/project")
if len(res) > 0:
return res[0].get(u'abstract')
else:
return None
def stream_mode(): #@NoSelf
def fget(self):
modes = []
for content in self.contents.all():
mimetype = content.mimetype
if mimetype:
mode = mimetype.split("/")[0]
if "audio" == mode or "video" == mode:
modes.append(mode)
else:
modes.append("video")
else:
modes.append("video")
def filter_video(current, item):
if not current:
return item
elif current == item:
return item
else:
return "video"
return reduce(filter_video, modes)
return locals()
stream_mode = property(**stream_mode())
@staticmethod
def create_project(user, title, contents, description='', groups=[], set_icon=True, cuttings=[]):
# owner = Owner.objects.get(user=user) #@UndefinedVariable
owner = user
project = Project(title=title, owner=owner, description=description)
project.ldt_id = str(uuid.uuid1()) #@UndefinedVariable
project.created_by = user.username
project.changed_by = user.username
project.state = 1
project.save()
assign('view_project', user, project)
assign('change_project', user, project)
for content in contents:
project.contents.add(content)
if set_icon:
project.set_icon()
project.save()
return create_ldt(project, user, cuttings)
def copy_project(self, user, title, description='', group=None):
project = Project(title=title, owner=user, description=description)
project = copy_ldt(self, project, user)
assign('view_project', user, project)
assign('change_project', user, project)
if group:
assign('view_project', group, project)
for content in self.contents.all():
project.contents.add(content)
return project
def publish(self, allow_write=False):
if not self.pk:
self.save()
self.state = 2
everyone = Group.objects.get(name=settings.PUBLIC_GROUP_NAME)
assign('ldt_utils.view_project', everyone, self)
if allow_write:
assign('ldt_utils.change_project', everyone, self)
self.save()
def unpublish(self):
if not self.pk:
self.save()
self.state = 1
everyone = Group.objects.get(name=settings.PUBLIC_GROUP_NAME)
remove_perm('ldt_utils.view_project', everyone, self)
remove_perm('ldt_utils.change_project', everyone, self)
self.save()
def set_icon(self):
default_image = os.path.basename(settings.DEFAULT_CONTENT_ICON)
for content in self.contents.all():
add_image = False
try:
current_image = content.image.file.name
except IOError:
add_image = True
if add_image or current_image != default_image:
self.image = content.image
return True
self.image = settings.DEFAULT_PROJECT_ICON
return False
def check_access(self, user):
if (user and user.is_staff) or self.state == 2:
return True
else:
return False
def has_annotations(self):
nb_annot = 0
doc = lxml.etree.fromstring(self.ldt)
res = doc.xpath("/iri/annotations/content/ensemble/decoupage")
for r in res:
nb_annot = nb_annot + r.find('elements').__len__()
if nb_annot == 0:
return False
else:
return True
def ldt_encoded(): #@NoSelf
def fget(self):
if self.ldt is None:
return None
if not hasattr(self, "__ldt_encoded"):
m = re.match("<\?xml.*?encoding\s*?=\s*?['\"]([A-Za-z][A-Za-z0-9._-]*)['\"].*?\?>", self.ldt.lstrip(), re.I|re.M|re.U)
if m and m.group(1):
encoding = m.group(1)
else:
encoding = 'utf-8'
self.__ldt_encoded = self.ldt.encode(encoding)
return self.__ldt_encoded
return locals()
ldt_encoded = property(**ldt_encoded())
class Segment(SafeModel):
project_obj = models.ForeignKey(Project, null=True)
content = models.ForeignKey(Content)
project_id = models.CharField(max_length=255, unique=False, blank=True, null=True, db_index=True)
iri_id = models.CharField(max_length=255, unique=False, db_index=True)
ensemble_id = models.CharField(max_length=512, unique=False, db_index=True)
cutting_id = models.CharField(max_length=512, unique=False, db_index=True)
element_id = models.CharField(max_length=512, unique=False, db_index=True)
tags = tagging.fields.TagField(max_length=2048, null=True, blank=True, unique=False)
title = models.CharField(max_length=2048, unique=False, null=True, blank=True)
duration = models.IntegerField(null=True)
start_ts = models.IntegerField(null=True)
author = models.CharField(max_length=1024, unique=False, null=True, blank=True)
date = models.CharField(max_length=128, unique=False, null=True, blank=True)
abstract = models.TextField(null=True, blank=True)
polemics = models.IntegerField(null=True, blank=True, default=0)
id_hash = models.CharField(max_length=128, unique=True, blank=True)
audio_src = models.CharField(max_length=255, unique=False, null=True, blank=True)
audio_href = models.CharField(max_length=512, unique=False, null=True, blank=True)
# All combinations of polemic hashtags can be represented by a combination of
# 4 bits, 1 if the hashtag is in the tweet, 0 else. We use the order OK, KO, Q, REF
# and convert the resulting string into an integer to store the polemic values.
# mask contains all possible polemic values
mask = {
'OK': set([8,9,10,11,12,13,14,15]),
'KO': set([4,5,6,7,12,13,14,15]),
'Q': set([2,3,6,7,10,11,14,15]),
'REF': set([1,3,5,7,9,11,13,15]),
}
def is_polemic(self, polemic_keyword): # OK, KO, Q, REF
if self.polemics in self.mask[polemic_keyword]:
return True
return False
def get_polemic(self, polemic_keywords):
value = set(range(16))
for keyword in self.mask.keys():
if keyword in polemic_keywords:
value = value.intersection(self.mask[keyword])
else:
value.difference_update(self.mask[keyword])
return value.pop()
def save(self, *args, **kwargs):
self.id_hash = generate_hash(self.__unicode__())
super(Segment, self).save(*args, **kwargs)
def __unicode__(self):
return "/".join((unicode(self.project_id), unicode(self.iri_id), unicode(self.ensemble_id), unicode(self.cutting_id), unicode(self.element_id)))
class Meta:
permissions = (
('view_segment', 'Can view segment'),
)