# -*- mode: python; coding: utf-8 -*-
# Copyright (C) 2017-2021 by BDE ENS Paris-Saclay
# SPDX-License-Identifier: GPL-3.0-or-later

from datetime import date, datetime
from datetime import timezone as dt_timezone

import requests
from authlib.integrations.django_client import OAuth
from django.conf import settings
from django.contrib.auth.models import AbstractUser
from django.db import models
from django.db.models import Q
from django.utils import timezone
from django.utils.translation import gettext_lazy as _

# Identifier of the only Note Kfet club whose memberships are imported
# (labelled "Med" in the original code).
MED_CLUB_ID = 22

# Maximum time, in seconds, to wait for the Note Kfet API to answer.
NOTE_KFET_TIMEOUT = 10


class User(AbstractUser):
    """
    Site user, extended with the contact details synced from Note Kfet.
    """

    phone_number = models.CharField(
        verbose_name=_('phone number'),
        max_length=15,
        blank=True,
    )

    address = models.CharField(
        verbose_name=_('address'),
        max_length=255,
        blank=True,
    )

    comment = models.CharField(
        verbose_name=_('comment'),
        help_text=_('Promotion...'),
        max_length=255,
        blank=True,
    )

    date_joined = models.DateTimeField(
        verbose_name=_('date joined'),
        default=timezone.now,
        null=True,
    )

    REQUIRED_FIELDS = ['first_name', 'last_name', 'email']

    @property
    def is_member(self) -> bool:
        """
        Return True if the user has a membership covering the current time.
        """
        # Evaluate the clock once so both bounds compare against the same
        # instant.
        now = timezone.now()
        return Membership.objects.filter(
            user=self,
            date_start__lte=now,
            date_end__gte=now,
        ).exists()

    def update_data(self, data: dict) -> None:
        """
        Update user data from given dictionary.
        Useful when we want to update user data from Note Kfet.

        The user itself is not saved here; the caller is responsible for
        calling ``save()``. Membership rows, however, are written to the
        database immediately.

        Parameters
        ----------
        data : dict
            Dictionary with user data to update. The keys ``email``,
            ``first_name``, ``last_name``, ``profile`` (with
            ``phone_number``, ``address`` and ``section``) and
            ``memberships`` (a list of dicts with ``club``, ``date_start``
            and ``date_end``) are read.

        Raises
        ------
        KeyError
            If one of the keys listed above is missing.
        """
        profile = data['profile']

        self.email = data['email']
        self.first_name = data['first_name']
        self.last_name = data['last_name']
        self.phone_number = profile['phone_number']
        self.address = profile['address']
        self.comment = profile['section']

        for membership_dict in data['memberships']:
            # Memberships of other clubs are irrelevant for this site
            if membership_dict['club'] != MED_CLUB_ID:
                continue

            # Add membership if not exists
            Membership.objects.get_or_create(
                user=self,
                date_start=membership_dict['date_start'],
                date_end=membership_dict['date_end'],
            )

        # Only members or old members are allowed to connect to the website
        self.is_active = Membership.objects.filter(user=self).exists()


class Membership(models.Model):
    """
    Period during which a user is (or was) a member of the club.
    """

    user = models.ForeignKey(
        User,
        on_delete=models.CASCADE,
        verbose_name=_('user'),
    )

    # ``default=date.today`` is used instead of ``auto_now_add=True``:
    # with ``auto_now_add`` Django always stores the creation date and
    # ignores any value given explicitly, which discarded the dates passed
    # by ``User.update_data``. Omitting the dates still yields today's date.
    # NOTE: this is a field definition change; a migration must be
    # generated with ``makemigrations``.
    date_start = models.DateField(
        default=date.today,
        verbose_name=_('start date'),
    )

    date_end = models.DateField(
        default=date.today,
        verbose_name=_('end date'),
    )

    def __str__(self):
        return f'{self.user}: {self.date_start} to {self.date_end}'

    class Meta:
        verbose_name = _('membership')
        verbose_name_plural = _('memberships')


class AccessToken(models.Model):
    """
    OAuth2 token used to query the Note Kfet API on behalf of a user.
    """

    owner = models.ForeignKey(
        settings.AUTH_USER_MODEL,
        on_delete=models.CASCADE,
        null=True,
        default=None,
        verbose_name=_('owner'),
    )

    access_token = models.CharField(
        max_length=32,
        verbose_name=_('access token'),
    )

    expires_in = models.PositiveSmallIntegerField(
        verbose_name=_('expires in'),
    )

    scopes = models.CharField(
        max_length=255,
        verbose_name=_('scopes'),
    )

    refresh_token = models.CharField(
        max_length=32,
        verbose_name=_('refresh token'),
    )

    expires_at = models.DateTimeField(
        verbose_name=_('expires at'),
    )

    def refresh(self) -> None:
        """
        Refresh the access token.

        Ask the ``notekfet`` OAuth client for a new token using the stored
        refresh token, copy the answer into this object and save it.
        """
        oauth = OAuth()
        # Presumably the client configuration is picked up by authlib from
        # the Django settings - confirm against the project settings.
        oauth.register('notekfet')
        # Get the OAuth client
        # NOTE(review): ``_get_oauth_client`` is a private authlib method.
        oauth_client = oauth.notekfet._get_oauth_client()
        # Actually refresh the token
        token = oauth_client.refresh_token(
            oauth.notekfet.access_token_url,
            refresh_token=self.refresh_token,
        )

        self.access_token = token['access_token']
        self.expires_in = token['expires_in']
        self.scopes = token['scope']
        self.refresh_token = token['refresh_token']
        # ``expires_at`` is a POSIX timestamp: convert it straight to an
        # aware UTC datetime. Going through a naive local datetime and then
        # labelling it as UTC shifts the value by the server's UTC offset.
        self.expires_at = datetime.fromtimestamp(
            token['expires_at'],
            tz=dt_timezone.utc,
        )

        self.save()

    def refresh_if_expired(self) -> None:
        """
        Refresh the current token if its expiration date is in the past.
        """
        if self.expires_at < timezone.now():
            self.refresh()

    def auth_header(self) -> dict:
        """
        Return HTTP header that contains the bearer access token.
        Refresh the token if needed.
        """
        self.refresh_if_expired()
        return {'Authorization': f'Bearer {self.access_token}'}

    def fetch_user(self, create_if_not_exist: bool = False):
        """
        Fetch the token owner from the Note Kfet API and sync the local user.

        The ``/api/me/`` endpoint is queried with the current access token.
        The local user is looked up by username or email, updated with the
        returned data, saved, and stored as the owner of this token.

        Parameters
        ----------
        create_if_not_exist : bool
            Create the local user when no user matches the username or the
            email returned by the API.

        Returns
        -------
        User or None
            The updated user, or None when no user matches and
            ``create_if_not_exist`` is false.

        Raises
        ------
        User.MultipleObjectsReturned
            If the username and the email match different local users.
        requests.exceptions.Timeout
            If the API does not answer within ``NOTE_KFET_TIMEOUT`` seconds.
        """
        response = requests.get(
            f'{settings.NOTE_KFET_URL}/api/me/',
            headers=self.auth_header(),
            # Without a timeout, an unresponsive server blocks forever
            timeout=NOTE_KFET_TIMEOUT,
        )
        data = response.json()
        username = data['username']
        email = data['email']

        try:
            user = User.objects.get(Q(username=username) | Q(email=email))
        except User.DoesNotExist:
            if not create_if_not_exist:
                return None
            user = User.objects.create(username=username, email=email)

        # Update user data from Note Kfet
        user.update_data(data)
        user.save()

        # Store token owner
        self.owner = user
        self.save()

        return user

    @classmethod
    def get_token(cls, request):
        """
        Return the token whose primary key is stored in the session of the
        given request under ``access_token_id``.
        """
        return cls.objects.get(pk=request.session['access_token_id'])

    def __str__(self):
        return self.access_token

    class Meta:
        verbose_name = _('access token')
        verbose_name_plural = _('access tokens')