This commit is contained in:
Valentin Samir 2016-06-24 21:23:33 +02:00
parent 12201665de
commit 0776e371e8
3 changed files with 229 additions and 62 deletions

View File

@ -76,4 +76,7 @@ setting_default('CAS_SQL_PASSWORD_CHECK', 'crypt') # crypt or plain
setting_default('CAS_TEST_USER', 'test') setting_default('CAS_TEST_USER', 'test')
setting_default('CAS_TEST_PASSWORD', 'test') setting_default('CAS_TEST_PASSWORD', 'test')
setting_default('CAS_TEST_ATTRIBUTES', {'nom': 'Nymous', 'prenom': 'Ano', 'email': 'anonymous@example.net'}) setting_default(
'CAS_TEST_ATTRIBUTES',
{'nom': 'Nymous', 'prenom': 'Ano', 'email': 'anonymous@example.net'}
)

View File

@ -4,11 +4,11 @@ from django.test import TestCase
from django.test import Client from django.test import Client
from lxml import etree from lxml import etree
import BaseHTTPServer
import models import models
import utils import utils
def get_login_page_params(): def get_login_page_params():
client = Client() client = Client()
response = client.get('/login') response = client.get('/login')
@ -21,24 +21,28 @@ def get_login_page_params():
params[field.name] = "" params[field.name] = ""
return client, params return client, params
def get_auth_client(): def get_auth_client():
client, params = get_login_page_params() client, params = get_login_page_params()
params["username"] = settings.CAS_TEST_USER params["username"] = settings.CAS_TEST_USER
params["password"] = settings.CAS_TEST_PASSWORD params["password"] = settings.CAS_TEST_PASSWORD
response = client.post('/login', params) client.post('/login', params)
return client return client
def get_user_ticket_request(service): def get_user_ticket_request(service):
client = get_auth_client() client = get_auth_client()
response = client.get("/login", {"service": service}) response = client.get("/login", {"service": service})
ticket_value = response['Location'].split('ticket=')[-1] ticket_value = response['Location'].split('ticket=')[-1]
user = models.User.objects.get(username=settings.CAS_TEST_USER, session_key=client.session.session_key) user = models.User.objects.get(
username=settings.CAS_TEST_USER,
session_key=client.session.session_key
)
ticket = models.ServiceTicket.objects.get(value=ticket_value) ticket = models.ServiceTicket.objects.get(value=ticket_value)
return (user, ticket) return (user, ticket)
def get_pgt(): def get_pgt():
(httpd_thread, host, port) = utils.PGTUrlHandler.run() (httpd_thread, host, port) = utils.PGTUrlHandler.run()
service = "http://%s:%s" % (host, port) service = "http://%s:%s" % (host, port)
@ -46,7 +50,7 @@ def get_pgt():
(user, ticket) = get_user_ticket_request(service) (user, ticket) = get_user_ticket_request(service)
client = Client() client = Client()
response = client.get('/serviceValidate', {'ticket': ticket.value, 'service': service, 'pgtUrl': service}) client.get('/serviceValidate', {'ticket': ticket.value, 'service': service, 'pgtUrl': service})
params = utils.PGTUrlHandler.PARAMS.copy() params = utils.PGTUrlHandler.PARAMS.copy()
params["service"] = service params["service"] = service
@ -54,6 +58,7 @@ def get_pgt():
return params return params
class LoginTestCase(TestCase): class LoginTestCase(TestCase):
def setUp(self): def setUp(self):
@ -72,10 +77,19 @@ class LoginTestCase(TestCase):
response = client.post('/login', params) response = client.post('/login', params)
self.assertEqual(response.status_code, 200) self.assertEqual(response.status_code, 200)
self.assertTrue("You have successfully logged into the Central Authentication Service" in response.content) self.assertTrue(
(
self.assertTrue(models.User.objects.get(username=settings.CAS_TEST_USER, session_key=client.session.session_key)) "You have successfully logged into "
"the Central Authentication Service"
) in response.content
)
self.assertTrue(
models.User.objects.get(
username=settings.CAS_TEST_USER,
session_key=client.session.session_key
)
)
def test_login_view_post_badlt(self): def test_login_view_post_badlt(self):
client, params = get_login_page_params() client, params = get_login_page_params()
@ -87,8 +101,12 @@ class LoginTestCase(TestCase):
self.assertEqual(response.status_code, 200) self.assertEqual(response.status_code, 200)
self.assertTrue("Invalid login ticket" in response.content) self.assertTrue("Invalid login ticket" in response.content)
self.assertFalse("You have successfully logged into the Central Authentication Service" in response.content) self.assertFalse(
(
"You have successfully logged into "
"the Central Authentication Service"
) in response.content
)
def test_login_view_post_badpass_good_lt(self): def test_login_view_post_badpass_good_lt(self):
client, params = get_login_page_params() client, params = get_login_page_params()
@ -97,19 +115,35 @@ class LoginTestCase(TestCase):
response = client.post('/login', params) response = client.post('/login', params)
self.assertEqual(response.status_code, 200) self.assertEqual(response.status_code, 200)
self.assertTrue(" The credentials you provided cannot be determined to be authentic" in response.content) self.assertTrue(
self.assertFalse("You have successfully logged into the Central Authentication Service" in response.content) (
"The credentials you provided cannot be "
"determined to be authentic"
) in response.content
)
self.assertFalse(
(
"You have successfully logged into "
"the Central Authentication Service"
) in response.content
)
def test_view_login_get_auth_allowed_service(self): def test_view_login_get_auth_allowed_service(self):
client = get_auth_client() client = get_auth_client()
response = client.get("/login?service=https://www.example.com") response = client.get("/login?service=https://www.example.com")
self.assertEqual(response.status_code, 302) self.assertEqual(response.status_code, 302)
self.assertTrue(response.has_header('Location')) self.assertTrue(response.has_header('Location'))
self.assertTrue(response['Location'].startswith("https://www.example.com?ticket=%s-" % settings.CAS_SERVICE_TICKET_PREFIX)) self.assertTrue(
response['Location'].startswith(
"https://www.example.com?ticket=%s-" % settings.CAS_SERVICE_TICKET_PREFIX
)
)
ticket_value = response['Location'].split('ticket=')[-1] ticket_value = response['Location'].split('ticket=')[-1]
user = models.User.objects.get(username=settings.CAS_TEST_USER, session_key=client.session.session_key) user = models.User.objects.get(
username=settings.CAS_TEST_USER,
session_key=client.session.session_key
)
self.assertTrue(user) self.assertTrue(user)
ticket = models.ServiceTicket.objects.get(value=ticket_value) ticket = models.ServiceTicket.objects.get(value=ticket_value)
self.assertEqual(ticket.user, user) self.assertEqual(ticket.user, user)
@ -134,15 +168,30 @@ class LogoutTestCase(TestCase):
response = client.get("/login") response = client.get("/login")
self.assertEqual(response.status_code, 200) self.assertEqual(response.status_code, 200)
self.assertTrue("You have successfully logged into the Central Authentication Service" in response.content) self.assertTrue(
(
"You have successfully logged into "
"the Central Authentication Service"
) in response.content
)
response = client.get("/logout") response = client.get("/logout")
self.assertEqual(response.status_code, 200) self.assertEqual(response.status_code, 200)
self.assertTrue("You have successfully logged out from the Central Authentication Service" in response.content) self.assertTrue(
(
"You have successfully logged out from "
"the Central Authentication Service"
) in response.content
)
response = client.get("/login") response = client.get("/login")
self.assertEqual(response.status_code, 200) self.assertEqual(response.status_code, 200)
self.assertFalse("You have successfully logged into the Central Authentication Service" in response.content) self.assertFalse(
(
"You have successfully logged into "
"the Central Authentication Service"
) in response.content
)
def test_logout_view_url(self): def test_logout_view_url(self):
client = get_auth_client() client = get_auth_client()
@ -154,7 +203,12 @@ class LogoutTestCase(TestCase):
response = client.get("/login") response = client.get("/login")
self.assertEqual(response.status_code, 200) self.assertEqual(response.status_code, 200)
self.assertFalse("You have successfully logged into the Central Authentication Service" in response.content) self.assertFalse(
(
"You have successfully logged into "
"the Central Authentication Service"
) in response.content
)
def test_logout_view_service(self): def test_logout_view_service(self):
client = get_auth_client() client = get_auth_client()
@ -166,11 +220,12 @@ class LogoutTestCase(TestCase):
response = client.get("/login") response = client.get("/login")
self.assertEqual(response.status_code, 200) self.assertEqual(response.status_code, 200)
self.assertFalse("You have successfully logged into the Central Authentication Service" in response.content) self.assertFalse(
(
"You have successfully logged into "
open("/tmp/test.html", "w").write(response.content) "the Central Authentication Service"
) in response.content
)
class AuthTestCase(TestCase): class AuthTestCase(TestCase):
@ -186,35 +241,75 @@ class AuthTestCase(TestCase):
def test_auth_view_goodpass(self): def test_auth_view_goodpass(self):
settings.CAS_AUTH_SHARED_SECRET = 'test' settings.CAS_AUTH_SHARED_SECRET = 'test'
client = Client() client = Client()
response = client.post('/auth', {'username':settings.CAS_TEST_USER, 'password':settings.CAS_TEST_PASSWORD, 'service':self.service, 'secret':'test'}) response = client.post(
'/auth',
{
'username': settings.CAS_TEST_USER,
'password': settings.CAS_TEST_PASSWORD,
'service': self.service,
'secret': 'test'
}
)
self.assertEqual(response.status_code, 200) self.assertEqual(response.status_code, 200)
self.assertEqual(response.content, 'yes\n') self.assertEqual(response.content, 'yes\n')
def test_auth_view_badpass(self): def test_auth_view_badpass(self):
settings.CAS_AUTH_SHARED_SECRET = 'test' settings.CAS_AUTH_SHARED_SECRET = 'test'
client = Client() client = Client()
response = client.post('/auth', {'username':settings.CAS_TEST_USER, 'password':'badpass', 'service':self.service, 'secret':'test'}) response = client.post(
'/auth',
{
'username': settings.CAS_TEST_USER,
'password': 'badpass',
'service': self.service,
'secret': 'test'
}
)
self.assertEqual(response.status_code, 200) self.assertEqual(response.status_code, 200)
self.assertEqual(response.content, 'no\n') self.assertEqual(response.content, 'no\n')
def test_auth_view_badservice(self): def test_auth_view_badservice(self):
settings.CAS_AUTH_SHARED_SECRET = 'test' settings.CAS_AUTH_SHARED_SECRET = 'test'
client = Client() client = Client()
response = client.post('/auth', {'username':settings.CAS_TEST_USER, 'password':settings.CAS_TEST_PASSWORD, 'service':'https://www.example.org', 'secret':'test'}) response = client.post(
'/auth',
{
'username': settings.CAS_TEST_USER,
'password': settings.CAS_TEST_PASSWORD,
'service': 'https://www.example.org',
'secret': 'test'
}
)
self.assertEqual(response.status_code, 200) self.assertEqual(response.status_code, 200)
self.assertEqual(response.content, 'no\n') self.assertEqual(response.content, 'no\n')
def test_auth_view_badsecret(self): def test_auth_view_badsecret(self):
settings.CAS_AUTH_SHARED_SECRET = 'test' settings.CAS_AUTH_SHARED_SECRET = 'test'
client = Client() client = Client()
response = client.post('/auth', {'username':settings.CAS_TEST_USER, 'password':settings.CAS_TEST_PASSWORD, 'service':self.service, 'secret':'badsecret'}) response = client.post(
'/auth',
{
'username': settings.CAS_TEST_USER,
'password': settings.CAS_TEST_PASSWORD,
'service': self.service,
'secret': 'badsecret'
}
)
self.assertEqual(response.status_code, 200) self.assertEqual(response.status_code, 200)
self.assertEqual(response.content, 'no\n') self.assertEqual(response.content, 'no\n')
def test_auth_view_badsettings(self): def test_auth_view_badsettings(self):
settings.CAS_AUTH_SHARED_SECRET = None settings.CAS_AUTH_SHARED_SECRET = None
client = Client() client = Client()
response = client.post('/auth', {'username':settings.CAS_TEST_USER, 'password':settings.CAS_TEST_PASSWORD, 'service':self.service, 'secret':'test'}) response = client.post(
'/auth',
{
'username': settings.CAS_TEST_USER,
'password': settings.CAS_TEST_PASSWORD,
'service': self.service,
'secret': 'test'
}
)
self.assertEqual(response.status_code, 200) self.assertEqual(response.status_code, 200)
self.assertEqual(response.content, "no\nplease set CAS_AUTH_SHARED_SECRET") self.assertEqual(response.content, "no\nplease set CAS_AUTH_SHARED_SECRET")
@ -242,7 +337,10 @@ class ValidateTestCase(TestCase):
(user, ticket) = get_user_ticket_request(self.service) (user, ticket) = get_user_ticket_request(self.service)
client = Client() client = Client()
response = client.get('/validate', {'ticket': ticket.value, 'service': "https://www.example.org"}) response = client.get(
'/validate',
{'ticket': ticket.value, 'service': "https://www.example.org"}
)
self.assertEqual(response.status_code, 200) self.assertEqual(response.status_code, 200)
self.assertEqual(response.content, 'no\n') self.assertEqual(response.content, 'no\n')
@ -250,10 +348,14 @@ class ValidateTestCase(TestCase):
(user, ticket) = get_user_ticket_request(self.service) (user, ticket) = get_user_ticket_request(self.service)
client = Client() client = Client()
response = client.get('/validate', {'ticket': "%s-RANDOM" % settings.CAS_SERVICE_TICKET_PREFIX, 'service': self.service}) response = client.get(
'/validate',
{'ticket': "%s-RANDOM" % settings.CAS_SERVICE_TICKET_PREFIX, 'service': self.service}
)
self.assertEqual(response.status_code, 200) self.assertEqual(response.status_code, 200)
self.assertEqual(response.content, 'no\n') self.assertEqual(response.content, 'no\n')
class ValidateServiceTestCase(TestCase): class ValidateServiceTestCase(TestCase):
def setUp(self): def setUp(self):
@ -274,14 +376,20 @@ class ValidateServiceTestCase(TestCase):
self.assertEqual(response.status_code, 200) self.assertEqual(response.status_code, 200)
root = etree.fromstring(response.content) root = etree.fromstring(response.content)
sucess = root.xpath("//cas:authenticationSuccess", namespaces={'cas': "http://www.yale.edu/tp/cas"}) sucess = root.xpath(
"//cas:authenticationSuccess",
namespaces={'cas': "http://www.yale.edu/tp/cas"}
)
self.assertTrue(sucess) self.assertTrue(sucess)
users = root.xpath("//cas:user", namespaces={'cas': "http://www.yale.edu/tp/cas"}) users = root.xpath("//cas:user", namespaces={'cas': "http://www.yale.edu/tp/cas"})
self.assertEqual(len(users), 1) self.assertEqual(len(users), 1)
self.assertEqual(users[0].text, settings.CAS_TEST_USER) self.assertEqual(users[0].text, settings.CAS_TEST_USER)
attributes = root.xpath("//cas:attributes", namespaces={'cas': "http://www.yale.edu/tp/cas"}) attributes = root.xpath(
"//cas:attributes",
namespaces={'cas': "http://www.yale.edu/tp/cas"}
)
self.assertEqual(len(attributes), 1) self.assertEqual(len(attributes), 1)
attrs1 = {} attrs1 = {}
for attr in attributes[0]: for attr in attributes[0]:
@ -304,7 +412,10 @@ class ValidateServiceTestCase(TestCase):
self.assertEqual(response.status_code, 200) self.assertEqual(response.status_code, 200)
root = etree.fromstring(response.content) root = etree.fromstring(response.content)
error = root.xpath("//cas:authenticationFailure", namespaces={'cas': "http://www.yale.edu/tp/cas"}) error = root.xpath(
"//cas:authenticationFailure",
namespaces={'cas': "http://www.yale.edu/tp/cas"}
)
self.assertEqual(len(error), 1) self.assertEqual(len(error), 1)
self.assertEqual(error[0].attrib['code'], "INVALID_SERVICE") self.assertEqual(error[0].attrib['code'], "INVALID_SERVICE")
self.assertEqual(error[0].text, bad_service) self.assertEqual(error[0].text, bad_service)
@ -318,7 +429,10 @@ class ValidateServiceTestCase(TestCase):
self.assertEqual(response.status_code, 200) self.assertEqual(response.status_code, 200)
root = etree.fromstring(response.content) root = etree.fromstring(response.content)
error = root.xpath("//cas:authenticationFailure", namespaces={'cas': "http://www.yale.edu/tp/cas"}) error = root.xpath(
"//cas:authenticationFailure",
namespaces={'cas': "http://www.yale.edu/tp/cas"}
)
self.assertEqual(len(error), 1) self.assertEqual(len(error), 1)
self.assertEqual(error[0].attrib['code'], "INVALID_TICKET") self.assertEqual(error[0].attrib['code'], "INVALID_TICKET")
self.assertEqual(error[0].text, 'ticket not found') self.assertEqual(error[0].text, 'ticket not found')
@ -332,7 +446,10 @@ class ValidateServiceTestCase(TestCase):
self.assertEqual(response.status_code, 200) self.assertEqual(response.status_code, 200)
root = etree.fromstring(response.content) root = etree.fromstring(response.content)
error = root.xpath("//cas:authenticationFailure", namespaces={'cas': "http://www.yale.edu/tp/cas"}) error = root.xpath(
"//cas:authenticationFailure",
namespaces={'cas': "http://www.yale.edu/tp/cas"}
)
self.assertEqual(len(error), 1) self.assertEqual(len(error), 1)
self.assertEqual(error[0].attrib['code'], "INVALID_TICKET") self.assertEqual(error[0].attrib['code'], "INVALID_TICKET")
self.assertEqual(error[0].text, bad_ticket) self.assertEqual(error[0].text, bad_ticket)
@ -344,13 +461,18 @@ class ValidateServiceTestCase(TestCase):
(user, ticket) = get_user_ticket_request(service) (user, ticket) = get_user_ticket_request(service)
client = Client() client = Client()
response = client.get('/serviceValidate', {'ticket': ticket.value, 'service': service, 'pgtUrl': service}) response = client.get(
'/serviceValidate',
{'ticket': ticket.value, 'service': service, 'pgtUrl': service}
)
pgt_params = utils.PGTUrlHandler.PARAMS.copy() pgt_params = utils.PGTUrlHandler.PARAMS.copy()
self.assertEqual(response.status_code, 200) self.assertEqual(response.status_code, 200)
root = etree.fromstring(response.content) root = etree.fromstring(response.content)
pgtiou = root.xpath("//cas:proxyGrantingTicket", namespaces={'cas': "http://www.yale.edu/tp/cas"}) pgtiou = root.xpath(
"//cas:proxyGrantingTicket",
namespaces={'cas': "http://www.yale.edu/tp/cas"}
)
self.assertEqual(len(pgtiou), 1) self.assertEqual(len(pgtiou), 1)
self.assertEqual(pgt_params["pgtIou"], pgtiou[0].text) self.assertEqual(pgt_params["pgtIou"], pgtiou[0].text)
self.assertTrue("pgtId" in pgt_params) self.assertTrue("pgtId" in pgt_params)
@ -361,15 +483,22 @@ class ValidateServiceTestCase(TestCase):
(user, ticket) = get_user_ticket_request(self.service) (user, ticket) = get_user_ticket_request(self.service)
client = Client() client = Client()
response = client.get('/serviceValidate', {'ticket': ticket.value, 'service': self.service, 'pgtUrl': self.service}) response = client.get(
'/serviceValidate',
{'ticket': ticket.value, 'service': self.service, 'pgtUrl': self.service}
)
self.assertEqual(response.status_code, 200) self.assertEqual(response.status_code, 200)
root = etree.fromstring(response.content) root = etree.fromstring(response.content)
error = root.xpath("//cas:authenticationFailure", namespaces={'cas': "http://www.yale.edu/tp/cas"}) error = root.xpath(
"//cas:authenticationFailure",
namespaces={'cas': "http://www.yale.edu/tp/cas"}
)
self.assertEqual(len(error), 1) self.assertEqual(len(error), 1)
self.assertEqual(error[0].attrib['code'], "INVALID_PROXY_CALLBACK") self.assertEqual(error[0].attrib['code'], "INVALID_PROXY_CALLBACK")
self.assertEqual(error[0].text, "callback url not allowed by configuration") self.assertEqual(error[0].text, "callback url not allowed by configuration")
class ProxyTestCase(TestCase): class ProxyTestCase(TestCase):
def setUp(self): def setUp(self):
@ -383,7 +512,6 @@ class ProxyTestCase(TestCase):
) )
models.ReplaceAttributName.objects.create(name="*", service_pattern=self.service_pattern) models.ReplaceAttributName.objects.create(name="*", service_pattern=self.service_pattern)
def test_validate_proxy_ok(self): def test_validate_proxy_ok(self):
params = get_pgt() params = get_pgt()
@ -396,18 +524,23 @@ class ProxyTestCase(TestCase):
sucess = root.xpath("//cas:proxySuccess", namespaces={'cas': "http://www.yale.edu/tp/cas"}) sucess = root.xpath("//cas:proxySuccess", namespaces={'cas': "http://www.yale.edu/tp/cas"})
self.assertTrue(sucess) self.assertTrue(sucess)
proxy_ticket = root.xpath("//cas:proxyTicket", namespaces={'cas': "http://www.yale.edu/tp/cas"}) proxy_ticket = root.xpath(
"//cas:proxyTicket",
namespaces={'cas': "http://www.yale.edu/tp/cas"}
)
self.assertEqual(len(proxy_ticket), 1) self.assertEqual(len(proxy_ticket), 1)
proxy_ticket = proxy_ticket[0].text proxy_ticket = proxy_ticket[0].text
# validate the proxy ticket # validate the proxy ticket
client2 = Client() client2 = Client()
response = client2.get('/proxyValidate', {'ticket': proxy_ticket, 'service': self.service}) response = client2.get('/proxyValidate', {'ticket': proxy_ticket, 'service': self.service})
self.assertEqual(response.status_code, 200) self.assertEqual(response.status_code, 200)
root = etree.fromstring(response.content) root = etree.fromstring(response.content)
sucess = root.xpath("//cas:authenticationSuccess", namespaces={'cas': "http://www.yale.edu/tp/cas"}) sucess = root.xpath(
"//cas:authenticationSuccess",
namespaces={'cas': "http://www.yale.edu/tp/cas"}
)
self.assertTrue(sucess) self.assertTrue(sucess)
# check that the proxy is send to the end service # check that the proxy is send to the end service
@ -422,7 +555,10 @@ class ProxyTestCase(TestCase):
self.assertEqual(len(users), 1) self.assertEqual(len(users), 1)
self.assertEqual(users[0].text, settings.CAS_TEST_USER) self.assertEqual(users[0].text, settings.CAS_TEST_USER)
attributes = root.xpath("//cas:attributes", namespaces={'cas': "http://www.yale.edu/tp/cas"}) attributes = root.xpath(
"//cas:attributes",
namespaces={'cas': "http://www.yale.edu/tp/cas"}
)
self.assertEqual(len(attributes), 1) self.assertEqual(len(attributes), 1)
attrs1 = {} attrs1 = {}
for attr in attributes[0]: for attr in attributes[0]:
@ -436,43 +572,68 @@ class ProxyTestCase(TestCase):
self.assertEqual(attrs1, attrs2) self.assertEqual(attrs1, attrs2)
self.assertEqual(attrs1, settings.CAS_TEST_ATTRIBUTES) self.assertEqual(attrs1, settings.CAS_TEST_ATTRIBUTES)
def test_validate_proxy_bad(self): def test_validate_proxy_bad(self):
params = get_pgt() params = get_pgt()
# bad PGT # bad PGT
client1 = Client() client1 = Client()
response = client1.get('/proxy', {'pgt': "%s-RANDOM" % settings.CAS_PROXY_GRANTING_TICKET_PREFIX, 'targetService': params['service']}) response = client1.get(
'/proxy',
{
'pgt': "%s-RANDOM" % settings.CAS_PROXY_GRANTING_TICKET_PREFIX,
'targetService': params['service']
}
)
self.assertEqual(response.status_code, 200) self.assertEqual(response.status_code, 200)
root = etree.fromstring(response.content) root = etree.fromstring(response.content)
error = root.xpath("//cas:authenticationFailure", namespaces={'cas': "http://www.yale.edu/tp/cas"}) error = root.xpath(
"//cas:authenticationFailure",
namespaces={'cas': "http://www.yale.edu/tp/cas"}
)
self.assertEqual(len(error), 1) self.assertEqual(len(error), 1)
self.assertEqual(error[0].attrib['code'], "INVALID_TICKET") self.assertEqual(error[0].attrib['code'], "INVALID_TICKET")
self.assertEqual(error[0].text, "PGT %s-RANDOM not found" % settings.CAS_PROXY_GRANTING_TICKET_PREFIX) self.assertEqual(
error[0].text,
"PGT %s-RANDOM not found" % settings.CAS_PROXY_GRANTING_TICKET_PREFIX
)
# bad targetService # bad targetService
client2 = Client() client2 = Client()
response = client2.get('/proxy', {'pgt': params['pgtId'], 'targetService': "https://www.example.org"}) response = client2.get(
'/proxy',
{'pgt': params['pgtId'], 'targetService': "https://www.example.org"}
)
self.assertEqual(response.status_code, 200) self.assertEqual(response.status_code, 200)
root = etree.fromstring(response.content) root = etree.fromstring(response.content)
error = root.xpath("//cas:authenticationFailure", namespaces={'cas': "http://www.yale.edu/tp/cas"}) error = root.xpath(
"//cas:authenticationFailure",
namespaces={'cas': "http://www.yale.edu/tp/cas"}
)
self.assertEqual(len(error), 1) self.assertEqual(len(error), 1)
self.assertEqual(error[0].attrib['code'], "UNAUTHORIZED_SERVICE") self.assertEqual(error[0].attrib['code'], "UNAUTHORIZED_SERVICE")
self.assertEqual(error[0].text, "https://www.example.org") self.assertEqual(error[0].text, "https://www.example.org")
# service do not allow proxy ticket # service do not allow proxy ticket
self.service_pattern.proxy = False self.service_pattern.proxy = False
self.service_pattern.save() self.service_pattern.save()
client3 = Client() client3 = Client()
response = client3.get('/proxy', {'pgt': params['pgtId'], 'targetService': params['service']}) response = client3.get(
'/proxy',
{'pgt': params['pgtId'], 'targetService': params['service']}
)
self.assertEqual(response.status_code, 200) self.assertEqual(response.status_code, 200)
root = etree.fromstring(response.content) root = etree.fromstring(response.content)
error = root.xpath("//cas:authenticationFailure", namespaces={'cas': "http://www.yale.edu/tp/cas"}) error = root.xpath(
"//cas:authenticationFailure",
namespaces={'cas': "http://www.yale.edu/tp/cas"}
)
self.assertEqual(len(error), 1) self.assertEqual(len(error), 1)
self.assertEqual(error[0].attrib['code'], "UNAUTHORIZED_SERVICE") self.assertEqual(error[0].attrib['code'], "UNAUTHORIZED_SERVICE")
self.assertEqual(error[0].text, 'the service %s do not allow proxy ticket' % params['service']) self.assertEqual(
error[0].text,
'the service %s do not allow proxy ticket' % params['service']
)

View File

@ -150,6 +150,7 @@ def gen_saml_id():
class PGTUrlHandler(BaseHTTPServer.BaseHTTPRequestHandler): class PGTUrlHandler(BaseHTTPServer.BaseHTTPRequestHandler):
PARAMS = {} PARAMS = {}
def do_GET(s): def do_GET(s):
s.send_response(200) s.send_response(200)
s.send_header("Content-type", "text/plain") s.send_header("Content-type", "text/plain")
@ -159,6 +160,7 @@ class PGTUrlHandler(BaseHTTPServer.BaseHTTPRequestHandler):
params = dict(parse_qsl(url.query)) params = dict(parse_qsl(url.query))
PGTUrlHandler.PARAMS.update(params) PGTUrlHandler.PARAMS.update(params)
s.wfile.write("%s" % params) s.wfile.write("%s" % params)
def log_message(self, format, *args): def log_message(self, format, *args):
return return
@ -167,10 +169,11 @@ class PGTUrlHandler(BaseHTTPServer.BaseHTTPRequestHandler):
server_class = BaseHTTPServer.HTTPServer server_class = BaseHTTPServer.HTTPServer
httpd = server_class(("127.0.0.1", 0), PGTUrlHandler) httpd = server_class(("127.0.0.1", 0), PGTUrlHandler)
(host, port) = httpd.socket.getsockname() (host, port) = httpd.socket.getsockname()
def lauch(): def lauch():
httpd.handle_request() httpd.handle_request()
#httpd.serve_forever()
httpd.server_close() httpd.server_close()
httpd_thread = Thread(target=lauch) httpd_thread = Thread(target=lauch)
httpd_thread.daemon = True httpd_thread.daemon = True
httpd_thread.start() httpd_thread.start()