from django.conf import settings
from django.contrib.contenttypes.models import ContentType
from django.contrib.auth.models import User
from django.core.signals import request_started
try:
from threading import local
except ImportError:
from django.utils._threading_local import local
# Thread-local storage; set_current_user() stores the current user on it
# under the attribute ``user``.
_thread_locals = local()

# The function that protects models is called on the first
# HTTP request sent to the server (see function protect_models_request
# in this file), and cannot be called in this file directly
# because of a circular import.
#
# To protect models from the command line, use set_current_user(my_user)
# and protect_models().

# Flag read by protect_models_request() to decide whether protect_models()
# still has to be called.
_models_are_protected = False
def get_current_user():
    """Return the user stored for the current thread, or None if none is set."""
    try:
        return _thread_locals.user
    except AttributeError:
        # No user has been stored on this thread's storage.
        return None
def set_current_user(user):
    """Store ``user`` on the thread-local storage read by get_current_user()."""
    setattr(_thread_locals, 'user', user)
def del_current_user():
    """Remove the user stored for the current thread, if there is one.

    Calling this when no user has been stored is a no-op, mirroring
    get_current_user(), which reports a missing user as None instead of
    raising.
    """
    try:
        del _thread_locals.user
    except AttributeError:
        # Nothing was stored on this thread (e.g. cleanup ran without a
        # matching set_current_user call); there is nothing to remove.
        pass
def get_anonymous_user():
    """Return the User whose id is settings.ANONYMOUS_USER_ID.

    The instance is fetched once and then memoized as an attribute of this
    function, so later calls do not query the database again.
    """
    try:
        return get_anonymous_user.anonymous_user
    except AttributeError:
        # Not cached yet; fall through and load it.
        pass
    anonymous = User.objects.get(id=settings.ANONYMOUS_USER_ID)
    get_anonymous_user.anonymous_user = anonymous
    return anonymous
def get_current_user_or_admin():
    """Return the current thread's user, falling back to a superuser.

    When get_current_user() yields nothing truthy, the first row of the
    ``is_superuser=True`` queryset is returned instead (indexing raises
    IndexError if that queryset is empty).
    """
    user = get_current_user()
    if not user:
        superusers = User.objects.filter(is_superuser=True)
        user = superusers[0]
    return user
def protect_models():
    """Apply protect_model() to every class from get_models_to_protect().

    When at least one model was protected, the module-level flag
    ``_models_are_protected`` is set to True so that
    protect_models_request() stops calling this function.
    """
    # Without this declaration the assignment below would only bind a local
    # name and the module-level flag would stay False forever.
    global _models_are_protected
    cls_list = get_models_to_protect()
    if cls_list:
        for cls in cls_list:
            protect_model(cls)
        _models_are_protected = True
def unprotect_models():
    """Apply unprotect_model() to every class from get_models_to_protect().

    Afterwards the module-level flag ``_models_are_protected`` is reset to
    False.
    """
    # Without this declaration the assignment below would only bind a local
    # name and the module-level flag would never be reset.
    global _models_are_protected
    for cls in get_models_to_protect():
        unprotect_model(cls)
    _models_are_protected = False
def get_models_to_protect():
    """Return the model classes named in settings.USE_GROUP_PERMISSIONS.

    Each name is lower-cased and resolved through ContentType. The resulting
    list is memoized as an attribute of this function, so the lookup only
    happens on the first successful call.
    """
    try:
        return get_models_to_protect.cls_list
    except AttributeError:
        # Not resolved yet; fall through and build the list.
        pass
    models = [
        ContentType.objects.get(model=name.lower()).model_class()
        for name in settings.USE_GROUP_PERMISSIONS
    ]
    get_models_to_protect.cls_list = models
    return models
def protect_model(cls):
    """Wrap ``cls.save`` and ``cls.delete`` with a permission check.

    The original methods are kept as ``cls.unsafe_save`` and
    ``cls.unsafe_delete``; if ``unsafe_save`` is already present the methods
    are left alone so they are not wrapped twice. ``check_perm`` is then
    switched on for ``cls.safe_objects``.
    """
    already_wrapped = hasattr(cls, 'unsafe_save')
    if not already_wrapped:
        cls.unsafe_save = cls.save
        cls.unsafe_delete = cls.delete
        guard = change_security(cls.__name__.lower())
        for method_name in ('save', 'delete'):
            setattr(cls, method_name, guard(getattr(cls, method_name)))
    cls.safe_objects.check_perm = True
def unprotect_model(cls):
    """Undo protect_model(): restore the original ``save`` and ``delete``.

    If ``cls.unsafe_save`` exists, the saved originals are put back and the
    ``unsafe_*`` attributes are removed. ``check_perm`` is then switched off
    for ``cls.safe_objects``.
    """
    if hasattr(cls, 'unsafe_save'):
        for method_name in ('save', 'delete'):
            setattr(cls, method_name, getattr(cls, 'unsafe_' + method_name))
        for backup_name in ('unsafe_save', 'unsafe_delete'):
            delattr(cls, backup_name)
    cls.safe_objects.check_perm = False
def change_security(cls_name):
    """Build a decorator that guards a model method with a permission check.

    The decorated method looks up the current user (falling back to the
    anonymous user when none is set). If the instance already has a primary
    key and the user lacks the ``change_<cls_name>`` permission on it, the
    call is refused; otherwise the original method runs unchanged.

    Args:
        cls_name: Lower-case model name used to build the permission name.

    Returns:
        A decorator taking the method to guard and returning the guarded
        version.

    Raises:
        AttributeError: From the guarded method, when the user is not
            allowed to change the object.
    """
    # Imported locally so this block does not depend on file-level imports.
    from functools import wraps

    def wrapper(func):
        # wraps() keeps the original name, docstring and function attributes
        # (e.g. Django's ``alters_data`` marker) on the guarded method.
        @wraps(func)
        def wrapped(self, *args, **kwargs):
            user = get_current_user()
            if not user:
                user = get_anonymous_user()
            # Objects without a primary key are not checked.
            if self.pk and not user.has_perm('change_%s' % cls_name, self):
                raise AttributeError('User %s is not allowed to change object %s' % (user, self))
            return func(self, *args, **kwargs)
        return wrapped
    return wrapper
def protect_models_request(sender, **kwargs):
    """Signal receiver: call protect_models() unless the flag is already set.

    ``sender`` and the extra keyword arguments are accepted but not used.
    """
    if _models_are_protected:
        return
    protect_models()
# Register protect_models_request as a receiver of Django's request_started
# signal, so model protection is triggered from request handling rather than
# at import time.
request_started.connect(protect_models_request)