2020-03-07 12:12:17 +00:00
|
|
|
|
# Copyright (C) 2018-2020 by BDE ENS Paris-Saclay
|
|
|
|
|
# SPDX-License-Identifier: GPL-3.0-or-later
|
|
|
|
|
|
2020-02-09 16:35:15 +00:00
|
|
|
|
import functools
|
2019-09-18 12:26:42 +00:00
|
|
|
|
import json
|
2020-02-09 16:35:15 +00:00
|
|
|
|
import operator
|
2019-09-18 12:26:42 +00:00
|
|
|
|
|
|
|
|
|
from django.contrib.contenttypes.models import ContentType
|
|
|
|
|
from django.core.exceptions import ValidationError
|
|
|
|
|
from django.db import models
|
2020-03-19 23:06:28 +00:00
|
|
|
|
from django.db.models import F, Q, Model
|
2019-09-18 12:26:42 +00:00
|
|
|
|
from django.utils.translation import gettext_lazy as _
|
2020-03-20 13:43:35 +00:00
|
|
|
|
from member.models import Role
|
|
|
|
|
|
2019-09-18 12:26:42 +00:00
|
|
|
|
|
|
|
|
|
class InstancedPermission:
|
|
|
|
|
|
2020-03-20 00:46:59 +00:00
|
|
|
|
def __init__(self, model, query, type, field, mask, **kwargs):
|
2019-09-18 12:26:42 +00:00
|
|
|
|
self.model = model
|
2020-03-20 00:46:59 +00:00
|
|
|
|
self.raw_query = query
|
|
|
|
|
self.query = None
|
2019-09-18 12:26:42 +00:00
|
|
|
|
self.type = type
|
|
|
|
|
self.field = field
|
2020-03-19 17:53:06 +00:00
|
|
|
|
self.mask = mask
|
2020-03-20 00:46:59 +00:00
|
|
|
|
self.kwargs = kwargs
|
2019-09-18 12:26:42 +00:00
|
|
|
|
|
|
|
|
|
def applies(self, obj, permission_type, field_name=None):
|
2020-02-09 16:35:15 +00:00
|
|
|
|
"""
|
|
|
|
|
Returns True if the permission applies to
|
|
|
|
|
the field `field_name` object `obj`
|
|
|
|
|
"""
|
2020-03-19 17:53:06 +00:00
|
|
|
|
|
|
|
|
|
if not isinstance(obj, self.model.model_class()):
|
2020-03-18 14:49:52 +00:00
|
|
|
|
# The permission does not apply to the model
|
|
|
|
|
return False
|
|
|
|
|
|
2020-03-07 08:30:22 +00:00
|
|
|
|
if self.type == 'add':
|
|
|
|
|
if permission_type == self.type:
|
2020-03-20 00:46:59 +00:00
|
|
|
|
self.update_query()
|
|
|
|
|
|
2020-03-19 23:06:28 +00:00
|
|
|
|
# Don't increase indexes
|
|
|
|
|
obj.pk = 0
|
|
|
|
|
# Force insertion, no data verification, no trigger
|
|
|
|
|
Model.save(obj, force_insert=True)
|
|
|
|
|
ret = obj in self.model.model_class().objects.filter(self.query).all()
|
|
|
|
|
# Delete testing object
|
|
|
|
|
Model.delete(obj)
|
|
|
|
|
return ret
|
2020-03-18 13:42:35 +00:00
|
|
|
|
|
2020-03-07 12:12:17 +00:00
|
|
|
|
if permission_type == self.type:
|
2020-03-18 13:42:35 +00:00
|
|
|
|
if self.field and field_name != self.field:
|
2019-09-18 12:26:42 +00:00
|
|
|
|
return False
|
2020-03-20 00:46:59 +00:00
|
|
|
|
self.update_query()
|
2020-03-07 12:12:17 +00:00
|
|
|
|
return obj in self.model.model_class().objects.filter(self.query).all()
|
2020-02-09 16:35:15 +00:00
|
|
|
|
else:
|
|
|
|
|
return False
|
2019-09-18 12:26:42 +00:00
|
|
|
|
|
2020-03-20 00:46:59 +00:00
|
|
|
|
def update_query(self):
|
2020-03-20 14:58:14 +00:00
|
|
|
|
"""
|
|
|
|
|
The query is not analysed in a first time. It is analysed at most once if needed.
|
|
|
|
|
:return:
|
|
|
|
|
"""
|
2020-03-20 00:46:59 +00:00
|
|
|
|
if not self.query:
|
|
|
|
|
# noinspection PyProtectedMember
|
|
|
|
|
self.query = Permission._about(self.raw_query, **self.kwargs)
|
|
|
|
|
|
2019-09-18 12:26:42 +00:00
|
|
|
|
def __repr__(self):
|
|
|
|
|
if self.field:
|
2020-03-07 12:12:17 +00:00
|
|
|
|
return _("Can {type} {model}.{field} in {query}").format(type=self.type, model=self.model, field=self.field, query=self.query)
|
2019-09-18 12:26:42 +00:00
|
|
|
|
else:
|
2020-03-07 12:12:17 +00:00
|
|
|
|
return _("Can {type} {model} in {query}").format(type=self.type, model=self.model, query=self.query)
|
|
|
|
|
|
|
|
|
|
def __str__(self):
|
|
|
|
|
return self.__repr__()
|
2019-09-18 12:26:42 +00:00
|
|
|
|
|
|
|
|
|
|
2020-03-19 15:12:52 +00:00
|
|
|
|
class PermissionMask(models.Model):
|
2020-03-20 14:58:14 +00:00
|
|
|
|
"""
|
|
|
|
|
Permissions that are hidden behind a mask
|
|
|
|
|
"""
|
|
|
|
|
|
2020-03-19 15:12:52 +00:00
|
|
|
|
rank = models.PositiveSmallIntegerField(
|
2020-03-19 15:27:25 +00:00
|
|
|
|
unique=True,
|
2020-03-19 15:12:52 +00:00
|
|
|
|
verbose_name=_('rank'),
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
description = models.CharField(
|
|
|
|
|
max_length=255,
|
2020-03-19 15:27:25 +00:00
|
|
|
|
unique=True,
|
2020-03-19 15:12:52 +00:00
|
|
|
|
verbose_name=_('description'),
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
def __str__(self):
|
|
|
|
|
return self.description
|
|
|
|
|
|
|
|
|
|
|
2019-09-18 12:26:42 +00:00
|
|
|
|
class Permission(models.Model):
|
|
|
|
|
|
|
|
|
|
PERMISSION_TYPES = [
|
2019-09-18 14:39:37 +00:00
|
|
|
|
('add', 'add'),
|
|
|
|
|
('view', 'view'),
|
|
|
|
|
('change', 'change'),
|
|
|
|
|
('delete', 'delete')
|
2019-09-18 12:26:42 +00:00
|
|
|
|
]
|
|
|
|
|
|
|
|
|
|
model = models.ForeignKey(ContentType, on_delete=models.CASCADE, related_name='+')
|
|
|
|
|
|
2020-02-09 16:35:15 +00:00
|
|
|
|
# A json encoded Q object with the following grammar
|
2020-02-09 17:14:36 +00:00
|
|
|
|
# query -> [] | {} (the empty query representing all objects)
|
2020-03-07 12:12:17 +00:00
|
|
|
|
# query -> ["AND", query, …] AND multiple queries
|
|
|
|
|
# | ["OR", query, …] OR multiple queries
|
|
|
|
|
# | ["NOT", query] Opposite of query
|
2020-02-13 14:59:19 +00:00
|
|
|
|
# query -> {key: value, …} A list of fields and values of a Q object
|
|
|
|
|
# key -> string A field name
|
|
|
|
|
# value -> int | string | bool | null Literal values
|
2020-03-20 14:58:14 +00:00
|
|
|
|
# | [parameter, …] A parameter. See compute_param for more details.
|
2020-03-07 12:12:17 +00:00
|
|
|
|
# | {"F": oper} An F object
|
2020-03-20 14:58:14 +00:00
|
|
|
|
# oper -> [string, …] A parameter. See compute_param for more details.
|
2020-03-07 12:12:17 +00:00
|
|
|
|
# | ["ADD", oper, …] Sum multiple F objects or literal
|
|
|
|
|
# | ["SUB", oper, oper] Substract two F objects or literal
|
|
|
|
|
# | ["MUL", oper, …] Multiply F objects or literals
|
2020-02-13 14:59:19 +00:00
|
|
|
|
# | int | string | bool | null Literal values
|
2020-03-07 12:12:17 +00:00
|
|
|
|
# | ["F", string] A field
|
2020-02-09 16:35:15 +00:00
|
|
|
|
#
|
|
|
|
|
# Examples:
|
2020-03-07 12:12:17 +00:00
|
|
|
|
# Q(is_superuser=True) := {"is_superuser": true}
|
|
|
|
|
# ~Q(is_superuser=True) := ["NOT", {"is_superuser": true}]
|
2020-02-09 17:14:36 +00:00
|
|
|
|
query = models.TextField()
|
2019-09-18 12:26:42 +00:00
|
|
|
|
|
2020-02-09 17:14:36 +00:00
|
|
|
|
type = models.CharField(max_length=15, choices=PERMISSION_TYPES)
|
2019-09-18 12:26:42 +00:00
|
|
|
|
|
2020-03-19 15:12:52 +00:00
|
|
|
|
mask = models.ForeignKey(
|
|
|
|
|
PermissionMask,
|
|
|
|
|
on_delete=models.PROTECT,
|
|
|
|
|
)
|
|
|
|
|
|
2020-02-09 17:14:36 +00:00
|
|
|
|
field = models.CharField(max_length=255, blank=True)
|
|
|
|
|
|
|
|
|
|
description = models.CharField(max_length=255, blank=True)
|
2019-09-18 12:26:42 +00:00
|
|
|
|
|
|
|
|
|
class Meta:
|
2020-02-09 17:14:36 +00:00
|
|
|
|
unique_together = ('model', 'query', 'type', 'field')
|
2019-09-18 12:26:42 +00:00
|
|
|
|
|
|
|
|
|
def clean(self):
|
2020-03-18 13:42:35 +00:00
|
|
|
|
self.query = json.dumps(json.loads(self.query))
|
2019-09-18 14:39:37 +00:00
|
|
|
|
if self.field and self.type not in {'view', 'change'}:
|
2019-09-18 12:26:42 +00:00
|
|
|
|
raise ValidationError(_("Specifying field applies only to view and change permission types."))
|
|
|
|
|
|
2020-03-07 12:12:17 +00:00
|
|
|
|
def save(self, **kwargs):
|
2019-09-18 12:26:42 +00:00
|
|
|
|
self.full_clean()
|
|
|
|
|
super().save()
|
|
|
|
|
|
2020-02-13 14:59:19 +00:00
|
|
|
|
@staticmethod
|
2020-03-07 12:12:17 +00:00
|
|
|
|
def compute_f(oper, **kwargs):
|
2020-02-13 14:59:19 +00:00
|
|
|
|
if isinstance(oper, list):
|
2020-03-18 13:42:35 +00:00
|
|
|
|
if oper[0] == 'ADD':
|
|
|
|
|
return functools.reduce(operator.add, [Permission.compute_f(oper, **kwargs) for oper in oper[1:]])
|
|
|
|
|
elif oper[0] == 'SUB':
|
|
|
|
|
return Permission.compute_f(oper[1], **kwargs) - Permission.compute_f(oper[2], **kwargs)
|
|
|
|
|
elif oper[0] == 'MUL':
|
|
|
|
|
return functools.reduce(operator.mul, [Permission.compute_f(oper, **kwargs) for oper in oper[1:]])
|
|
|
|
|
elif oper[0] == 'F':
|
|
|
|
|
return F(oper[1])
|
|
|
|
|
else:
|
|
|
|
|
field = kwargs[oper[0]]
|
|
|
|
|
for i in range(1, len(oper)):
|
|
|
|
|
field = getattr(field, oper[i])
|
|
|
|
|
return field
|
2020-02-13 14:59:19 +00:00
|
|
|
|
else:
|
|
|
|
|
return oper
|
2020-03-18 13:42:35 +00:00
|
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
|
def compute_param(value, **kwargs):
|
2020-03-20 14:58:14 +00:00
|
|
|
|
"""
|
|
|
|
|
A parameter is given by a list. The first argument is the name of the parameter.
|
|
|
|
|
The parameters are the user, the club, and some classes (Note, ...)
|
|
|
|
|
If there are more arguments in the list, then attributes are queried.
|
|
|
|
|
For example, ["user", "note", "balance"] will return the balance of the note of the user.
|
|
|
|
|
If an argument is a list, then this is interpreted with a function call:
|
|
|
|
|
First argument is the name of the function, next arguments are parameters, and if there is a dict,
|
|
|
|
|
then the dict is given as kwargs.
|
|
|
|
|
For example: NoteUser.objects.filter(user__memberships__club__name="Kfet").all() is translated by:
|
|
|
|
|
["NoteUser", "objects", ["filter", {"user__memberships__club__name": "Kfet"}], ["all"]]
|
|
|
|
|
"""
|
|
|
|
|
|
2020-03-18 13:42:35 +00:00
|
|
|
|
if not isinstance(value, list):
|
|
|
|
|
return value
|
|
|
|
|
|
|
|
|
|
field = kwargs[value[0]]
|
|
|
|
|
for i in range(1, len(value)):
|
|
|
|
|
if isinstance(value[i], list):
|
2020-03-19 23:06:28 +00:00
|
|
|
|
if value[i][0] in kwargs:
|
|
|
|
|
field = Permission.compute_param(value[i], **kwargs)
|
|
|
|
|
continue
|
|
|
|
|
|
2020-03-18 13:42:35 +00:00
|
|
|
|
field = getattr(field, value[i][0])
|
|
|
|
|
params = []
|
|
|
|
|
call_kwargs = {}
|
|
|
|
|
for j in range(1, len(value[i])):
|
|
|
|
|
param = Permission.compute_param(value[i][j], **kwargs)
|
|
|
|
|
if isinstance(param, dict):
|
|
|
|
|
for key in param:
|
|
|
|
|
val = Permission.compute_param(param[key], **kwargs)
|
|
|
|
|
call_kwargs[key] = val
|
|
|
|
|
else:
|
|
|
|
|
params.append(param)
|
|
|
|
|
field = field(*params, **call_kwargs)
|
|
|
|
|
else:
|
|
|
|
|
field = getattr(field, value[i])
|
|
|
|
|
return field
|
2020-02-13 14:59:19 +00:00
|
|
|
|
|
2020-03-20 00:46:59 +00:00
|
|
|
|
@staticmethod
|
|
|
|
|
def _about(query, **kwargs):
|
2020-03-20 14:58:14 +00:00
|
|
|
|
"""
|
|
|
|
|
Translate JSON query into a Q query.
|
|
|
|
|
:param query: The JSON query
|
|
|
|
|
:param kwargs: Additional params
|
|
|
|
|
:return: A Q object
|
|
|
|
|
"""
|
2020-02-09 17:14:36 +00:00
|
|
|
|
if len(query) == 0:
|
|
|
|
|
# The query is either [] or {} and
|
2020-02-09 16:35:15 +00:00
|
|
|
|
# applies to all objects of the model
|
2020-03-18 13:42:35 +00:00
|
|
|
|
# to represent this we return a trivial request
|
|
|
|
|
return Q(pk=F("pk"))
|
2020-02-09 17:14:36 +00:00
|
|
|
|
if isinstance(query, list):
|
|
|
|
|
if query[0] == 'AND':
|
2020-03-20 00:46:59 +00:00
|
|
|
|
return functools.reduce(operator.and_, [Permission._about(query, **kwargs) for query in query[1:]])
|
2020-02-09 17:14:36 +00:00
|
|
|
|
elif query[0] == 'OR':
|
2020-03-20 00:46:59 +00:00
|
|
|
|
return functools.reduce(operator.or_, [Permission._about(query, **kwargs) for query in query[1:]])
|
2020-02-09 17:14:36 +00:00
|
|
|
|
elif query[0] == 'NOT':
|
2020-03-20 00:46:59 +00:00
|
|
|
|
return ~Permission._about(query[1], **kwargs)
|
2020-03-20 14:58:14 +00:00
|
|
|
|
else:
|
|
|
|
|
return Q(pk=F("pk"))
|
2020-02-09 17:14:36 +00:00
|
|
|
|
elif isinstance(query, dict):
|
2020-02-09 16:35:15 +00:00
|
|
|
|
q_kwargs = {}
|
2020-02-09 17:14:36 +00:00
|
|
|
|
for key in query:
|
|
|
|
|
value = query[key]
|
2020-02-09 16:35:15 +00:00
|
|
|
|
if isinstance(value, list):
|
2020-03-18 13:42:35 +00:00
|
|
|
|
# It is a parameter we query its return value
|
|
|
|
|
q_kwargs[key] = Permission.compute_param(value, **kwargs)
|
2020-02-13 14:59:19 +00:00
|
|
|
|
elif isinstance(value, dict):
|
|
|
|
|
# It is an F object
|
2020-03-18 13:42:35 +00:00
|
|
|
|
q_kwargs[key] = Permission.compute_f(value['F'], **kwargs)
|
2020-02-09 16:35:15 +00:00
|
|
|
|
else:
|
|
|
|
|
q_kwargs[key] = value
|
|
|
|
|
return Q(**q_kwargs)
|
2019-09-18 12:26:42 +00:00
|
|
|
|
else:
|
2020-02-09 16:35:15 +00:00
|
|
|
|
# TODO: find a better way to crash here
|
2020-03-20 00:46:59 +00:00
|
|
|
|
raise Exception("query {} is wrong".format(query))
|
2019-09-18 12:26:42 +00:00
|
|
|
|
|
|
|
|
|
def about(self, **kwargs):
|
2020-02-09 17:14:36 +00:00
|
|
|
|
"""
|
|
|
|
|
Return an InstancedPermission with the parameters
|
|
|
|
|
replaced by their values and the query interpreted
|
|
|
|
|
"""
|
|
|
|
|
query = json.loads(self.query)
|
2020-03-20 00:46:59 +00:00
|
|
|
|
# query = self._about(query, **kwargs)
|
|
|
|
|
return InstancedPermission(self.model, query, self.type, self.field, self.mask, **kwargs)
|
2019-09-18 12:26:42 +00:00
|
|
|
|
|
|
|
|
|
def __str__(self):
|
|
|
|
|
if self.field:
|
2020-02-09 17:14:36 +00:00
|
|
|
|
return _("Can {type} {model}.{field} in {query}").format(type=self.type, model=self.model, field=self.field, query=self.query)
|
2019-09-18 12:26:42 +00:00
|
|
|
|
else:
|
2020-02-09 17:14:36 +00:00
|
|
|
|
return _("Can {type} {model} in {query}").format(type=self.type, model=self.model, query=self.query)
|
2019-09-18 12:26:42 +00:00
|
|
|
|
|
2020-03-20 13:43:35 +00:00
|
|
|
|
|
|
|
|
|
class RolePermissions(models.Model):
|
|
|
|
|
"""
|
|
|
|
|
Permissions associated with a Role
|
|
|
|
|
"""
|
|
|
|
|
role = models.ForeignKey(
|
|
|
|
|
Role,
|
|
|
|
|
on_delete=models.PROTECT,
|
|
|
|
|
related_name='+',
|
|
|
|
|
verbose_name=_('role'),
|
|
|
|
|
)
|
|
|
|
|
permissions = models.ManyToManyField(
|
|
|
|
|
Permission,
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
def __str__(self):
|
|
|
|
|
return str(self.role)
|