1
0
mirror of https://gitlab.crans.org/bde/nk20 synced 2025-06-21 01:48:21 +02:00

Handle permissions (and it seems working!)

This commit is contained in:
Yohann D'ANELLO
2020-03-18 14:42:35 +01:00
parent 112d4b6c5a
commit 057f42fdb6
19 changed files with 357 additions and 57 deletions

View File

@ -0,0 +1,4 @@
# Copyright (C) 2018-2020 by BDE ENS Paris-Saclay
# SPDX-License-Identifier: GPL-3.0-or-later
default_app_config = 'permission.apps.PermissionConfig'

View File

View File

@ -0,0 +1,17 @@
# Copyright (C) 2018-2020 by BDE ENS Paris-Saclay
# SPDX-License-Identifier: GPL-3.0-or-later
from rest_framework import serializers
from ..models import Permission
class PermissionSerializer(serializers.ModelSerializer):
"""
REST API Serializer for Permission types.
The djangorestframework plugin will analyse the model `Permission` and parse all fields in the API.
"""
class Meta:
model = Permission
fields = '__all__'

View File

@ -0,0 +1,11 @@
# Copyright (C) 2018-2020 by BDE ENS Paris-Saclay
# SPDX-License-Identifier: GPL-3.0-or-later
from .views import PermissionViewSet
def register_permission_urls(router, path):
"""
Configure router for permission REST API.
"""
router.register(path, PermissionViewSet)

View File

@ -0,0 +1,20 @@
# Copyright (C) 2018-2020 by BDE ENS Paris-Saclay
# SPDX-License-Identifier: GPL-3.0-or-later
from django_filters.rest_framework import DjangoFilterBackend
from api.viewsets import ReadOnlyProtectedModelViewSet
from .serializers import PermissionSerializer
from ..models import Permission
class PermissionViewSet(ReadOnlyProtectedModelViewSet):
"""
REST API View set.
The djangorestframework plugin will get all `Changelog` objects, serialize it to JSON with the given serializer,
then render it on /api/logs/
"""
queryset = Permission.objects.all()
serializer_class = PermissionSerializer
filter_backends = [DjangoFilterBackend]
filterset_fields = ['model', 'type', ]

View File

@ -2,7 +2,14 @@
# SPDX-License-Identifier: GPL-3.0-or-later
from django.apps import AppConfig
from django.db.models.signals import pre_save, pre_delete
class PermissionConfig(AppConfig):
name = 'permission'
def ready(self):
# noinspection PyUnresolvedReferences
from . import signals
pre_save.connect(signals.pre_save_object)
pre_delete.connect(signals.pre_delete_object)

View File

@ -27,12 +27,13 @@ class InstancedPermission:
"""
if self.type == 'add':
if permission_type == self.type:
return obj in self.model.modelclass().objects.get(self.query)
return self.query(obj)
if ContentType.objects.get_for_model(obj) != self.model:
# The permission does not apply to the model
return False
if permission_type == self.type:
if field_name and field_name != self.field:
if self.field and field_name != self.field:
return False
return obj in self.model.model_class().objects.filter(self.query).all()
else:
@ -91,6 +92,7 @@ class Permission(models.Model):
unique_together = ('model', 'query', 'type', 'field')
def clean(self):
self.query = json.dumps(json.loads(self.query))
if self.field and self.type not in {'view', 'change'}:
raise ValidationError(_("Specifying field applies only to view and change permission types."))
@ -101,21 +103,45 @@ class Permission(models.Model):
@staticmethod
def compute_f(oper, **kwargs):
if isinstance(oper, list):
if len(oper) == 1:
return kwargs[oper[0]].pk
elif len(oper) >= 2:
if oper[0] == 'ADD':
return functools.reduce(operator.add, [Permission.compute_f(oper, **kwargs) for oper in oper[1:]])
elif oper[0] == 'SUB':
return Permission.compute_f(oper[1], **kwargs) - Permission.compute_f(oper[2], **kwargs)
elif oper[0] == 'MUL':
return functools.reduce(operator.mul, [Permission.compute_f(oper, **kwargs) for oper in oper[1:]])
elif oper[0] == 'F':
return F(oper[1])
if oper[0] == 'ADD':
return functools.reduce(operator.add, [Permission.compute_f(oper, **kwargs) for oper in oper[1:]])
elif oper[0] == 'SUB':
return Permission.compute_f(oper[1], **kwargs) - Permission.compute_f(oper[2], **kwargs)
elif oper[0] == 'MUL':
return functools.reduce(operator.mul, [Permission.compute_f(oper, **kwargs) for oper in oper[1:]])
elif oper[0] == 'F':
return F(oper[1])
else:
field = kwargs[oper[0]]
for i in range(1, len(oper)):
field = getattr(field, oper[i])
return field
else:
return oper
# TODO: find a better way to crash here
raise Exception("F is wrong")
@staticmethod
def compute_param(value, **kwargs):
if not isinstance(value, list):
return value
field = kwargs[value[0]]
for i in range(1, len(value)):
if isinstance(value[i], list):
field = getattr(field, value[i][0])
params = []
call_kwargs = {}
for j in range(1, len(value[i])):
param = Permission.compute_param(value[i][j], **kwargs)
if isinstance(param, dict):
for key in param:
val = Permission.compute_param(param[key], **kwargs)
call_kwargs[key] = val
else:
params.append(param)
field = field(*params, **call_kwargs)
else:
field = getattr(field, value[i])
return field
def _about(self, query, **kwargs):
if self.type == 'add':
@ -124,8 +150,8 @@ class Permission(models.Model):
if len(query) == 0:
# The query is either [] or {} and
# applies to all objects of the model
# to represent this we return None
return None
# to represent this we return a trivial request
return Q(pk=F("pk"))
if isinstance(query, list):
if query[0] == 'AND':
return functools.reduce(operator.and_, [self._about(query, **kwargs) for query in query[1:]])
@ -138,11 +164,11 @@ class Permission(models.Model):
for key in query:
value = query[key]
if isinstance(value, list):
# It is a parameter we query its primary key
q_kwargs[key] = kwargs[value[0]].pk
# It is a parameter we query its return value
q_kwargs[key] = Permission.compute_param(value, **kwargs)
elif isinstance(value, dict):
# It is an F object
q_kwargs[key] = Permission.compute_f(query['F'], **kwargs)
q_kwargs[key] = Permission.compute_f(value['F'], **kwargs)
else:
q_kwargs[key] = value
return Q(**q_kwargs)
@ -167,7 +193,7 @@ class Permission(models.Model):
value = query[key]
if isinstance(value, list):
# It is a parameter we query its primary key
q_kwargs[key] = kwargs[value[0]].pk
q_kwargs[key] = Permission.compute_param(value, **kwargs)
elif isinstance(value, dict):
# It is an F object
q_kwargs[key] = Permission.compute_f(query['F'], **kwargs)
@ -176,7 +202,7 @@ class Permission(models.Model):
def func(obj):
nonlocal q_kwargs
for arg in q_kwargs:
if getattr(obj, arg) != q_kwargs(arg):
if getattr(obj, arg) != q_kwargs[arg]:
return False
return True
return func

View File

@ -0,0 +1,58 @@
# Copyright (C) 2018-2020 by BDE ENS Paris-Saclay
# SPDX-License-Identifier: GPL-3.0-or-later
from rest_framework.permissions import DjangoObjectPermissions
SAFE_METHODS = ('HEAD', 'OPTIONS', )
class StrongDjangoObjectPermissions(DjangoObjectPermissions):
perms_map = {
'GET': ['%(app_label)s.view_%(model_name)s'],
'OPTIONS': [],
'HEAD': [],
'POST': ['%(app_label)s.add_%(model_name)s'],
'PUT': ['%(app_label)s.change_%(model_name)s'],
'PATCH': ['%(app_label)s.change_%(model_name)s'],
'DELETE': ['%(app_label)s.delete_%(model_name)s'],
}
def get_required_object_permissions(self, method, model_cls):
kwargs = {
'app_label': model_cls._meta.app_label,
'model_name': model_cls._meta.model_name
}
if method not in self.perms_map:
from rest_framework import exceptions
raise exceptions.MethodNotAllowed(method)
return [perm % kwargs for perm in self.perms_map[method]]
def has_object_permission(self, request, view, obj):
# authentication checks have already executed via has_permission
queryset = self._queryset(view)
model_cls = queryset.model
user = request.user
perms = self.get_required_object_permissions(request.method, model_cls)
if not user.has_perms(perms, obj):
# If the user does not have permissions we need to determine if
# they have read permissions to see 403, or not, and simply see
# a 404 response.
from django.http import Http404
if request.method in SAFE_METHODS:
# Read permissions already checked and failed, no need
# to make another lookup.
raise Http404
read_perms = self.get_required_object_permissions('GET', model_cls)
if not user.has_perms(read_perms, obj):
raise Http404
# Has read permissions.
return False
return True

View File

@ -0,0 +1,75 @@
# Copyright (C) 2018-2020 by BDE ENS Paris-Saclay
# SPDX-License-Identifier: GPL-3.0-or-later
from django.core.exceptions import PermissionDenied
from logs.middlewares import get_current_authenticated_user
EXCLUDED = [
'cas_server.proxygrantingticket',
'cas_server.proxyticket',
'cas_server.serviceticket',
'cas_server.user',
'cas_server.userattributes',
'contenttypes.contenttype',
'logs.changelog',
'migrations.migration',
'sessions.session',
]
def pre_save_object(sender, instance, **kwargs):
"""
Before a model get saved, we check the permissions
"""
# noinspection PyProtectedMember
if instance._meta.label_lower in EXCLUDED:
return
user = get_current_authenticated_user()
if user is None:
# Action performed on shell is always granted
return
qs = sender.objects.filter(pk=instance.pk).all()
model_name_full = instance._meta.label_lower.split(".")
app_label = model_name_full[0]
model_name = model_name_full[1]
if qs.exists():
if user.has_perm(app_label + ".change_" + model_name, instance):
return
previous = qs.get()
for field in instance._meta.fields:
field_name = field.name
old_value = getattr(previous, field.name)
new_value = getattr(instance, field.name)
if old_value == new_value:
continue
if not user.has_perm(app_label + ".change_" + model_name + "_" + field_name, instance):
raise PermissionDenied
else:
if not user.has_perm(app_label + ".add_" + model_name, instance):
raise PermissionDenied
def pre_delete_object(sender, instance, **kwargs):
"""
Before a model get deleted, we check the permissions
"""
# noinspection PyProtectedMember
if instance._meta.label_lower in EXCLUDED:
return
user = get_current_authenticated_user()
if user is None:
# Action performed on shell is always granted
return
model_name_full = instance._meta.label_lower.split(".")
app_label = model_name_full[0]
model_name = model_name_full[1]
if not user.has_perm(app_label + ".delete_" + model_name, instance):
raise PermissionDenied