diff --git a/apps/permission/scopes.py b/apps/permission/scopes.py index 266af662..cece9a21 100644 --- a/apps/permission/scopes.py +++ b/apps/permission/scopes.py @@ -54,7 +54,7 @@ class PermissionScopes(BaseScopes): return [] scopes = [f"{p.id}_{p.membership.club.id}" for p in PermissionBackend.get_raw_permissions(get_current_request(), 'view')] - scopes.append('0_0') + scopes = ['0_0'] return scopes @@ -138,7 +138,29 @@ class PermissionOAuth2Validator(OAuth2Validator): request.scopes = valid_scopes return valid_scopes + def validate_code_scopes(self, client_id, scopes, client, request, *args, **kwargs): + """ + For Authorization Code scope are scope of the user + """ + valid_scopes = set() + req = get_current_request() + request.oauth2 = {} + request.oauth2['user'] = req.user + request.oauth2['scope'] = scopes + # mask implementation + request.oauth2['mask'] = req.session.load()['permission_mask'] + for t in Permission.PERMISSION_TYPES: + for p in PermissionBackend.get_raw_permissions(request, t[0]): + scope = f"{p.id}_{p.membership.club.id}" + if scope in scopes: + valid_scopes.add(scope) + + # Always give one scope to generate token + if not valid_scopes: + valid_scopes.add('0_0') + request.scopes = valid_scopes + return valid_scopes def validate_scopes(self, client_id, scopes, client, request, *args, **kwargs): """ @@ -148,13 +170,14 @@ class PermissionOAuth2Validator(OAuth2Validator): This allows clients to request more permission to get finally a subset of permissions. """ - valid_scopes = set() if hasattr(request, 'grant_type') and request.grant_type == 'client_credentials': return self.validate_client_credentials_scopes(client_id, scopes, client, request, args, kwargs) if hasattr(request, 'grant_type') and request.grant_type == 'password': return self.validate_ropb_scopes(client_id, scopes, client, request, args, kwargs) + if hasattr(request, '_params') and request._params['response_type'] == 'code': + return self.validate_code_scopes(client_id, scopes, client, request, args, kwargs) for t in Permission.PERMISSION_TYPES: for p in PermissionBackend.get_raw_permissions(get_current_request(), t[0]): diff --git a/apps/permission/tests/test_oauth2_flow.py b/apps/permission/tests/test_oauth2_flow.py index 4a026b50..8fc6c71e 100644 --- a/apps/permission/tests/test_oauth2_flow.py +++ b/apps/permission/tests/test_oauth2_flow.py @@ -2,13 +2,15 @@ # SPDX-License-Identifier: GPL-3.0-or-later import base64 +import hashlib from django.contrib.auth.hashers import PBKDF2PasswordHasher from django.contrib.auth.models import User +from django.utils.crypto import get_random_string from django.test import TestCase from member.models import Membership, Club from note.models import NoteUser -from oauth2_provider.models import Application, AccessToken +from oauth2_provider.models import Application, AccessToken, Grant from ..models import Role, Permission @@ -19,14 +21,12 @@ class OAuth2FlowTestCase(TestCase): def setUp(self): self.user_password = "toto1234" hasher = PBKDF2PasswordHasher() - + self.user = User.objects.create( username="toto", password=hasher.encode(self.user_password, hasher.salt()), ) - - NoteUser.objects.create(user=self.user) membership = Membership.objects.create(user=self.user, club_id=1) membership.roles.add(Role.objects.get(name="Adhérent⋅e BDE")) @@ -41,7 +41,164 @@ class OAuth2FlowTestCase(TestCase): """ Ensure OAuth2 Authorization Code Flow work """ - pass + + app = Application.objects.create( + name="Test Authorization Code", + client_type=Application.CLIENT_CONFIDENTIAL, + authorization_grant_type=Application.GRANT_AUTHORIZATION_CODE, + user=self.user, + hash_client_secret=False, + redirect_uris='http://127.0.0.1:8000/noexist/callback', + algorithm=Application.NO_ALGORITHM, + ) + + credential = base64.b64encode(f'{app.client_id}:{app.client_secret}'.encode('utf-8')).decode() + + ############################ + # Minimal RFC6749 requests # + ############################ + + resp = self.client.get('/o/authorize/', + data={"response_type": "code", # REQUIRED + "client_id": app.client_id}, # REQUIRED + **{"Content-Type": 'application/x-www-form-urlencoded'}) + + # Get user authorization + + ################################################################################## + + url = resp.url + csrf_token = resp.text.split('CSRF_TOKEN = "')[0].split('"')[0] + + resp = self.client.post(url, + data={"username": self.user.username, + "password": self.user_password, + "permission_mask": 1, + "csrfmiddlewaretoken": csrf_token}) + + url = resp.url + resp = self.client.get(url) + + csrf_token = resp.text.split('CSRF_TOKEN = "')[0].split('"')[0] + + resp = self.client.post(url, + follow=True, + data={"allow": "Authorize", + "scope": "0_0", + "csrfmiddlewaretoken": csrf_token, + "response_type": "code", + "client_id": app.client_id, + "redirect_uri": app.redirect_uris}) + + keys = resp.request['QUERY_STRING'].split("&") + for key in keys: + if len(key.split('code=')) == 2: + code = key.split('code=')[1] + + ################################################################################## + + grant = Grant.objects.get(code=code) + self.assertEqual(grant.scope, '0_0') + + # Now we can ask an Access Token + + resp = self.client.post('/o/token/', + data={"grant_type": 'authorization_code', # REQUIRED + "code": code}, # REQUIRED + **{"Content-Type": 'application/x-www-form-urlencoded', + "HTTP_Authorization": f'Basic {credential}'}) + + # We should have refresh token + self.assertEqual('refresh_token' in resp.json(), True) + + token = AccessToken.objects.get(token=resp.json()['access_token']) + + # Token do nothing, it should be have the useless scope + self.assertEqual(token.scope, '0_0') + + # Logout user + self.client.logout() + + ############################################# + # Maximal RFC6749 + RFC7636 (PKCE) requests # + ############################################# + + state = get_random_string(32) + + # PKCE + code_verifier = get_random_string(100) # 43-128 characters [A-Z,a-z,0-9,"-",".","_","~"] + code_challenge = hashlib.sha256(code_verifier.encode('utf-8')).digest() + code_challenge = base64.urlsafe_b64encode(code_challenge).decode('utf-8').replace('=', '') + cc_method = "S256" + + resp = self.client.get('/o/authorize/', + data={"response_type": "code", # REQUIRED + "code_challenge": code_challenge, # PKCE REQUIRED + "code_challenge_method": cc_method, # PKCE REQUIRED + "client_id": app.client_id, # REQUIRED + "redirect_uri": app.redirect_uris, # OPTIONAL + "scope": self.base_scope, # OPTIONAL + "state": state}, # RECOMMENDED + **{"Content-Type": 'application/x-www-form-urlencoded'}) + + # Get user authorization + ################################################################################## + url = resp.url + csrf_token = resp.text.split('CSRF_TOKEN = "')[0].split('"')[0] + + resp = self.client.post(url, + data={"username": self.user.username, + "password": self.user_password, + "permission_mask": 1, + "csrfmiddlewaretoken": csrf_token}) + + url = resp.url + resp = self.client.get(url) + + csrf_token = resp.text.split('CSRF_TOKEN = "')[0].split('"')[0] + + resp = self.client.post(url, + follow=True, + data={"allow": "Authorize", + "scope": self.base_scope, + "csrfmiddlewaretoken": csrf_token, + "response_type": "code", + "code_challenge": code_challenge, + "code_challenge_method": cc_method, + "client_id": app.client_id, + "state": state, + "redirect_uri": app.redirect_uris}) + + keys = resp.request['QUERY_STRING'].split("&") + for key in keys: + if len(key.split('code=')) == 2: + code = key.split('code=')[1] + if len(key.split('state=')) == 2: + resp_state = key.split('state=')[1] + + ################################################################################## + + grant = Grant.objects.get(code=code) + self.assertEqual(grant.scope, self.base_scope) + self.assertEqual(state, resp_state) + + # Now we can ask an Access Token + + resp = self.client.post('/o/token/', + data={"grant_type": 'authorization_code', # REQUIRED + "code": code, # REQUIRED + "code_verifier": code_verifier, # PKCE REQUIRED + "redirect_uri": app.redirect_uris}, # REQUIRED + **{"Content-Type": 'application/x-www-form-urlencoded', + "HTTP_Authorization": f'Basic {credential}'}) + + # We should have refresh token + self.assertEqual('refresh_token' in resp.json(), True) + + token = AccessToken.objects.get(token=resp.json()['access_token']) + + # Token can have access, it shouldn't have the useless scope + self.assertEqual(token.scope, self.base_scope) def test_oauth2_implicit_flow(self): """ @@ -61,14 +218,14 @@ class OAuth2FlowTestCase(TestCase): hash_client_secret=False, algorithm=Application.NO_ALGORITHM, ) - + credential = base64.b64encode(f'{app.client_id}:{app.client_secret}'.encode('utf-8')).decode() # No token without real password resp = self.client.post('/o/token/', - data={"grant_type": "password", - "username": self.user, - "password": "password"}, + data={"grant_type": "password", # REQUIRED + "username": self.user, # REQUIRED + "password": "password"}, # REQUIRED **{"Content-Type": 'application/x-www-form-urlencoded', "Http_Authorization": f'Basic {credential}'} ) @@ -76,37 +233,34 @@ class OAuth2FlowTestCase(TestCase): self.assertEqual(resp.status_code, 400) resp = self.client.post('/o/token/', - data={"grant_type": "password", - "username": self.user, - "password": self.user_password}, + data={"grant_type": "password", # REQUIRED + "username": self.user, # REQUIRED + "password": self.user_password}, # REQUIRED **{"Content-Type": 'application/x-www-form-urlencoded', "HTTP_Authorization": f'Basic {credential}'} ) - + self.assertEqual(resp.status_code, 200) access_token = AccessToken.objects.get(token=resp.json()['access_token']) self.assertEqual('refresh_token' in resp.json(), True) - self.assertEqual(access_token.scope, '0_0') # token do nothing + self.assertEqual(access_token.scope, '0_0') # token do nothing # RFC6749 4.3.2 allows use of scope in ROPB token access request - + resp = self.client.post('/o/token/', - data={"grant_type": "password", - #"client_id": app.client_id, - "username": self.user, - "password": self.user_password, - "scope": self.base_scope}, + data={"grant_type": "password", # REQUIRED + "username": self.user, # REQUIRED + "password": self.user_password, # REQUIRED + "scope": self.base_scope}, # OPTIONAL **{"Content-Type": 'application/x-www-form-urlencoded', "HTTP_Authorization": f'Basic {credential}'} ) token = AccessToken.objects.get(token=resp.json()['access_token']) - self.assertEqual(token.scope, self.base_scope) # token do nothing more than base_scope - - + self.assertEqual(token.scope, self.base_scope) # token do nothing more than base_scope def test_oauth2_client_credentials(self): """ @@ -123,7 +277,7 @@ class OAuth2FlowTestCase(TestCase): # No token without credential resp = self.client.post('/o/token/', - data={"grant_type": "client_credentials"}, + data={"grant_type": "client_credentials"}, # REQUIRED **{"Content-Type": 'application/x-www-form-urlencoded'} ) @@ -133,7 +287,7 @@ class OAuth2FlowTestCase(TestCase): credential = base64.b64encode(f'{app.client_id}:{app.client_secret}'.encode('utf-8')).decode() resp = self.client.post('/o/token/', - data={"grant_type": "client_credentials"}, + data={"grant_type": "client_credentials"}, # REQUIRED **{'HTTP_Authorization': f'Basic {credential}', "Content-Type": 'application/x-www-form-urlencoded'} ) @@ -147,8 +301,8 @@ class OAuth2FlowTestCase(TestCase): # RFC6749 4.4.2 allows use of scope in client credential flow resp = self.client.post('/o/token/', - data={"grant_type": "client_credentials", - "scope": self.base_scope}, + data={"grant_type": "client_credentials", # REQUIRED + "scope": self.base_scope}, # OPTIONAL **{'http_Authorization': f'Basic {credential}', "Content-Type": 'application/x-www-form-urlencoded'} )