From 08593700fc7f31feba5df9a357009f3f02b7fe44 Mon Sep 17 00:00:00 2001 From: quark Date: Sun, 9 Nov 2025 11:18:11 +0100 Subject: [PATCH] implicit flow #137 --- apps/permission/scopes.py | 14 +++ apps/permission/tests/test_oauth2_flow.py | 131 +++++++++++++++++++++- 2 files changed, 144 insertions(+), 1 deletion(-) diff --git a/apps/permission/scopes.py b/apps/permission/scopes.py index cece9a21..cd88f18f 100644 --- a/apps/permission/scopes.py +++ b/apps/permission/scopes.py @@ -72,6 +72,11 @@ class PermissionOAuth2Validator(OAuth2Validator): "email": request.user.email, } + def get_userinfo_claims(self, request): + claims = super().get_userinfo_claims(request) + claims['is_active'] = request.user.is_active + return claims + def get_discovery_claims(self, request): claims = super().get_discovery_claims(self) return claims + ["name", "normalized_name", "email"] @@ -162,6 +167,12 @@ class PermissionOAuth2Validator(OAuth2Validator): request.scopes = valid_scopes return valid_scopes + def validate_implicit_scopes(self, client_id, scopes, client, request, *args, **kwargs): + """ + Implicit flow it's just degraded Authorization flow + """ + return self.validate_code_scopes(client_id, scopes, client, request, args, kwargs) + def validate_scopes(self, client_id, scopes, client, request, *args, **kwargs): """ User can request as many scope as he wants, including invalid scopes, @@ -179,6 +190,9 @@ class PermissionOAuth2Validator(OAuth2Validator): if hasattr(request, '_params') and request._params['response_type'] == 'code': return self.validate_code_scopes(client_id, scopes, client, request, args, kwargs) + if hasattr(request, '_params') and request._params['response_type'] == 'token': + return self.validate_implicit_scopes(client_id, scopes, client, request, args, kwargs) + for t in Permission.PERMISSION_TYPES: for p in PermissionBackend.get_raw_permissions(get_current_request(), t[0]): scope = f"{p.id}_{p.membership.club.id}" diff --git a/apps/permission/tests/test_oauth2_flow.py b/apps/permission/tests/test_oauth2_flow.py index 8fc6c71e..ffd022dd 100644 --- a/apps/permission/tests/test_oauth2_flow.py +++ b/apps/permission/tests/test_oauth2_flow.py @@ -204,7 +204,136 @@ class OAuth2FlowTestCase(TestCase): """ Ensure OAuth2 Implicit Flow work """ - pass + app = Application.objects.create( + name="Test Implicit Flow", + client_type=Application.CLIENT_CONFIDENTIAL, + authorization_grant_type=Application.GRANT_IMPLICIT, + user=self.user, + hash_client_secret=False, + algorithm=Application.NO_ALGORITHM, + redirect_uris='http://127.0.0.1:8000/noexist/callback/', + ) + + ############################ + # Minimal RFC6749 requests # + ############################ + + resp = self.client.get('/o/authorize/', + data={'response_type': 'token', # REQUIRED + 'client_id': app.client_id}, # REQUIRED + **{"Content-Type": 'application/x-www-form-urlencoded'} + ) + + # Get user authorization + ################################################################################## + url = resp.url + csrf_token = resp.text.split('CSRF_TOKEN = "')[0].split('"')[0] + + resp = self.client.post(url, + data={"username": self.user.username, + "password": self.user_password, + "permission_mask": 1, + "csrfmiddlewaretoken": csrf_token}) + + url = resp.url + resp = self.client.get(url) + + csrf_token = resp.text.split('CSRF_TOKEN = "')[0].split('"')[0] + + resp = self.client.post(url, + follow=True, + data={"allow": "Authorize", + "scope": '0_0', + "csrfmiddlewaretoken": csrf_token, + "response_type": "token", + "client_id": app.client_id, + "redirect_uri": app.redirect_uris}) + + url = resp.redirect_chain[0][0] + keys = url.split('#')[1] + refresh_token = '' + + for couple in keys.split('&'): + if couple.split('=')[0] == 'access_token': + token = couple.split('=')[1] + if couple.split('=')[0] == 'refresh_token': + refresh_token = couple.split('=')[1] + + ################################################################################## + + self.assertEqual(refresh_token, '') + + access_token = AccessToken.objects.get(token=token) + + # Token do nothing, it should be have the useless scope + self.assertEqual(access_token.scope, '0_0') + + # Logout user + self.client.logout() + + ############################ + # Maximal RFC6749 requests # + ############################ + + state = get_random_string(32) + + resp = self.client.get('/o/authorize/', + data={'response_type': 'token', # REQUIRED + 'client_id': app.client_id, # REQUIRED + 'redirect_uri': app.redirect_uris, # OPTIONAL + 'scope': self.base_scope, # OPTIONAL + 'state': state}, # RECOMMENDED + **{"Content-Type": 'application/x-www-form-urlencoded'} + ) + + # Get user authorization + ################################################################################## + url = resp.url + csrf_token = resp.text.split('CSRF_TOKEN = "')[0].split('"')[0] + + resp = self.client.post(url, + data={"username": self.user.username, + "password": self.user_password, + "permission_mask": 1, + "csrfmiddlewaretoken": csrf_token}) + + url = resp.url + resp = self.client.get(url) + + csrf_token = resp.text.split('CSRF_TOKEN = "')[0].split('"')[0] + + resp = self.client.post(url, + follow=True, + data={"allow": "Authorize", + "scope": self.base_scope, + "state": state, + "csrfmiddlewaretoken": csrf_token, + "response_type": "token", + "client_id": app.client_id, + "redirect_uri": app.redirect_uris}) + + url = resp.redirect_chain[0][0] + keys = url.split('#')[1] + refresh_token = '' + + for couple in keys.split('&'): + if couple.split('=')[0] == 'access_token': + token = couple.split('=')[1] + if couple.split('=')[0] == 'refresh_token': + refresh_token = couple.split('=')[1] + if couple.split('=')[0] == 'state': + resp_state = couple.split('=')[1] + + ################################################################################## + + self.assertEqual(refresh_token, '') + + access_token = AccessToken.objects.get(token=token) + + # Token can have access, it shouldn't have the useless scope + self.assertEqual(access_token.scope, self.base_scope) + + self.assertEqual(state, resp_state) def test_oauth2_resource_owner_password_credentials_flow(self): """