# Copyright (C) 2018-2025 by BDE ENS Paris-Saclay # SPDX-License-Identifier: GPL-3.0-or-later import base64 import hashlib from django.contrib.auth.hashers import PBKDF2PasswordHasher from django.contrib.auth.models import User from django.utils.crypto import get_random_string from django.test import TestCase from member.models import Membership, Club from note.models import NoteUser from oauth2_provider.models import Application, AccessToken, Grant from ..models import Role, Permission class OAuth2FlowTestCase(TestCase): fixtures = ('initial', ) def setUp(self): self.user_password = "toto1234" hasher = PBKDF2PasswordHasher() self.user = User.objects.create( username="toto", password=hasher.encode(self.user_password, hasher.salt()), ) NoteUser.objects.create(user=self.user) membership = Membership.objects.create(user=self.user, club_id=1) membership.roles.add(Role.objects.get(name="Adhérent⋅e BDE")) membership.save() bde = Club.objects.get(name="BDE") view_user_perm = Permission.objects.get(pk=1) # View own user detail self.base_scope = f'{view_user_perm.pk}_{bde.pk}' def test_oauth2_authorization_code_flow(self): """ Ensure OAuth2 Authorization Code Flow work """ app = Application.objects.create( name="Test Authorization Code", client_type=Application.CLIENT_CONFIDENTIAL, authorization_grant_type=Application.GRANT_AUTHORIZATION_CODE, user=self.user, hash_client_secret=False, redirect_uris='http://127.0.0.1:8000/noexist/callback', algorithm=Application.NO_ALGORITHM, ) credential = base64.b64encode(f'{app.client_id}:{app.client_secret}'.encode('utf-8')).decode() ############################ # Minimal RFC6749 requests # ############################ resp = self.client.get('/o/authorize/', data={"response_type": "code", # REQUIRED "client_id": app.client_id}, # REQUIRED **{"Content-Type": 'application/x-www-form-urlencoded'}) # Get user authorization ################################################################################## url = resp.url csrf_token = resp.text.split('CSRF_TOKEN = "')[0].split('"')[0] resp = self.client.post(url, data={"username": self.user.username, "password": self.user_password, "permission_mask": 1, "csrfmiddlewaretoken": csrf_token}) url = resp.url resp = self.client.get(url) csrf_token = resp.text.split('CSRF_TOKEN = "')[0].split('"')[0] resp = self.client.post(url, follow=True, data={"allow": "Authorize", "scope": "0_0", "csrfmiddlewaretoken": csrf_token, "response_type": "code", "client_id": app.client_id, "redirect_uri": app.redirect_uris}) keys = resp.request['QUERY_STRING'].split("&") for key in keys: if len(key.split('code=')) == 2: code = key.split('code=')[1] ################################################################################## grant = Grant.objects.get(code=code) self.assertEqual(grant.scope, '0_0') # Now we can ask an Access Token resp = self.client.post('/o/token/', data={"grant_type": 'authorization_code', # REQUIRED "code": code}, # REQUIRED **{"Content-Type": 'application/x-www-form-urlencoded', "HTTP_Authorization": f'Basic {credential}'}) # We should have refresh token self.assertEqual('refresh_token' in resp.json(), True) token = AccessToken.objects.get(token=resp.json()['access_token']) # Token do nothing, it should be have the useless scope self.assertEqual(token.scope, '0_0') # Logout user self.client.logout() ############################################# # Maximal RFC6749 + RFC7636 (PKCE) requests # ############################################# state = get_random_string(32) # PKCE code_verifier = get_random_string(100) # 43-128 characters [A-Z,a-z,0-9,"-",".","_","~"] code_challenge = hashlib.sha256(code_verifier.encode('utf-8')).digest() code_challenge = base64.urlsafe_b64encode(code_challenge).decode('utf-8').replace('=', '') cc_method = "S256" resp = self.client.get('/o/authorize/', data={"response_type": "code", # REQUIRED "code_challenge": code_challenge, # PKCE REQUIRED "code_challenge_method": cc_method, # PKCE REQUIRED "client_id": app.client_id, # REQUIRED "redirect_uri": app.redirect_uris, # OPTIONAL "scope": self.base_scope, # OPTIONAL "state": state}, # RECOMMENDED **{"Content-Type": 'application/x-www-form-urlencoded'}) # Get user authorization ################################################################################## url = resp.url csrf_token = resp.text.split('CSRF_TOKEN = "')[0].split('"')[0] resp = self.client.post(url, data={"username": self.user.username, "password": self.user_password, "permission_mask": 1, "csrfmiddlewaretoken": csrf_token}) url = resp.url resp = self.client.get(url) csrf_token = resp.text.split('CSRF_TOKEN = "')[0].split('"')[0] resp = self.client.post(url, follow=True, data={"allow": "Authorize", "scope": self.base_scope, "csrfmiddlewaretoken": csrf_token, "response_type": "code", "code_challenge": code_challenge, "code_challenge_method": cc_method, "client_id": app.client_id, "state": state, "redirect_uri": app.redirect_uris}) keys = resp.request['QUERY_STRING'].split("&") for key in keys: if len(key.split('code=')) == 2: code = key.split('code=')[1] if len(key.split('state=')) == 2: resp_state = key.split('state=')[1] ################################################################################## grant = Grant.objects.get(code=code) self.assertEqual(grant.scope, self.base_scope) self.assertEqual(state, resp_state) # Now we can ask an Access Token resp = self.client.post('/o/token/', data={"grant_type": 'authorization_code', # REQUIRED "code": code, # REQUIRED "code_verifier": code_verifier, # PKCE REQUIRED "redirect_uri": app.redirect_uris}, # REQUIRED **{"Content-Type": 'application/x-www-form-urlencoded', "HTTP_Authorization": f'Basic {credential}'}) # We should have refresh token self.assertEqual('refresh_token' in resp.json(), True) token = AccessToken.objects.get(token=resp.json()['access_token']) # Token can have access, it shouldn't have the useless scope self.assertEqual(token.scope, self.base_scope) def test_oauth2_implicit_flow(self): """ Ensure OAuth2 Implicit Flow work """ app = Application.objects.create( name="Test Implicit Flow", client_type=Application.CLIENT_CONFIDENTIAL, authorization_grant_type=Application.GRANT_IMPLICIT, user=self.user, hash_client_secret=False, algorithm=Application.NO_ALGORITHM, redirect_uris='http://127.0.0.1:8000/noexist/callback/', ) ############################ # Minimal RFC6749 requests # ############################ resp = self.client.get('/o/authorize/', data={'response_type': 'token', # REQUIRED 'client_id': app.client_id}, # REQUIRED **{"Content-Type": 'application/x-www-form-urlencoded'} ) # Get user authorization ################################################################################## url = resp.url csrf_token = resp.text.split('CSRF_TOKEN = "')[0].split('"')[0] resp = self.client.post(url, data={"username": self.user.username, "password": self.user_password, "permission_mask": 1, "csrfmiddlewaretoken": csrf_token}) url = resp.url resp = self.client.get(url) csrf_token = resp.text.split('CSRF_TOKEN = "')[0].split('"')[0] resp = self.client.post(url, follow=True, data={"allow": "Authorize", "scope": '0_0', "csrfmiddlewaretoken": csrf_token, "response_type": "token", "client_id": app.client_id, "redirect_uri": app.redirect_uris}) url = resp.redirect_chain[0][0] keys = url.split('#')[1] refresh_token = '' for couple in keys.split('&'): if couple.split('=')[0] == 'access_token': token = couple.split('=')[1] if couple.split('=')[0] == 'refresh_token': refresh_token = couple.split('=')[1] ################################################################################## self.assertEqual(refresh_token, '') access_token = AccessToken.objects.get(token=token) # Token do nothing, it should be have the useless scope self.assertEqual(access_token.scope, '0_0') # Logout user self.client.logout() ############################ # Maximal RFC6749 requests # ############################ state = get_random_string(32) resp = self.client.get('/o/authorize/', data={'response_type': 'token', # REQUIRED 'client_id': app.client_id, # REQUIRED 'redirect_uri': app.redirect_uris, # OPTIONAL 'scope': self.base_scope, # OPTIONAL 'state': state}, # RECOMMENDED **{"Content-Type": 'application/x-www-form-urlencoded'} ) # Get user authorization ################################################################################## url = resp.url csrf_token = resp.text.split('CSRF_TOKEN = "')[0].split('"')[0] resp = self.client.post(url, data={"username": self.user.username, "password": self.user_password, "permission_mask": 1, "csrfmiddlewaretoken": csrf_token}) url = resp.url resp = self.client.get(url) csrf_token = resp.text.split('CSRF_TOKEN = "')[0].split('"')[0] resp = self.client.post(url, follow=True, data={"allow": "Authorize", "scope": self.base_scope, "state": state, "csrfmiddlewaretoken": csrf_token, "response_type": "token", "client_id": app.client_id, "redirect_uri": app.redirect_uris}) url = resp.redirect_chain[0][0] keys = url.split('#')[1] refresh_token = '' for couple in keys.split('&'): if couple.split('=')[0] == 'access_token': token = couple.split('=')[1] if couple.split('=')[0] == 'refresh_token': refresh_token = couple.split('=')[1] if couple.split('=')[0] == 'state': resp_state = couple.split('=')[1] ################################################################################## self.assertEqual(refresh_token, '') access_token = AccessToken.objects.get(token=token) # Token can have access, it shouldn't have the useless scope self.assertEqual(access_token.scope, self.base_scope) self.assertEqual(state, resp_state) def test_oauth2_resource_owner_password_credentials_flow(self): """ Ensure OAuth2 Resource Owner Password Credentials Flow work """ app = Application.objects.create( name="Test ROPB", client_type=Application.CLIENT_CONFIDENTIAL, authorization_grant_type=Application.GRANT_PASSWORD, user=self.user, hash_client_secret=False, algorithm=Application.NO_ALGORITHM, ) credential = base64.b64encode(f'{app.client_id}:{app.client_secret}'.encode('utf-8')).decode() # No token without real password resp = self.client.post('/o/token/', data={"grant_type": "password", # REQUIRED "username": self.user, # REQUIRED "password": "password"}, # REQUIRED **{"Content-Type": 'application/x-www-form-urlencoded', "Http_Authorization": f'Basic {credential}'} ) self.assertEqual(resp.status_code, 400) resp = self.client.post('/o/token/', data={"grant_type": "password", # REQUIRED "username": self.user, # REQUIRED "password": self.user_password}, # REQUIRED **{"Content-Type": 'application/x-www-form-urlencoded', "HTTP_Authorization": f'Basic {credential}'} ) self.assertEqual(resp.status_code, 200) access_token = AccessToken.objects.get(token=resp.json()['access_token']) self.assertEqual('refresh_token' in resp.json(), True) self.assertEqual(access_token.scope, '0_0') # token do nothing # RFC6749 4.3.2 allows use of scope in ROPB token access request resp = self.client.post('/o/token/', data={"grant_type": "password", # REQUIRED "username": self.user, # REQUIRED "password": self.user_password, # REQUIRED "scope": self.base_scope}, # OPTIONAL **{"Content-Type": 'application/x-www-form-urlencoded', "HTTP_Authorization": f'Basic {credential}'} ) token = AccessToken.objects.get(token=resp.json()['access_token']) self.assertEqual(token.scope, self.base_scope) # token do nothing more than base_scope def test_oauth2_client_credentials(self): """ Ensure OAuth2 Client Credentials work """ app = Application.objects.create( name="Test client_credentials", client_type=Application.CLIENT_CONFIDENTIAL, authorization_grant_type=Application.GRANT_CLIENT_CREDENTIALS, user=self.user, hash_client_secret=False, algorithm=Application.NO_ALGORITHM, ) # No token without credential resp = self.client.post('/o/token/', data={"grant_type": "client_credentials"}, # REQUIRED **{"Content-Type": 'application/x-www-form-urlencoded'} ) self.assertEqual(resp.status_code, 401) # Access with credential credential = base64.b64encode(f'{app.client_id}:{app.client_secret}'.encode('utf-8')).decode() resp = self.client.post('/o/token/', data={"grant_type": "client_credentials"}, # REQUIRED **{'HTTP_Authorization': f'Basic {credential}', "Content-Type": 'application/x-www-form-urlencoded'} ) self.assertEqual(resp.status_code, 200) token = AccessToken.objects.get(token=resp.json()['access_token']) # Token do nothing, it should be have the useless scope self.assertEqual(token.scope, '0_0') # RFC6749 4.4.2 allows use of scope in client credential flow resp = self.client.post('/o/token/', data={"grant_type": "client_credentials", # REQUIRED "scope": self.base_scope}, # OPTIONAL **{'http_Authorization': f'Basic {credential}', "Content-Type": 'application/x-www-form-urlencoded'} ) self.assertEqual(resp.status_code, 200) token = AccessToken.objects.get(token=resp.json()['access_token']) # Token can have access, it shouldn't have the useless scope self.assertEqual(token.scope, self.base_scope)