215 lines
6.0 KiB
Python
215 lines
6.0 KiB
Python
# -*- mode: python; coding: utf-8 -*-
|
|
# Copyright (C) 2017-2019 by BDE ENS Paris-Saclay
|
|
# SPDX-License-Identifier: GPL-3.0-or-later
|
|
|
|
from datetime import datetime
|
|
|
|
import requests
|
|
from authlib.integrations.django_client import OAuth
|
|
from django.conf import settings
|
|
from django.contrib.auth.models import AbstractUser
|
|
from django.db import models
|
|
from django.db.models import Q
|
|
from django.utils import timezone
|
|
from django.utils.translation import gettext_lazy as _
|
|
from med.settings import MAX_EMPRUNT
|
|
|
|
|
|
class User(AbstractUser):
|
|
telephone = models.CharField(
|
|
verbose_name=_('phone number'),
|
|
max_length=15,
|
|
blank=True,
|
|
)
|
|
address = models.CharField(
|
|
verbose_name=_('address'),
|
|
max_length=255,
|
|
blank=True,
|
|
)
|
|
maxemprunt = models.IntegerField(
|
|
verbose_name=_('maximum borrowed'),
|
|
help_text=_('Maximal amount of simultaneous borrowed item '
|
|
'authorized.'),
|
|
default=MAX_EMPRUNT,
|
|
)
|
|
comment = models.CharField(
|
|
verbose_name=_('comment'),
|
|
help_text=_('Promotion...'),
|
|
max_length=255,
|
|
blank=True,
|
|
)
|
|
date_joined = models.DateTimeField(
|
|
_('date joined'),
|
|
default=timezone.now,
|
|
null=True,
|
|
)
|
|
|
|
REQUIRED_FIELDS = ['first_name', 'last_name', 'email']
|
|
|
|
@property
|
|
def is_member(self):
|
|
"""
|
|
Return True if user is member of the club.
|
|
"""
|
|
return Membership.objects.filter(
|
|
user=self,
|
|
date_start__lte=timezone.now(),
|
|
date_end__gte=timezone.now()).exists()
|
|
|
|
def update_data(self, data: dict):
|
|
"""
|
|
Update user data from given dictionary.
|
|
Useful when we want to update user data from Note Kfet.
|
|
|
|
Parameters
|
|
----------
|
|
data : dict
|
|
Dictionary with user data to update.
|
|
"""
|
|
self.email = data['email']
|
|
self.first_name = data['first_name']
|
|
self.last_name = data['last_name']
|
|
self.telephone = data['profile']['phone_number']
|
|
self.address = data['profile']['address']
|
|
self.comment = data['profile']['section']
|
|
|
|
for membership_dict in data['memberships']:
|
|
if membership_dict['club'] != 22: # Med
|
|
continue
|
|
# Add membership if not exists
|
|
Membership.objects.get_or_create(
|
|
user=self,
|
|
date_start=membership_dict['date_start'],
|
|
date_end=membership_dict['date_end'],
|
|
)
|
|
|
|
# Only members or old members are allow to connect to the website
|
|
self.is_active = Membership.objects.filter(user=self).exists()
|
|
|
|
|
|
class Membership(models.Model):
|
|
user = models.ForeignKey(
|
|
User,
|
|
on_delete=models.CASCADE,
|
|
verbose_name=_('user'),
|
|
)
|
|
|
|
date_start = models.DateField(
|
|
auto_now_add=True,
|
|
verbose_name=_('start date'),
|
|
)
|
|
|
|
date_end = models.DateField(
|
|
auto_now_add=True,
|
|
verbose_name=_('start date'),
|
|
)
|
|
|
|
class Meta:
|
|
verbose_name = _('membership')
|
|
verbose_name_plural = _('memberships')
|
|
|
|
|
|
class AccessToken(models.Model):
|
|
owner = models.ForeignKey(
|
|
settings.AUTH_USER_MODEL,
|
|
on_delete=models.CASCADE,
|
|
null=True,
|
|
default=None,
|
|
verbose_name=_('owner'),
|
|
)
|
|
|
|
access_token = models.CharField(
|
|
max_length=32,
|
|
verbose_name=_('access token'),
|
|
)
|
|
|
|
expires_in = models.PositiveSmallIntegerField(
|
|
verbose_name=_('expires in'),
|
|
)
|
|
|
|
scopes = models.CharField(
|
|
max_length=255,
|
|
verbose_name=_('scopes'),
|
|
)
|
|
|
|
refresh_token = models.CharField(
|
|
max_length=32,
|
|
verbose_name=_('refresh token'),
|
|
)
|
|
|
|
expires_at = models.DateTimeField(
|
|
verbose_name=_('expires at'),
|
|
)
|
|
|
|
def refresh(self):
|
|
"""
|
|
Refresh the access token.
|
|
"""
|
|
oauth = OAuth()
|
|
oauth.register('notekfet')
|
|
# Get the OAuth client
|
|
oauth_client = oauth.notekfet._get_oauth_client()
|
|
# Actually refresh the token
|
|
token = oauth_client.refresh_token(oauth.notekfet.access_token_url,
|
|
refresh_token=self.refresh_token)
|
|
self.access_token = token['access_token']
|
|
self.expires_in = token['expires_in']
|
|
self.scopes = token['scope']
|
|
self.refresh_token = token['refresh_token']
|
|
self.expires_at = timezone.utc.fromutc(
|
|
datetime.fromtimestamp(token['expires_at'])
|
|
)
|
|
|
|
self.save()
|
|
|
|
def refresh_if_expired(self):
|
|
"""
|
|
Refresh the current token if it is invalid.
|
|
"""
|
|
if self.expires_at < timezone.now():
|
|
self.refresh()
|
|
|
|
def auth_header(self):
|
|
"""
|
|
Return HTTP header that contains the bearer access token.
|
|
Refresh the token if needed.
|
|
"""
|
|
self.refresh_if_expired()
|
|
return {'Authorization': f'Bearer {self.access_token}'}
|
|
|
|
def fetch_user(self, create_if_not_exist: bool = False):
|
|
"""
|
|
Extract information about the Note Kfet API by using the current
|
|
access token.
|
|
"""
|
|
data = requests.get('https://note-dev.crans.org/api/me/',
|
|
headers=self.auth_header()).json()
|
|
username = data['username']
|
|
email = data['email']
|
|
qs = User.objects.filter(Q(username=username) | Q(email=email))
|
|
if not qs.exists():
|
|
if create_if_not_exist:
|
|
user = User.objects.create(username=username, email=email)
|
|
else:
|
|
return None
|
|
else:
|
|
user = qs.get()
|
|
|
|
# Update user data from Note Kfet
|
|
user.update_data(data)
|
|
user.save()
|
|
|
|
# Store token owner
|
|
self.owner = user
|
|
self.save()
|
|
|
|
return user
|
|
|
|
@classmethod
|
|
def get_token(cls, request):
|
|
return AccessToken.objects.get(pk=request.session['access_token_id'])
|
|
|
|
class Meta:
|
|
verbose_name = _('access token')
|
|
verbose_name_plural = _('access tokens')
|