1
0
mirror of https://gitlab.crans.org/bde/nk20 synced 2024-12-23 16:02:25 +00:00
nk20/apps/permission/models.py

361 lines
13 KiB
Python
Raw Normal View History

2020-03-07 12:12:17 +00:00
# Copyright (C) 2018-2020 by BDE ENS Paris-Saclay
# SPDX-License-Identifier: GPL-3.0-or-later
2020-02-09 16:35:15 +00:00
import functools
2019-09-18 12:26:42 +00:00
import json
2020-02-09 16:35:15 +00:00
import operator
2020-08-03 08:50:55 +00:00
from copy import copy
2020-07-31 19:24:23 +00:00
from time import sleep
2019-09-18 12:26:42 +00:00
from django.contrib.contenttypes.models import ContentType
from django.core.exceptions import ValidationError
2020-08-03 08:50:55 +00:00
from django.core.mail import mail_admins
2019-09-18 12:26:42 +00:00
from django.db import models
from django.db.models import F, Q, Model
2020-08-03 08:50:55 +00:00
from django.forms import model_to_dict
2019-09-18 12:26:42 +00:00
from django.utils.translation import gettext_lazy as _
2020-03-20 13:43:35 +00:00
2019-09-18 12:26:42 +00:00
class InstancedPermission:
2020-03-20 00:46:59 +00:00
def __init__(self, model, query, type, field, mask, **kwargs):
2019-09-18 12:26:42 +00:00
self.model = model
2020-03-20 00:46:59 +00:00
self.raw_query = query
self.query = None
2019-09-18 12:26:42 +00:00
self.type = type
self.field = field
self.mask = mask
2020-03-20 00:46:59 +00:00
self.kwargs = kwargs
2019-09-18 12:26:42 +00:00
def applies(self, obj, permission_type, field_name=None):
2020-02-09 16:35:15 +00:00
"""
Returns True if the permission applies to
the field `field_name` object `obj`
"""
if not isinstance(obj, self.model.model_class()):
2020-03-18 14:49:52 +00:00
# The permission does not apply to the model
return False
2020-03-07 08:30:22 +00:00
if self.type == 'add':
if permission_type == self.type:
2020-03-20 00:46:59 +00:00
self.update_query()
2020-08-03 08:50:55 +00:00
obj = copy(obj)
obj.pk = 0
# Ensure previous models are deleted
for ignored in range(1000):
2020-08-03 08:50:55 +00:00
if self.model.model_class().objects.filter(pk=0).exists():
2020-07-31 19:24:23 +00:00
# If the object exists, that means that one permission is currently checked.
# We wait before the other permission, at most 1 second.
sleep(0.001)
2020-07-31 19:24:23 +00:00
continue
break
2020-08-03 08:50:55 +00:00
for o in self.model.model_class().objects.filter(pk=0).all():
o._force_delete = True
Model.delete(o)
2020-08-03 08:50:55 +00:00
# An object with pk 0 wouldn't deleted. That's not normal, we alert admins.
msg = "Lors de la vérification d'une permission d'ajout, un objet de clé primaire nulle était "\
"encore présent.\n"\
"Type de permission : " + self.type + "\n"\
"Modèle : " + str(self.model) + "\n"\
"Objet trouvé : " + str(model_to_dict(o)) + "\n\n"\
"--\nLe BDE"
mail_admins("[Note Kfet] Un objet a été supprimé de force", msg)
# Force insertion, no data verification, no trigger
2020-04-01 01:42:19 +00:00
obj._force_save = True
Model.save(obj, force_insert=True)
# We don't want log anything
obj._no_log = True
2020-08-03 08:50:55 +00:00
ret = self.model.model_class().objects.filter(self.query & Q(pk=0)).exists()
# Delete testing object
2020-04-01 01:42:19 +00:00
obj._force_delete = True
Model.delete(obj)
2020-08-03 08:50:55 +00:00
with open("/tmp/log", "w") as f:
f.write(str(obj) + ", " + str(obj.pk) + ", " + str(self.model.model_class().objects.filter(pk=0).exists()))
return ret
2020-03-07 12:12:17 +00:00
if permission_type == self.type:
if self.field and field_name != self.field:
2019-09-18 12:26:42 +00:00
return False
2020-03-20 00:46:59 +00:00
self.update_query()
return self.model.model_class().objects.filter(self.query & Q(pk=obj.pk)).exists()
2020-02-09 16:35:15 +00:00
else:
return False
2019-09-18 12:26:42 +00:00
2020-03-20 00:46:59 +00:00
def update_query(self):
2020-03-20 14:58:14 +00:00
"""
The query is not analysed in a first time. It is analysed at most once if needed.
:return:
"""
2020-03-20 00:46:59 +00:00
if not self.query:
# noinspection PyProtectedMember
self.query = Permission._about(self.raw_query, **self.kwargs)
2019-09-18 12:26:42 +00:00
def __repr__(self):
if self.field:
2020-03-07 12:12:17 +00:00
return _("Can {type} {model}.{field} in {query}").format(type=self.type, model=self.model, field=self.field, query=self.query)
2019-09-18 12:26:42 +00:00
else:
2020-03-07 12:12:17 +00:00
return _("Can {type} {model} in {query}").format(type=self.type, model=self.model, query=self.query)
def __str__(self):
return self.__repr__()
2019-09-18 12:26:42 +00:00
2020-03-19 15:12:52 +00:00
class PermissionMask(models.Model):
2020-03-20 14:58:14 +00:00
"""
Permissions that are hidden behind a mask
"""
2020-03-19 15:12:52 +00:00
rank = models.PositiveSmallIntegerField(
2020-03-19 15:27:25 +00:00
unique=True,
2020-03-19 15:12:52 +00:00
verbose_name=_('rank'),
)
description = models.CharField(
max_length=255,
2020-03-19 15:27:25 +00:00
unique=True,
2020-03-19 15:12:52 +00:00
verbose_name=_('description'),
)
def __str__(self):
return self.description
2020-04-06 08:58:16 +00:00
class Meta:
verbose_name = _("permission mask")
verbose_name_plural = _("permission masks")
2020-03-19 15:12:52 +00:00
2019-09-18 12:26:42 +00:00
class Permission(models.Model):
PERMISSION_TYPES = [
2020-07-31 20:29:23 +00:00
('add', _('add')),
('view', _('view')),
('change', _('change')),
('delete', _('delete'))
2019-09-18 12:26:42 +00:00
]
2020-04-22 11:28:52 +00:00
model = models.ForeignKey(
ContentType,
on_delete=models.CASCADE,
related_name='+',
verbose_name=_("model"),
)
2019-09-18 12:26:42 +00:00
2020-02-09 16:35:15 +00:00
# A json encoded Q object with the following grammar
# query -> [] | {} (the empty query representing all objects)
2020-03-07 12:12:17 +00:00
# query -> ["AND", query, …] AND multiple queries
# | ["OR", query, …] OR multiple queries
# | ["NOT", query] Opposite of query
2020-02-13 14:59:19 +00:00
# query -> {key: value, …} A list of fields and values of a Q object
# key -> string A field name
# value -> int | string | bool | null Literal values
2020-03-20 14:58:14 +00:00
# | [parameter, …] A parameter. See compute_param for more details.
2020-03-07 12:12:17 +00:00
# | {"F": oper} An F object
2020-03-20 14:58:14 +00:00
# oper -> [string, …] A parameter. See compute_param for more details.
2020-03-07 12:12:17 +00:00
# | ["ADD", oper, …] Sum multiple F objects or literal
# | ["SUB", oper, oper] Substract two F objects or literal
# | ["MUL", oper, …] Multiply F objects or literals
2020-02-13 14:59:19 +00:00
# | int | string | bool | null Literal values
2020-03-07 12:12:17 +00:00
# | ["F", string] A field
2020-02-09 16:35:15 +00:00
#
# Examples:
2020-03-07 12:12:17 +00:00
# Q(is_superuser=True) := {"is_superuser": true}
# ~Q(is_superuser=True) := ["NOT", {"is_superuser": true}]
2020-04-22 11:28:52 +00:00
query = models.TextField(
verbose_name=_("query"),
)
2019-09-18 12:26:42 +00:00
2020-04-22 11:28:52 +00:00
type = models.CharField(
max_length=15,
choices=PERMISSION_TYPES,
verbose_name=_("type"),
)
2019-09-18 12:26:42 +00:00
2020-03-19 15:12:52 +00:00
mask = models.ForeignKey(
PermissionMask,
on_delete=models.PROTECT,
2020-04-22 11:28:52 +00:00
related_name="permissions",
verbose_name=_("mask"),
2020-03-19 15:12:52 +00:00
)
2020-04-22 11:28:52 +00:00
field = models.CharField(
max_length=255,
blank=True,
verbose_name=_("field"),
)
2020-05-07 19:14:36 +00:00
permanent = models.BooleanField(
default=False,
help_text=_("Tells if the permission should be granted even if the membership of the user is expired."),
verbose_name=_("permanent"),
)
2020-04-22 11:28:52 +00:00
description = models.CharField(
max_length=255,
blank=True,
verbose_name=_("description"),
)
2019-09-18 12:26:42 +00:00
class Meta:
unique_together = ('model', 'query', 'type', 'field')
2020-04-06 08:58:16 +00:00
verbose_name = _("permission")
verbose_name_plural = _("permissions")
2019-09-18 12:26:42 +00:00
def clean(self):
self.query = json.dumps(json.loads(self.query))
if self.field and self.type not in {'view', 'change'}:
2019-09-18 12:26:42 +00:00
raise ValidationError(_("Specifying field applies only to view and change permission types."))
2020-03-07 12:12:17 +00:00
def save(self, **kwargs):
2019-09-18 12:26:42 +00:00
self.full_clean()
super().save()
2020-02-13 14:59:19 +00:00
@staticmethod
2020-03-07 12:12:17 +00:00
def compute_f(oper, **kwargs):
2020-02-13 14:59:19 +00:00
if isinstance(oper, list):
if oper[0] == 'ADD':
return functools.reduce(operator.add, [Permission.compute_f(oper, **kwargs) for oper in oper[1:]])
elif oper[0] == 'SUB':
return Permission.compute_f(oper[1], **kwargs) - Permission.compute_f(oper[2], **kwargs)
elif oper[0] == 'MUL':
return functools.reduce(operator.mul, [Permission.compute_f(oper, **kwargs) for oper in oper[1:]])
elif oper[0] == 'F':
return F(oper[1])
else:
field = kwargs[oper[0]]
for i in range(1, len(oper)):
field = getattr(field, oper[i])
return field
2020-02-13 14:59:19 +00:00
else:
return oper
@staticmethod
def compute_param(value, **kwargs):
2020-03-20 14:58:14 +00:00
"""
A parameter is given by a list. The first argument is the name of the parameter.
The parameters are the user, the club, and some classes (Note, ...)
If there are more arguments in the list, then attributes are queried.
For example, ["user", "note", "balance"] will return the balance of the note of the user.
If an argument is a list, then this is interpreted with a function call:
First argument is the name of the function, next arguments are parameters, and if there is a dict,
then the dict is given as kwargs.
For example: NoteUser.objects.filter(user__memberships__club__name="Kfet").all() is translated by:
["NoteUser", "objects", ["filter", {"user__memberships__club__name": "Kfet"}], ["all"]]
"""
if not isinstance(value, list):
return value
field = kwargs[value[0]]
for i in range(1, len(value)):
if isinstance(value[i], list):
if value[i][0] in kwargs:
field = Permission.compute_param(value[i], **kwargs)
continue
2020-07-28 18:22:10 +00:00
if not hasattr(field, value[i][0]):
return False
field = getattr(field, value[i][0])
params = []
call_kwargs = {}
for j in range(1, len(value[i])):
param = Permission.compute_param(value[i][j], **kwargs)
if isinstance(param, dict):
for key in param:
val = Permission.compute_param(param[key], **kwargs)
call_kwargs[key] = val
else:
params.append(param)
field = field(*params, **call_kwargs)
else:
2020-07-28 18:22:10 +00:00
if not hasattr(field, value[i]):
return False
field = getattr(field, value[i])
return field
2020-02-13 14:59:19 +00:00
2020-03-20 00:46:59 +00:00
@staticmethod
def _about(query, **kwargs):
2020-03-20 14:58:14 +00:00
"""
Translate JSON query into a Q query.
:param query: The JSON query
:param kwargs: Additional params
:return: A Q object
"""
if len(query) == 0:
# The query is either [] or {} and
2020-02-09 16:35:15 +00:00
# applies to all objects of the model
# to represent this we return a trivial request
return Q(pk=F("pk"))
if isinstance(query, list):
if query[0] == 'AND':
2020-03-20 00:46:59 +00:00
return functools.reduce(operator.and_, [Permission._about(query, **kwargs) for query in query[1:]])
elif query[0] == 'OR':
2020-03-20 00:46:59 +00:00
return functools.reduce(operator.or_, [Permission._about(query, **kwargs) for query in query[1:]])
elif query[0] == 'NOT':
2020-03-20 00:46:59 +00:00
return ~Permission._about(query[1], **kwargs)
2020-03-20 14:58:14 +00:00
else:
2020-07-28 18:22:10 +00:00
return Q(pk=F("pk")) if Permission.compute_param(query, **kwargs) else ~Q(pk=F("pk"))
elif isinstance(query, dict):
2020-02-09 16:35:15 +00:00
q_kwargs = {}
for key in query:
value = query[key]
2020-02-09 16:35:15 +00:00
if isinstance(value, list):
# It is a parameter we query its return value
q_kwargs[key] = Permission.compute_param(value, **kwargs)
2020-02-13 14:59:19 +00:00
elif isinstance(value, dict):
# It is an F object
q_kwargs[key] = Permission.compute_f(value['F'], **kwargs)
2020-02-09 16:35:15 +00:00
else:
q_kwargs[key] = value
return Q(**q_kwargs)
2019-09-18 12:26:42 +00:00
else:
2020-02-09 16:35:15 +00:00
# TODO: find a better way to crash here
2020-03-20 00:46:59 +00:00
raise Exception("query {} is wrong".format(query))
2019-09-18 12:26:42 +00:00
def about(self, **kwargs):
"""
Return an InstancedPermission with the parameters
replaced by their values and the query interpreted
"""
query = json.loads(self.query)
2020-03-20 00:46:59 +00:00
# query = self._about(query, **kwargs)
return InstancedPermission(self.model, query, self.type, self.field, self.mask, **kwargs)
2019-09-18 12:26:42 +00:00
def __str__(self):
2020-04-22 11:28:52 +00:00
return self.description
2019-09-18 12:26:42 +00:00
2020-03-20 13:43:35 +00:00
2020-07-25 17:40:30 +00:00
class Role(models.Model):
2020-03-20 13:43:35 +00:00
"""
Permissions associated with a Role
"""
2020-07-25 17:40:30 +00:00
name = models.CharField(
max_length=255,
verbose_name=_("name"),
2020-03-20 13:43:35 +00:00
)
2020-07-25 17:40:30 +00:00
2020-03-20 13:43:35 +00:00
permissions = models.ManyToManyField(
Permission,
verbose_name=_("permissions"),
2020-03-20 13:43:35 +00:00
)
for_club = models.ForeignKey(
"member.Club",
verbose_name=_("for club"),
on_delete=models.PROTECT,
null=True,
default=None,
)
2020-03-20 13:43:35 +00:00
def __str__(self):
2020-07-25 17:40:30 +00:00
return self.name
2020-04-06 08:58:16 +00:00
class Meta:
verbose_name = _("role permissions")
verbose_name_plural = _("role permissions")