1
0
mirror of https://gitlab.crans.org/mediatek/med.git synced 2024-11-26 16:07:09 +00:00
med/users/models.py

212 lines
5.9 KiB
Python
Raw Normal View History

2019-08-02 12:57:53 +00:00
# -*- mode: python; coding: utf-8 -*-
2021-11-14 13:26:41 +00:00
# Copyright (C) 2017-2021 by BDE ENS Paris-Saclay
2019-08-02 12:57:53 +00:00
# SPDX-License-Identifier: GPL-3.0-or-later
2021-11-04 10:29:03 +00:00
from datetime import datetime
import requests
2021-11-04 10:29:03 +00:00
from authlib.integrations.django_client import OAuth
from django.conf import settings
2019-08-08 10:16:40 +00:00
from django.contrib.auth.models import AbstractUser
2019-08-02 12:57:53 +00:00
from django.db import models
from django.db.models import Q
from django.utils import timezone
2019-08-02 12:57:53 +00:00
from django.utils.translation import gettext_lazy as _
2019-08-08 10:16:40 +00:00
class User(AbstractUser):
2021-11-14 13:26:41 +00:00
phone_number = models.CharField(
2019-08-08 10:16:40 +00:00
verbose_name=_('phone number'),
max_length=15,
blank=True,
2019-08-02 19:35:30 +00:00
)
2019-08-08 10:16:40 +00:00
address = models.CharField(
verbose_name=_('address'),
max_length=255,
blank=True,
2019-08-02 16:37:54 +00:00
)
2019-08-08 10:16:40 +00:00
comment = models.CharField(
verbose_name=_('comment'),
help_text=_('Promotion...'),
max_length=255,
blank=True,
2019-08-02 16:37:54 +00:00
)
date_joined = models.DateTimeField(
2021-11-14 13:26:41 +00:00
verbose_name=_('date joined'),
default=timezone.now,
null=True,
)
2019-08-08 10:16:40 +00:00
REQUIRED_FIELDS = ['first_name', 'last_name', 'email']
@property
2019-08-10 14:22:04 +00:00
def is_member(self):
2021-11-04 13:23:03 +00:00
"""
Return True if user is member of the club.
"""
return Membership.objects.filter(
user=self,
date_start__lte=timezone.now(),
date_end__gte=timezone.now()).exists()
2021-11-04 10:29:03 +00:00
def update_data(self, data: dict):
"""
Update user data from given dictionary.
Useful when we want to update user data from Note Kfet.
Parameters
----------
data : dict
Dictionary with user data to update.
"""
self.email = data['email'] or ''
self.first_name = data['first_name'] or ''
self.last_name = data['last_name'] or ''
self.phone_number = data['profile']['phone_number'] or ''
self.address = data['profile']['address'] or ''
self.comment = data['profile']['section'] or ''
2021-11-04 13:23:03 +00:00
for membership_dict in data['memberships']:
if membership_dict['club'] != 22: # Med
continue
# Add membership if not exists
Membership.objects.get_or_create(
user=self,
date_start=membership_dict['date_start'],
date_end=membership_dict['date_end'],
)
# Only members or old members are allow to connect to the website
self.is_active = Membership.objects.filter(user=self).exists()
2021-11-04 13:23:03 +00:00
class Membership(models.Model):
user = models.ForeignKey(
User,
on_delete=models.CASCADE,
verbose_name=_('user'),
)
date_start = models.DateField(
verbose_name=_('start date'),
)
date_end = models.DateField(
verbose_name=_('start date'),
)
2021-11-14 13:28:13 +00:00
def __str__(self):
return f'{self.user}: {self.date_start} to {self.date_end}'
2021-11-04 13:23:03 +00:00
class Meta:
verbose_name = _('membership')
verbose_name_plural = _('memberships')
2021-11-04 10:29:03 +00:00
class AccessToken(models.Model):
owner = models.ForeignKey(
settings.AUTH_USER_MODEL,
on_delete=models.CASCADE,
null=True,
default=None,
verbose_name=_('owner'),
)
access_token = models.CharField(
max_length=32,
verbose_name=_('access token'),
)
2021-11-14 14:49:58 +00:00
expires_in = models.PositiveIntegerField(
2021-11-04 10:29:03 +00:00
verbose_name=_('expires in'),
)
scopes = models.CharField(
max_length=255,
verbose_name=_('scopes'),
)
refresh_token = models.CharField(
max_length=32,
verbose_name=_('refresh token'),
)
expires_at = models.DateTimeField(
verbose_name=_('expires at'),
)
def refresh(self):
"""
Refresh the access token.
"""
oauth = OAuth()
oauth.register('notekfet')
# Get the OAuth client
oauth_client = oauth.notekfet._get_oauth_client()
# Actually refresh the token
token = oauth_client.refresh_token(oauth.notekfet.access_token_url,
refresh_token=self.refresh_token)
self.access_token = token['access_token']
self.expires_in = token['expires_in']
self.scopes = token['scope']
self.refresh_token = token['refresh_token']
self.expires_at = timezone.utc.fromutc(
datetime.fromtimestamp(token['expires_at'])
)
self.save()
def refresh_if_expired(self):
"""
Refresh the current token if it is invalid.
"""
if self.expires_at < timezone.now():
self.refresh()
def auth_header(self):
"""
Return HTTP header that contains the bearer access token.
Refresh the token if needed.
"""
self.refresh_if_expired()
return {'Authorization': f'Bearer {self.access_token}'}
def fetch_user(self, create_if_not_exist: bool = False):
"""
Extract information about the Note Kfet API by using the current
access token.
"""
2021-11-14 14:17:55 +00:00
data = requests.get(f'{settings.NOTE_KFET_URL}/api/me/',
headers=self.auth_header()).json()
username = data['username']
email = data['email']
qs = User.objects.filter(Q(username=username) | Q(email=email))
if not qs.exists():
if create_if_not_exist:
user = User.objects.create(username=username, email=email)
else:
return None
else:
user = qs.get()
# Update user data from Note Kfet
user.update_data(data)
user.save()
2021-11-04 13:23:03 +00:00
# Store token owner
self.owner = user
self.save()
return user
2021-11-04 13:25:35 +00:00
@classmethod
def get_token(cls, request):
return AccessToken.objects.get(pk=request.session['access_token_id'])
2021-11-14 13:28:13 +00:00
def __str__(self):
return self.access_token
2021-11-04 10:29:03 +00:00
class Meta:
verbose_name = _('access token')
verbose_name_plural = _('access tokens')