nk20/apps/permission/models.py

352 lines
12 KiB
Python
Raw Permalink Normal View History

# Copyright (C) 2018-2021 by BDE ENS Paris-Saclay
2020-03-07 12:12:17 +00:00
# SPDX-License-Identifier: GPL-3.0-or-later
2020-02-09 16:35:15 +00:00
import functools
2019-09-18 12:26:42 +00:00
import json
2020-02-09 16:35:15 +00:00
import operator
2020-08-03 08:50:55 +00:00
from copy import copy
2019-09-18 12:26:42 +00:00
from django.contrib.contenttypes.models import ContentType
from django.core.exceptions import ValidationError
2020-08-03 08:50:55 +00:00
from django.core.mail import mail_admins
from django.db import models, transaction
from django.db.models import F, Q, Model
2020-08-03 08:50:55 +00:00
from django.forms import model_to_dict
2019-09-18 12:26:42 +00:00
from django.utils.translation import gettext_lazy as _
2020-03-20 13:43:35 +00:00
2019-09-18 12:26:42 +00:00
class InstancedPermission:
    """
    A permission whose query parameters are bound to concrete values.

    The JSON query is stored untouched in ``raw_query``; it is only translated
    into a ``Q`` object (``query``) the first time a check needs it.
    """

    def __init__(self, model, query, type, field, mask, **kwargs):
        """
        :param model: object whose ``model_class()`` returns the concerned model class
        :param query: the decoded JSON query, not yet translated into a Q object
        :param type: the permission type, compared with the requested type in `applies`
        :param field: the field the permission is restricted to, or a falsy value
        :param mask: the mask attached to the permission
        :param kwargs: the values of the parameters that the query may reference
        """
        self.model = model
        self.raw_query = query
        # Filled lazily by update_query()
        self.query = None
        self.type = type
        self.field = field
        self.mask = mask
        self.kwargs = kwargs

    def applies(self, obj, permission_type, field_name=None):
        """
        Returns True if the permission applies to
        the field `field_name` object `obj`

        :param obj: the object to check
        :param permission_type: the requested kind of access
        :param field_name: the requested field, if any (ignored for 'add' permissions)
        :return: True if the permission grants the requested access, False otherwise
        """
        model_class = self.model.model_class()
        if not isinstance(obj, model_class):
            # The permission does not apply to the model
            return False

        if permission_type != self.type:
            return False

        if self.type == 'add':
            return self._applies_to_new_object(obj, model_class)

        if self.field and field_name != self.field:
            return False

        self.update_query()
        return model_class.objects.filter(self.query & Q(pk=obj.pk)).exists()

    def _applies_to_new_object(self, obj, model_class):
        """
        Check an 'add' permission: a copy of `obj` is inserted with primary key 0,
        the query is evaluated against that row, then the savepoint is rolled back.
        """
        self.update_query()

        # Work on a copy: the caller's object must keep its own primary key
        obj = copy(obj)
        obj.pk = 0
        with transaction.atomic():
            sid = transaction.savepoint()
            for o in model_class.objects.filter(pk=0).all():
                o._no_signal = True
                o._force_delete = True
                Model.delete(o)
                # An object with pk 0 should not exist. That's not normal, we alert admins.
                # NOTE(review): this deletion happens after the savepoint was taken, so the
                # rollback below presumably restores the row. Confirm this is intended.
                msg = (
                    "Lors de la vérification d'une permission d'ajout, un objet de clé primaire nulle était "
                    "encore présent.\n"
                    "Type de permission : " + self.type + "\n"
                    "Modèle : " + str(self.model) + "\n"
                    "Objet trouvé : " + str(model_to_dict(o)) + "\n\n"
                    "--\nLe BDE"
                )
                mail_admins("[Note Kfet] Un objet a été supprimé de force", msg)

            # Force insertion, no data verification, no trigger
            obj._force_save = True
            # We don't want to trigger any signal (log,…)
            obj._no_signal = True
            Model.save(obj, force_insert=True)
            ret = model_class.objects.filter(self.query & Q(pk=0)).exists()
            # The temporary row must never be kept
            transaction.savepoint_rollback(sid)

            return ret

    def update_query(self):
        """
        The query is not analysed in a first time. It is analysed at most once if needed.
        :return:
        """
        # Compare with None rather than relying on truthiness: a parsed query
        # that happens to be falsy must not be translated a second time.
        if self.query is None:
            # noinspection PyProtectedMember
            self.query = Permission._about(self.raw_query, **self.kwargs)

    def __repr__(self):
        if self.field:
            return _("Can {type} {model}.{field} in {query}").format(
                type=self.type, model=self.model, field=self.field, query=self.query)
        return _("Can {type} {model} in {query}").format(type=self.type, model=self.model, query=self.query)

    def __str__(self):
        return repr(self)
2019-09-18 12:26:42 +00:00
2020-03-19 15:12:52 +00:00
class PermissionMask(models.Model):
    """
    A mask that permissions are attached to.

    Both the rank and the description are unique among masks.
    """

    rank = models.PositiveSmallIntegerField(verbose_name=_('rank'), unique=True)

    description = models.CharField(verbose_name=_('description'), max_length=255, unique=True)

    class Meta:
        verbose_name = _("permission mask")
        verbose_name_plural = _("permission masks")

    def __str__(self):
        # A mask is displayed through its description
        return self.description
2020-03-19 15:12:52 +00:00
2019-09-18 12:26:42 +00:00
class Permission(models.Model):
    """
    A rule granting one kind of access (add, view, change or delete) on the
    objects of a model that match a JSON-encoded query.
    """

    PERMISSION_TYPES = [
        ('add', _('add')),
        ('view', _('view')),
        ('change', _('change')),
        ('delete', _('delete'))
    ]

    model = models.ForeignKey(
        ContentType,
        on_delete=models.CASCADE,
        related_name='+',
        verbose_name=_("model"),
    )

    # A json encoded Q object with the following grammar
    #  query -> [] | {}                          (the empty query representing all objects)
    #  query -> ["AND", query, …]                AND multiple queries
    #         | ["OR", query, …]                 OR multiple queries
    #         | ["NOT", query]                   Opposite of query
    #         | [parameter, …]                   A parameter, all objects match if it is truthy
    #  query -> {key: value, …}                  A list of fields and values of a Q object
    #  key   -> string                           A field name
    #  value -> int | string | bool | null       Literal values
    #         | [parameter, …]                   A parameter. See compute_param for more details.
    #         | {"F": oper}                      An F object
    #  oper  -> [string, …]                      A parameter. See compute_param for more details.
    #         | ["ADD", oper, …]                 Sum multiple F objects or literal
    #         | ["SUB", oper, oper]              Subtract two F objects or literal
    #         | ["MUL", oper, …]                 Multiply F objects or literals
    #         | int | string | bool | null       Literal values
    #         | ["F", string]                    A field
    #
    # Examples:
    #  Q(is_superuser=True)  := {"is_superuser": true}
    #  ~Q(is_superuser=True) := ["NOT", {"is_superuser": true}]
    query = models.TextField(
        verbose_name=_("query"),
    )

    type = models.CharField(
        max_length=15,
        choices=PERMISSION_TYPES,
        verbose_name=_("type"),
    )

    mask = models.ForeignKey(
        PermissionMask,
        on_delete=models.PROTECT,
        related_name="permissions",
        verbose_name=_("mask"),
    )

    field = models.CharField(
        max_length=255,
        blank=True,
        verbose_name=_("field"),
    )

    permanent = models.BooleanField(
        default=False,
        help_text=_("Tells if the permission should be granted even if the membership of the user is expired."),
        verbose_name=_("permanent"),
    )

    description = models.CharField(
        max_length=255,
        blank=True,
        verbose_name=_("description"),
    )

    class Meta:
        unique_together = ('model', 'query', 'type', 'field')
        verbose_name = _("permission")
        verbose_name_plural = _("permissions")

    def clean(self):
        """
        Normalise the JSON text of the query and check the field/type combination.

        :raises ValidationError: if the query is not valid JSON, or if a field is
            given for a permission type other than view or change
        """
        try:
            # Round trip through the parser so that equivalent queries share one spelling
            self.query = json.dumps(json.loads(self.query))
        except (TypeError, ValueError) as exc:
            # json.JSONDecodeError is a ValueError. Report it as a validation error
            # so that full_clean() collects it instead of letting it escape.
            raise ValidationError(_("The query is not a valid JSON document.")) from exc

        if self.field and self.type not in {'view', 'change'}:
            raise ValidationError(_("Specifying field applies only to view and change permission types."))

    @transaction.atomic
    def save(self, *args, **kwargs):
        """
        Validate the permission, then save it.

        Every argument is forwarded to ``Model.save`` so that options such as
        ``using``, ``force_insert`` or ``update_fields`` are honoured.
        """
        self.full_clean()
        super().save(*args, **kwargs)

    @staticmethod
    def compute_f(oper, **kwargs):
        """
        Evaluate the ``oper`` part of an ``{"F": oper}`` value.

        :param oper: a literal, or a list starting with "ADD", "SUB", "MUL", "F"
            or the name of a parameter
        :param kwargs: the values of the parameters
        :return: the literal itself, an F expression, or the attribute reached
            from the parameter
        """
        if not isinstance(oper, list):
            return oper

        head = oper[0]
        if head == 'ADD':
            return functools.reduce(operator.add, [Permission.compute_f(sub, **kwargs) for sub in oper[1:]])
        if head == 'SUB':
            return Permission.compute_f(oper[1], **kwargs) - Permission.compute_f(oper[2], **kwargs)
        if head == 'MUL':
            return functools.reduce(operator.mul, [Permission.compute_f(sub, **kwargs) for sub in oper[1:]])
        if head == 'F':
            return F(oper[1])

        # Otherwise the list is a parameter name followed by attribute names
        field = kwargs[head]
        for attr in oper[1:]:
            field = getattr(field, attr)
        return field

    @staticmethod
    def compute_param(value, **kwargs):
        """
        A parameter is given by a list. The first argument is the name of the parameter.
        The parameters are the user, the club, and some classes (Note,)

        If there are more arguments in the list, then attributes are queried.
        For example, ["user", "note", "balance"] will return the balance of the note of the user.

        If an argument is a list, then this is interpreted with a function call:
            First argument is the name of the function, next arguments are parameters, and if there is a dict,
            then the dict is given as kwargs.
            For example: NoteUser.objects.filter(user__memberships__club__name="Kfet").all() is translated by:
            ["NoteUser", "objects", ["filter", {"user__memberships__club__name": "Kfet"}], ["all"]]

        A value that is not a list is returned unchanged, and False is returned
        as soon as a requested attribute does not exist.
        """
        # NOTE(review): the stored query decides which attributes are read and which
        # methods are called, so queries must only come from trusted users.
        if not isinstance(value, list):
            return value

        field = kwargs[value[0]]
        for step in value[1:]:
            if not isinstance(step, list):
                # Plain attribute access
                if not hasattr(field, step):
                    return False
                field = getattr(field, step)
                continue

            if step[0] in kwargs:
                # The nested list is itself a parameter: restart from it
                field = Permission.compute_param(step, **kwargs)
                continue

            if not hasattr(field, step[0]):
                return False
            field = getattr(field, step[0])

            # Function call: dicts provide keyword arguments, the rest is positional
            params = []
            call_kwargs = {}
            for raw_param in step[1:]:
                param = Permission.compute_param(raw_param, **kwargs)
                if isinstance(param, dict):
                    for key in param:
                        call_kwargs[key] = Permission.compute_param(param[key], **kwargs)
                else:
                    params.append(param)
            field = field(*params, **call_kwargs)
        return field

    @staticmethod
    def _about(query, **kwargs):
        """
        Translate JSON query into a Q query.
        :param query: The JSON query
        :param kwargs: Additional params
        :return: A Q object
        :raises ValueError: if the query is neither a list nor a dict
        """
        # Check the type before anything else: otherwise the JSON string ""
        # would be taken for the empty query and match every object.
        if not isinstance(query, (list, dict)):
            raise ValueError("query {} is wrong".format(query))

        if not query:
            # The query is either [] or {} and
            # applies to all objects of the model
            # to represent this we return a trivial request
            return Q(pk=F("pk"))

        if isinstance(query, list):
            head = query[0]
            if head == 'AND':
                return functools.reduce(operator.and_, [Permission._about(sub, **kwargs) for sub in query[1:]])
            if head == 'OR':
                return functools.reduce(operator.or_, [Permission._about(sub, **kwargs) for sub in query[1:]])
            if head == 'NOT':
                return ~Permission._about(query[1], **kwargs)
            # The list is a parameter: every object matches if it is truthy, none otherwise
            return Q(pk=F("pk")) if Permission.compute_param(query, **kwargs) else ~Q(pk=F("pk"))

        q_kwargs = {}
        for key, value in query.items():
            if isinstance(value, list):
                # It is a parameter we query its return value
                q_kwargs[key] = Permission.compute_param(value, **kwargs)
            elif isinstance(value, dict):
                # It is an F object
                q_kwargs[key] = Permission.compute_f(value['F'], **kwargs)
            else:
                q_kwargs[key] = value
        return Q(**q_kwargs)

    def about(self, **kwargs):
        """
        Return an InstancedPermission bound to the given parameters.

        The query is only decoded from JSON here; InstancedPermission translates
        it into a Q object when it is first needed.
        """
        query = json.loads(self.query)
        return InstancedPermission(self.model, query, self.type, self.field, self.mask, **kwargs)

    def __str__(self):
        return self.description
2019-09-18 12:26:42 +00:00
2020-03-20 13:43:35 +00:00
2020-07-25 17:40:30 +00:00
class Role(models.Model):
    """
    A named set of permissions, optionally tied to one club.
    """

    name = models.CharField(verbose_name=_("name"), max_length=255)

    permissions = models.ManyToManyField(Permission, verbose_name=_("permissions"))

    # Left empty (NULL) when the role is not tied to a club
    for_club = models.ForeignKey(
        "member.Club",
        null=True,
        default=None,
        on_delete=models.PROTECT,
        verbose_name=_("for club"),
    )

    class Meta:
        verbose_name = _("role permissions")
        verbose_name_plural = _("role permissions")

    def __str__(self):
        # A role is displayed through its name
        return self.name