Merge pull request #6 from nitmir/federate

Add a federated mode.

When the settings CAS_FEDERATE is True, django-cas-server will offer to the user to choose its CAS backend to authenticate. Hence the login page do not display anymore a username/password form but a select form with configured CASs backend.

This allow to give access to CAS supported applications to users from multiple organization seamlessly.

It was originally developped to mach the need of https://myares.fr (Federated CAS at https://cas.myares.fr, example of an application using it as https://chat.myares.fr)
This commit is contained in:
Valentin Samir 2016-07-05 12:54:44 +02:00 committed by GitHub
commit 37bb766245
32 changed files with 2325 additions and 319 deletions

View File

@ -5,12 +5,14 @@ omit =
cas_server/migrations*
cas_server/management/*
cas_server/tests/*
cas_server/cas.py
[report]
exclude_lines =
pragma: no cover
def __repr__
def __unicode__
def __str__
raise AssertionError
raise NotImplementedError
if six.PY3:

2
.gitignore vendored
View File

@ -15,3 +15,5 @@ coverage.xml
test_venv
.coverage
htmlcov/
tox_logs/
.cache/

View File

@ -40,6 +40,7 @@ Features
* Fine control on which user's attributes are passed to which service
* Possibility to rename/rewrite attributes per service
* Possibility to require some attribute values per service
* Federated mode between multiple CAS
* Supports Django 1.7, 1.8 and 1.9
* Supports Python 2.7, 3.x
@ -158,6 +159,17 @@ Authentication settings
If more requests need to be send, there are queued. The default is ``10``.
* ``CAS_SLO_TIMEOUT``: Timeout for a single SLO request in seconds. The default is ``5``.
Federation settings
-------------------
* ``CAS_FEDERATE``: A boolean for activating the federated mode (see the federate section below).
The default is ``False``.
* ``CAS_FEDERATE_REMEMBER_TIMEOUT``: Time after witch the cookie use for "remember my identity
provider" expire. The default is ``604800``, one week. The cookie is called
``_remember_provider``.
Tickets validity settings
-------------------------
@ -245,6 +257,8 @@ Authentication backend
This is the default backend. The returned attributes are the fields available on the user model.
* mysql backend ``cas_server.auth.MysqlAuthUser``: see the 'Mysql backend settings' section.
The returned attributes are those return by sql query ``CAS_SQL_USER_QUERY``.
* federated backend ``cas_server.auth.CASFederateAuth``: It is automatically used then ``CAS_FEDERATE`` is ``True``.
You should not set it manually without setting ``CAS_FEDERATE`` to ``True``.
Logs
====
@ -313,3 +327,51 @@ Or to log to a file:
},
},
}
Federation mode
===============
``django-cas-server`` comes with a federation mode. Then ``CAS_FEDERATE`` is ``True``,
user are invited to choose an identity provider on the login page, then, they are redirected
to the provider CAS to authenticate. This provider transmit to ``django-cas-server`` the user
username and attributes. The user is now logged in on ``django-cas-server`` and can use
services using ``django-cas-server`` as CAS.
The list of allowed identity providers is defined using the django admin application.
With the development server started, visit http://127.0.0.1:8000/admin/ to add identity providers.
An identity provider comes with 5 fields:
* `Position`: an integer used to tweak the order in which identity providers are displayed on
the login page. Identity providers are sorted using position first, then, on equal position,
using `verbose name` and then, on equal `verbose name`, using `suffix`.
* `Suffix`: the suffix that will be append to the username returned by the identity provider.
It must be unique.
* `Server url`: the url to the identity provider CAS. For instance, if you are using
`https://cas.example.org/login` to authenticate on the CAS, the `server url` is
`https://cas.example.org`
* `CAS protocol version`: the version of the CAS protocol to use to contact the identity provider.
The default is version 3.
* `Verbose name`: the name used on the login page to display the identity provider.
* `Display`: a boolean controlling the display of the identity provider on the login page.
Beware that this do not disable the identity provider, it just hide it on the login page.
User will always be able to log in using this provider by fetching `/federate/provider_suffix`.
In federation mode, ``django-cas-server`` build user's username as follow:
``provider_returned_username@provider_suffix``.
Choose the provider returned username for ``django-cas-server`` and the provider suffix
in order to make sense, as this built username is likely to be displayed to end users in
applications.
Then using federate mode, you should add one command to a daily crontab: ``cas_clean_federate``.
This command clean the local cache of federated user from old unused users.
You could for example do as bellow :
.. code-block::
10 0 * * * cas-user /path/to/project/manage.py cas_clean_federate

View File

@ -12,6 +12,7 @@
from django.contrib import admin
from .models import ServiceTicket, ProxyTicket, ProxyGrantingTicket, User, ServicePattern
from .models import Username, ReplaceAttributName, ReplaceAttributValue, FilterAttributValue
from .models import FederatedIendityProvider
from .forms import TicketForm
TICKETS_READONLY_FIELDS = ('validate', 'service', 'service_pattern',
@ -91,5 +92,12 @@ class ServicePatternAdmin(admin.ModelAdmin):
'single_log_out', 'proxy_callback', 'restrict_users')
class FederatedIendityProviderAdmin(admin.ModelAdmin):
"""`FederatedIendityProvider` in admin interface"""
fields = ('pos', 'suffix', 'server_url', 'cas_protocol_version', 'verbose_name', 'display')
list_display = ('verbose_name', 'suffix', 'display')
admin.site.register(User, UserAdmin)
admin.site.register(ServicePattern, ServicePatternAdmin)
admin.site.register(FederatedIendityProvider, FederatedIendityProviderAdmin)

View File

@ -1,4 +1,4 @@
# *- coding: utf-8 -*-
# -*- coding: utf-8 -*-
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License version 3 for
@ -12,6 +12,9 @@
"""Some authentication classes for the CAS"""
from django.conf import settings
from django.contrib.auth import get_user_model
from django.utils import timezone
from datetime import timedelta
try: # pragma: no cover
import MySQLdb
import MySQLdb.cursors
@ -19,6 +22,8 @@ try: # pragma: no cover
except ImportError:
MySQLdb = None
from .models import FederatedUser
class AuthUser(object):
"""Authentication base class"""
@ -136,3 +141,35 @@ class DjangoAuthUser(AuthUser): # pragma: no cover
return attr
else:
return {}
class CASFederateAuth(AuthUser):
"""Authentication class used then CAS_FEDERATE is True"""
user = None
def __init__(self, username):
try:
self.user = FederatedUser.get_from_federated_username(username)
super(CASFederateAuth, self).__init__(
self.user.federated_username
)
except FederatedUser.DoesNotExist:
super(CASFederateAuth, self).__init__(username)
def test_password(self, ticket):
"""test `password` agains the user"""
if not self.user or not self.user.ticket:
return False
else:
return (
ticket == self.user.ticket and
self.user.last_update >
(timezone.now() - timedelta(seconds=settings.CAS_TICKET_VALIDITY))
)
def attributs(self):
"""return a dict of user attributes"""
if not self.user: # pragma: no cover (should not happen)
return {}
else:
return self.user.attributs

394
cas_server/cas.py Normal file
View File

@ -0,0 +1,394 @@
# Copyright (C) 2014, Ming Chen
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is furnished
# to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
# This file is originated from https://github.com/python-cas/python-cas
# at commit ec1f2d4779625229398547b9234d0e9e874a2c9a
# some modifications have been made to be unicode coherent between python2 and python2
import six
from six.moves.urllib import parse as urllib_parse
from six.moves.urllib import request as urllib_request
from six.moves.urllib.request import Request
from uuid import uuid4
import datetime
class CASError(ValueError):
pass
class ReturnUnicode(object):
@staticmethod
def unicode(string, charset):
if not isinstance(string, six.text_type):
return string.decode(charset)
else:
return string
class SingleLogoutMixin(object):
@classmethod
def get_saml_slos(cls, logout_request):
"""returns saml logout ticket info"""
from lxml import etree
try:
root = etree.fromstring(logout_request)
return root.xpath(
"//samlp:SessionIndex",
namespaces={'samlp': "urn:oasis:names:tc:SAML:2.0:protocol"})
except etree.XMLSyntaxError:
pass
class CASClient(object):
def __new__(self, *args, **kwargs):
version = kwargs.pop('version')
if version in (1, '1'):
return CASClientV1(*args, **kwargs)
elif version in (2, '2'):
return CASClientV2(*args, **kwargs)
elif version in (3, '3'):
return CASClientV3(*args, **kwargs)
elif version == 'CAS_2_SAML_1_0':
return CASClientWithSAMLV1(*args, **kwargs)
raise ValueError('Unsupported CAS_VERSION %r' % version)
class CASClientBase(object):
logout_redirect_param_name = 'service'
def __init__(self, service_url=None, server_url=None,
extra_login_params=None, renew=False,
username_attribute=None):
self.service_url = service_url
self.server_url = server_url
self.extra_login_params = extra_login_params or {}
self.renew = renew
self.username_attribute = username_attribute
pass
def verify_ticket(self, ticket):
"""must return a triple"""
raise NotImplementedError()
def get_login_url(self):
"""Generates CAS login URL"""
params = {'service': self.service_url}
if self.renew:
params.update({'renew': 'true'})
params.update(self.extra_login_params)
url = urllib_parse.urljoin(self.server_url, 'login')
query = urllib_parse.urlencode(params)
return url + '?' + query
def get_logout_url(self, redirect_url=None):
"""Generates CAS logout URL"""
url = urllib_parse.urljoin(self.server_url, 'logout')
if redirect_url:
params = {self.logout_redirect_param_name: redirect_url}
url += '?' + urllib_parse.urlencode(params)
return url
def get_proxy_url(self, pgt):
"""Returns proxy url, given the proxy granting ticket"""
params = urllib_parse.urlencode({'pgt': pgt, 'targetService': self.service_url})
return "%s/proxy?%s" % (self.server_url, params)
def get_proxy_ticket(self, pgt):
"""Returns proxy ticket given the proxy granting ticket"""
response = urllib_request.urlopen(self.get_proxy_url(pgt))
if response.code == 200:
from lxml import etree
root = etree.fromstring(response.read())
tickets = root.xpath(
"//cas:proxyTicket",
namespaces={"cas": "http://www.yale.edu/tp/cas"}
)
if len(tickets) == 1:
return tickets[0].text
errors = root.xpath(
"//cas:authenticationFailure",
namespaces={"cas": "http://www.yale.edu/tp/cas"}
)
if len(errors) == 1:
raise CASError(errors[0].attrib['code'], errors[0].text)
raise CASError("Bad http code %s" % response.code)
class CASClientV1(CASClientBase, ReturnUnicode):
"""CAS Client Version 1"""
logout_redirect_param_name = 'url'
def verify_ticket(self, ticket):
"""Verifies CAS 1.0 authentication ticket.
Returns username on success and None on failure.
"""
params = [('ticket', ticket), ('service', self.service_url)]
url = (urllib_parse.urljoin(self.server_url, 'validate') + '?' +
urllib_parse.urlencode(params))
page = urllib_request.urlopen(url)
try:
verified = page.readline().strip()
if verified == b'yes':
content_type = page.info().get('Content-type')
if "charset=" in content_type:
charset = content_type.split("charset=")[-1]
else:
charset = "ascii"
user = self.unicode(page.readline().strip(), charset)
return user, None, None
else:
return None, None, None
finally:
page.close()
class CASClientV2(CASClientBase, ReturnUnicode):
"""CAS Client Version 2"""
url_suffix = 'serviceValidate'
logout_redirect_param_name = 'url'
def __init__(self, proxy_callback=None, *args, **kwargs):
"""proxy_callback is for V2 and V3 so V3 is subclass of V2"""
self.proxy_callback = proxy_callback
super(CASClientV2, self).__init__(*args, **kwargs)
def verify_ticket(self, ticket):
"""Verifies CAS 2.0+/3.0+ XML-based authentication ticket and returns extended attributes"""
(response, charset) = self.get_verification_response(ticket)
return self.verify_response(response, charset)
def get_verification_response(self, ticket):
params = [('ticket', ticket), ('service', self.service_url)]
if self.proxy_callback:
params.append(('pgtUrl', self.proxy_callback))
base_url = urllib_parse.urljoin(self.server_url, self.url_suffix)
url = base_url + '?' + urllib_parse.urlencode(params)
page = urllib_request.urlopen(url)
try:
content_type = page.info().get('Content-type')
if "charset=" in content_type:
charset = content_type.split("charset=")[-1]
else:
charset = "ascii"
return (page.read(), charset)
finally:
page.close()
@classmethod
def parse_attributes_xml_element(cls, element, charset):
attributes = dict()
for attribute in element:
tag = cls.self.unicode(attribute.tag, charset).split(u"}").pop()
if tag in attributes:
if isinstance(attributes[tag], list):
attributes[tag].append(cls.unicode(attribute.text, charset))
else:
attributes[tag] = [attributes[tag]]
attributes[tag].append(cls.unicode(attribute.text, charset))
else:
if tag == u'attraStyle':
pass
else:
attributes[tag] = cls.unicode(attribute.text, charset)
return attributes
@classmethod
def verify_response(cls, response, charset):
user, attributes, pgtiou = cls.parse_response_xml(response, charset)
if len(attributes) == 0:
attributes = None
return user, attributes, pgtiou
@classmethod
def parse_response_xml(cls, response, charset):
try:
from xml.etree import ElementTree
except ImportError:
from elementtree import ElementTree
user = None
attributes = {}
pgtiou = None
tree = ElementTree.fromstring(response)
if tree[0].tag.endswith('authenticationSuccess'):
for element in tree[0]:
if element.tag.endswith('user'):
user = cls.unicode(element.text, charset)
elif element.tag.endswith('proxyGrantingTicket'):
pgtiou = cls.unicode(element.text, charset)
elif element.tag.endswith('attributes'):
attributes = cls.parse_attributes_xml_element(element, charset)
return user, attributes, pgtiou
class CASClientV3(CASClientV2, SingleLogoutMixin):
"""CAS Client Version 3"""
url_suffix = 'serviceValidate'
logout_redirect_param_name = 'service'
@classmethod
def parse_attributes_xml_element(cls, element, charset):
attributes = dict()
for attribute in element:
tag = cls.unicode(attribute.tag, charset).split(u"}").pop()
if tag in attributes:
if isinstance(attributes[tag], list):
attributes[tag].append(cls.unicode(attribute.text, charset))
else:
attributes[tag] = [attributes[tag]]
attributes[tag].append(cls.unicode(attribute.text, charset))
else:
attributes[tag] = cls.unicode(attribute.text, charset)
return attributes
@classmethod
def verify_response(cls, response, charset):
return cls.parse_response_xml(response, charset)
SAML_1_0_NS = 'urn:oasis:names:tc:SAML:1.0:'
SAML_1_0_PROTOCOL_NS = '{' + SAML_1_0_NS + 'protocol' + '}'
SAML_1_0_ASSERTION_NS = '{' + SAML_1_0_NS + 'assertion' + '}'
SAML_ASSERTION_TEMPLATE = """<?xml version="1.0" encoding="UTF-8"?>
<SOAP-ENV:Envelope xmlns:SOAP-ENV="http://schemas.xmlsoap.org/soap/envelope/">
<SOAP-ENV:Header/>
<SOAP-ENV:Body>
<samlp:Request xmlns:samlp="urn:oasis:names:tc:SAML:1.0:protocol"
MajorVersion="1"
MinorVersion="1"
RequestID="{request_id}"
IssueInstant="{timestamp}">
<samlp:AssertionArtifact>{ticket}</samlp:AssertionArtifact></samlp:Request>
</SOAP-ENV:Body>
</SOAP-ENV:Envelope>"""
class CASClientWithSAMLV1(CASClientV2, SingleLogoutMixin):
"""CASClient 3.0+ with SAML"""
def verify_ticket(self, ticket, **kwargs):
"""Verifies CAS 3.0+ XML-based authentication ticket and returns extended attributes.
@date: 2011-11-30
@author: Carlos Gonzalez Vila <carlewis@gmail.com>
Returns username and attributes on success and None,None on failure.
"""
try:
from xml.etree import ElementTree
except ImportError:
from elementtree import ElementTree
page = self.fetch_saml_validation(ticket)
content_type = page.info().get('Content-type')
if "charset=" in content_type:
charset = content_type.split("charset=")[-1]
else:
charset = "ascii"
try:
user = None
attributes = {}
response = page.read()
tree = ElementTree.fromstring(response)
# Find the authentication status
success = tree.find('.//' + SAML_1_0_PROTOCOL_NS + 'StatusCode')
if success is not None and success.attrib['Value'].endswith(':Success'):
# User is validated
name_identifier = tree.find('.//' + SAML_1_0_ASSERTION_NS + 'NameIdentifier')
if name_identifier is not None:
user = self.unicode(name_identifier.text, charset)
attrs = tree.findall('.//' + SAML_1_0_ASSERTION_NS + 'Attribute')
for at in attrs:
if self.username_attribute in list(at.attrib.values()):
user = self.unicode(
at.find(SAML_1_0_ASSERTION_NS + 'AttributeValue').text,
charset
)
attributes[u'uid'] = user
values = at.findall(SAML_1_0_ASSERTION_NS + 'AttributeValue')
key = self.unicode(at.attrib['AttributeName'], charset)
if len(values) > 1:
values_array = []
for v in values:
values_array.append(self.unicode(v.text, charset))
attributes[key] = values_array
else:
attributes[key] = self.unicode(values[0].text, charset)
return user, attributes, None
finally:
page.close()
def fetch_saml_validation(self, ticket):
# We do the SAML validation
headers = {
'soapaction': 'http://www.oasis-open.org/committees/security',
'cache-control': 'no-cache',
'pragma': 'no-cache',
'accept': 'text/xml',
'connection': 'keep-alive',
'content-type': 'text/xml; charset=utf-8',
}
params = [('TARGET', self.service_url)]
saml_validate_url = urllib_parse.urljoin(
self.server_url, 'samlValidate',
)
request = Request(
saml_validate_url + '?' + urllib_parse.urlencode(params),
self.get_saml_assertion(ticket),
headers,
)
return urllib_request.urlopen(request)
@classmethod
def get_saml_assertion(cls, ticket):
"""
http://www.jasig.org/cas/protocol#samlvalidate-cas-3.0
SAML request values:
RequestID [REQUIRED]:
unique identifier for the request
IssueInstant [REQUIRED]:
timestamp of the request
samlp:AssertionArtifact [REQUIRED]:
the valid CAS Service Ticket obtained as a response parameter at login.
"""
# RequestID [REQUIRED] - unique identifier for the request
request_id = uuid4()
# e.g. 2014-06-02T09:21:03.071189
timestamp = datetime.datetime.now().isoformat()
return SAML_ASSERTION_TEMPLATE.format(
request_id=request_id,
timestamp=timestamp,
ticket=ticket,
).encode('utf8')

View File

@ -1,3 +1,4 @@
# -*- coding: utf-8 -*-
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License version 3 for
@ -7,7 +8,7 @@
# along with this program; if not, write to the Free Software Foundation, Inc., 51
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
# (c) 2015 Valentin Samir
# (c) 2015-2016 Valentin Samir
"""Default values for the app's settings"""
from django.conf import settings
from django.contrib.staticfiles.templatetags.staticfiles import static
@ -87,3 +88,9 @@ setting_default(
)
setting_default('CAS_ENABLE_AJAX_AUTH', False)
setting_default('CAS_FEDERATE', False)
setting_default('CAS_FEDERATE_REMEMBER_TIMEOUT', 604800) # one week
if settings.CAS_FEDERATE:
settings.CAS_AUTH_CLASS = "cas_server.auth.CASFederateAuth"

111
cas_server/federate.py Normal file
View File

@ -0,0 +1,111 @@
# -*- coding: utf-8 -*-
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License version 3 for
# more details.
#
# You should have received a copy of the GNU General Public License version 3
# along with this program; if not, write to the Free Software Foundation, Inc., 51
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
# (c) 2016 Valentin Samir
"""federated mode helper classes"""
from .default_settings import settings
from django.db import IntegrityError
from .cas import CASClient
from .models import FederatedUser, FederateSLO, User
import logging
from importlib import import_module
from six.moves import urllib
SessionStore = import_module(settings.SESSION_ENGINE).SessionStore
logger = logging.getLogger(__name__)
class CASFederateValidateUser(object):
"""Class CAS client used to authenticate the user again a CAS provider"""
username = None
attributs = {}
client = None
def __init__(self, provider, service_url):
self.provider = provider
self.client = CASClient(
service_url=service_url,
version=provider.cas_protocol_version,
server_url=provider.server_url,
renew=False,
)
def get_login_url(self):
"""return the CAS provider login url"""
return self.client.get_login_url()
def get_logout_url(self, redirect_url=None):
"""return the CAS provider logout url"""
return self.client.get_logout_url(redirect_url)
def verify_ticket(self, ticket):
"""test `ticket` agains the CAS provider, if valid, create the local federated user"""
try:
username, attributs = self.client.verify_ticket(ticket)[:2]
except urllib.error.URLError:
return False
if username is not None:
if attributs is None:
attributs = {}
attributs["provider"] = self.provider
self.username = username
self.attributs = attributs
user = FederatedUser.objects.update_or_create(
username=username,
provider=self.provider,
defaults=dict(attributs=attributs, ticket=ticket)
)[0]
user.save()
self.federated_username = user.federated_username
return True
else:
return False
@staticmethod
def register_slo(username, session_key, ticket):
"""association a ticket with a (username, session) for processing later SLO request"""
try:
FederateSLO.objects.create(
username=username,
session_key=session_key,
ticket=ticket
)
except IntegrityError: # pragma: no cover (ignore if the FederateSLO already exists)
pass
def clean_sessions(self, logout_request):
"""process a SLO request"""
try:
slos = self.client.get_saml_slos(logout_request) or []
except NameError: # pragma: no cover (should not happen)
slos = []
for slo in slos:
for federate_slo in FederateSLO.objects.filter(ticket=slo.text):
logger.info(
"Got an SLO requests for ticket %s, logging out user %s" % (
federate_slo.username,
federate_slo.ticket
)
)
session = SessionStore(session_key=federate_slo.session_key)
session.flush()
try:
user = User.objects.get(
username=federate_slo.username,
session_key=federate_slo.session_key
)
user.logout()
user.delete()
except User.DoesNotExist: # pragma: no cover (should not happen)
pass
federate_slo.delete()

View File

@ -28,6 +28,27 @@ class WarnForm(forms.Form):
lt = forms.CharField(widget=forms.HiddenInput(), required=False)
class FederateSelect(forms.Form):
"""
Form used on the login page when CAS_FEDERATE is True
allowing the user to choose a identity provider.
"""
provider = forms.ModelChoiceField(
queryset=models.FederatedIendityProvider.objects.filter(display=True).order_by(
"pos",
"verbose_name",
"suffix"
),
to_field_name="suffix",
label=_('Identity provider'),
)
service = forms.CharField(label=_('service'), widget=forms.HiddenInput(), required=False)
method = forms.CharField(widget=forms.HiddenInput(), required=False)
remember = forms.BooleanField(label=_('Remember the identity provider'), required=False)
warn = forms.BooleanField(label=_('warn'), required=False)
renew = forms.BooleanField(widget=forms.HiddenInput(), required=False)
class UserCredential(forms.Form):
"""Form used on the login page to retrive user credentials"""
username = forms.CharField(label=_('login'))
@ -51,6 +72,32 @@ class UserCredential(forms.Form):
return cleaned_data
class FederateUserCredential(UserCredential):
"""Form used on the login page to retrive user credentials"""
username = forms.CharField(widget=forms.HiddenInput())
service = forms.CharField(widget=forms.HiddenInput(), required=False)
password = forms.CharField(widget=forms.HiddenInput())
ticket = forms.CharField(widget=forms.HiddenInput())
lt = forms.CharField(widget=forms.HiddenInput(), required=False)
method = forms.CharField(widget=forms.HiddenInput(), required=False)
warn = forms.BooleanField(widget=forms.HiddenInput(), required=False)
renew = forms.BooleanField(widget=forms.HiddenInput(), required=False)
def clean(self):
cleaned_data = super(FederateUserCredential, self).clean()
try:
user = models.FederatedUser.get_from_federated_username(cleaned_data["username"])
user.ticket = ""
user.save()
# should not happed as if the FederatedUser do not exists, super should
# raise before a ValidationError("bad user")
except models.FederatedUser.DoesNotExist: # pragma: no cover (should not happend)
raise forms.ValidationError(
_(u"User not found in the temporary database, please try to reconnect")
)
return cleaned_data
class TicketForm(forms.ModelForm):
"""Form for Tickets in the admin interface"""
class Meta:

View File

@ -7,86 +7,153 @@ msgid ""
msgstr ""
"Project-Id-Version: cas_server\n"
"Report-Msgid-Bugs-To: \n"
"POT-Creation-Date: 2016-05-03 23:50+0200\n"
"PO-Revision-Date: 2016-05-03 23:50+0200\n"
"POT-Creation-Date: 2016-07-04 17:36+0200\n"
"PO-Revision-Date: 2016-07-04 17:39+0200\n"
"Last-Translator: Valentin Samir <valentin.samir@crans.org>\n"
"Language-Team: django <LL@li.org>\n"
"Language: en\n"
"MIME-Version: 1.0\n"
"Content-Type: text/plain; charset=UTF-8\n"
"Content-Transfer-Encoding: 8bit\n"
"X-Generator: Poedit 1.8.7.1\n"
"X-Generator: Poedit 1.8.8\n"
#: apps.py:7 templates/cas_server/base.html:3
#: apps.py:19 templates/cas_server/base.html:3
#: templates/cas_server/base.html:20
msgid "Central Authentication Service"
msgstr "Central Authentication Service"
#: forms.py:23
msgid "login"
msgstr "username"
#: forms.py:43
msgid "Identity provider"
msgstr "Identity provider"
#: forms.py:24 forms.py:47
#: forms.py:45 forms.py:55 forms.py:106
msgid "service"
msgstr ""
#: forms.py:25
msgid "password"
msgstr "password"
#: forms.py:47
msgid "Remember the identity provider"
msgstr "Remember the identity provider"
#: forms.py:28
#: forms.py:48 forms.py:59
msgid "warn"
msgstr " Warn me before logging me into other sites."
#: forms.py:39
#: forms.py:54
msgid "login"
msgstr "username"
#: forms.py:56
msgid "password"
msgstr "password"
#: forms.py:71
msgid "Bad user"
msgstr "The credentials you provided cannot be determined to be authentic."
#: management/commands/cas_clean_sessions.py:9
#: forms.py:96
msgid "User not found in the temporary database, please try to reconnect"
msgstr ""
#: management/commands/cas_clean_federate.py:20
msgid "Clean old federated users"
msgstr "Clean old federated users"
#: management/commands/cas_clean_sessions.py:22
msgid "Clean deleted sessions"
msgstr "Clean deleted sessions"
#: management/commands/cas_clean_tickets.py:9
#: management/commands/cas_clean_tickets.py:22
msgid "Clean old trickets"
msgstr "Clean old trickets"
#: models.py:42
msgid "identity provider"
msgstr "identity provider"
#: models.py:43
msgid "identity providers"
msgstr "identity providers"
#: models.py:47
msgid "suffix"
msgstr ""
#: models.py:48
msgid ""
"Suffix append to backend CAS returner username: `returned_username`@`suffix`"
msgstr ""
#: models.py:50
msgid "server url"
msgstr ""
#: models.py:59
msgid "CAS protocol version"
msgstr ""
#: models.py:60
msgid ""
"Version of the CAS protocol to use when sending requests the the backend CAS"
msgstr ""
#: models.py:65
msgid "verbose name"
msgstr ""
#: models.py:66
msgid "Name for this identity provider displayed on the login page"
msgstr ""
#: models.py:70 models.py:317
msgid "position"
msgstr "position"
#: models.py:80
msgid "display"
msgstr ""
#: models.py:81
msgid "Display the provider on the login page"
msgstr ""
#: models.py:164
msgid "User"
msgstr ""
#: models.py:43
#: models.py:165
msgid "Users"
msgstr ""
#: models.py:101
#: models.py:234
#, python-format
msgid "Error during service logout %s"
msgstr "Error during service logout %s"
#: models.py:169
#: models.py:312
msgid "Service pattern"
msgstr "Service pattern"
#: models.py:170
#: models.py:313
msgid "Services patterns"
msgstr ""
#: models.py:174
msgid "position"
msgstr "position"
#: models.py:318
msgid "service patterns are sorted using the position attribute"
msgstr ""
#: models.py:181 models.py:303
#: models.py:325 models.py:449
msgid "name"
msgstr "name"
#: models.py:182
#: models.py:326
msgid "A name for the service"
msgstr "A name for the service"
#: models.py:187 models.py:331 models.py:349
#: models.py:331 models.py:478 models.py:497
msgid "pattern"
msgstr "pattern"
#: models.py:189
#: models.py:333
msgid ""
"A regular expression matching services. Will usually looks like '^https://"
"some\\.server\\.com/path/.*$'.As it is a regular expression, special "
@ -96,73 +163,73 @@ msgstr ""
"some\\.server\\.com/path/.*$'.As it is a regular expression, special "
"character must be escaped with a '\\'."
#: models.py:198
#: models.py:342
msgid "user field"
msgstr ""
#: models.py:199
#: models.py:343
msgid "Name of the attribut to transmit as username, empty = login"
msgstr "Name of the attribut to transmit as username, empty = login"
#: models.py:203
#: models.py:347
msgid "restrict username"
msgstr ""
#: models.py:204
#: models.py:348
msgid "Limit username allowed to connect to the list provided bellow"
msgstr "Limit username allowed to connect to the list provided bellow"
#: models.py:208
#: models.py:352
msgid "proxy"
msgstr "proxy"
#: models.py:209
#: models.py:353
msgid "Proxy tickets can be delivered to the service"
msgstr "Proxy tickets can be delivered to the service"
#: models.py:213
#: models.py:357
msgid "proxy callback"
msgstr "proxy callback"
#: models.py:214
#: models.py:358
msgid "can be used as a proxy callback to deliver PGT"
msgstr "can be used as a proxy callback to deliver PGT"
#: models.py:218
#: models.py:362
msgid "single log out"
msgstr ""
#: models.py:219
#: models.py:363
msgid "Enable SLO for the service"
msgstr "Enable SLO for the service"
#: models.py:226
#: models.py:370
msgid "single log out callback"
msgstr ""
#: models.py:227
#: models.py:371
msgid ""
"URL where the SLO request will be POST. empty = service url\n"
"This is usefull for non HTTP proxied services."
msgstr ""
#: models.py:288
#: models.py:433
msgid "username"
msgstr ""
#: models.py:289
#: models.py:434
msgid "username allowed to connect to the service"
msgstr "username allowed to connect to the service"
#: models.py:304
#: models.py:450
msgid "name of an attribut to send to the service, use * for all attributes"
msgstr "name of an attribut to send to the service, use * for all attributes"
#: models.py:309 models.py:355
#: models.py:455 models.py:503
msgid "replace"
msgstr "replace"
#: models.py:310
#: models.py:456
msgid ""
"name under which the attribut will be showto the service. empty = default "
"name of the attribut"
@ -170,39 +237,30 @@ msgstr ""
"name under which the attribut will be showto the service. empty = default "
"name of the attribut"
#: models.py:326 models.py:344
#: models.py:473 models.py:492
msgid "attribut"
msgstr "attribut"
#: models.py:327
#: models.py:474
msgid "Name of the attribut which must verify pattern"
msgstr "Name of the attribut which must verify pattern"
#: models.py:332
#: models.py:479
msgid "a regular expression"
msgstr "a regular expression"
#: models.py:345
#: models.py:493
msgid "Name of the attribut for which the value must be replace"
msgstr "Name of the attribut for which the value must be replace"
#: models.py:350
#: models.py:498
msgid "An regular expression maching whats need to be replaced"
msgstr "An regular expression maching whats need to be replaced"
#: models.py:356
#: models.py:504
msgid "replace expression, groups are capture by \\1, \\2 …"
msgstr "replace expression, groups are capture by \\1, \\2 …"
#: models.py:463
#, python-format
msgid ""
"Error during service logout %(service)s:\n"
"%(error)s"
msgstr ""
"Error during service logout %(service)s:\n"
"%(error)s"
#: templates/cas_server/logged.html:6
msgid "Logged"
msgstr ""
@ -219,19 +277,19 @@ msgstr "Log me out from all my sessions"
msgid "Logout"
msgstr "Logout"
#: templates/cas_server/login.html:7
#: templates/cas_server/login.html:8
msgid "Please log in"
msgstr "Please log in"
#: templates/cas_server/login.html:10
#: templates/cas_server/login.html:13
msgid "Login"
msgstr "Login"
#: templates/cas_server/warn.html:7
#: templates/cas_server/warn.html:10
msgid "Connect to the service"
msgstr "Connect to the service"
#: views.py:128
#: views.py:152
msgid ""
"<h3>Logout successful</h3>You have successfully logged out from the Central "
"Authentication Service. For security reasons, exit your web browser."
@ -239,7 +297,7 @@ msgstr ""
"<h3>Logout successful</h3>You have successfully logged out from the Central "
"Authentication Service. For security reasons, exit your web browser."
#: views.py:134
#: views.py:158
#, python-format
msgid ""
"<h3>Logout successful</h3>You have successfully logged out from %s sessions "
@ -250,7 +308,7 @@ msgstr ""
"of the Central Authentication Service. For security reasons, exit your web "
"browser."
#: views.py:141
#: views.py:165
msgid ""
"<h3>Logout successful</h3>You were already logged out from the Central "
"Authentication Service. For security reasons, exit your web browser."
@ -258,48 +316,55 @@ msgstr ""
"<h3>Logout successful</h3>You were already logged out from the Central "
"Authentication Service. For security reasons, exit your web browser."
#: views.py:230
#: views.py:349
msgid "Invalid login ticket"
msgstr "Invalid login ticket, please retry to login"
#: views.py:325
#: views.py:470
#, python-format
msgid "Authentication has been required by service %(name)s (%(url)s)"
msgstr "Authentication has been required by service %(name)s (%(url)s)"
#: views.py:359
#: views.py:508
#, python-format
msgid "Service %(url)s non allowed."
msgstr "Service %(url)s non allowed."
#: views.py:366
#: views.py:515
msgid "Username non allowed"
msgstr "Username non allowed"
#: views.py:373
#: views.py:522
msgid "User charateristics non allowed"
msgstr "User charateristics non allowed"
#: views.py:380
#: views.py:529
#, python-format
msgid "The attribut %(field)s is needed to use that service"
msgstr "The attribut %(field)s is needed to use that service"
#: views.py:450
#: views.py:599
#, python-format
msgid "Authentication renewal required by service %(name)s (%(url)s)."
msgstr "Authentication renewal required by service %(name)s (%(url)s)."
#: views.py:457
#: views.py:606
#, python-format
msgid "Authentication required by service %(name)s (%(url)s)."
msgstr "Authentication required by service %(name)s (%(url)s)."
#: views.py:464
#: views.py:613
#, python-format
msgid "Service %s non allowed"
msgstr "Service %s non allowed"
#~ msgid ""
#~ "Error during service logout %(service)s:\n"
#~ "%(error)s"
#~ msgstr ""
#~ "Error during service logout %(service)s:\n"
#~ "%(error)s"
#~ msgid "Successfully logout"
#~ msgstr ""
#~ "<h3>Logout successful</h3>You have successfully logged out of the Central "

View File

@ -7,8 +7,8 @@ msgid ""
msgstr ""
"Project-Id-Version: cas_server\n"
"Report-Msgid-Bugs-To: \n"
"POT-Creation-Date: 2016-05-03 23:47+0200\n"
"PO-Revision-Date: 2016-05-03 23:49+0200\n"
"POT-Creation-Date: 2016-07-04 17:36+0200\n"
"PO-Revision-Date: 2016-07-04 17:37+0200\n"
"Last-Translator: Valentin Samir <valentin.samir@crans.org>\n"
"Language-Team: django <LL@li.org>\n"
"Language: fr\n"
@ -16,78 +16,151 @@ msgstr ""
"Content-Type: text/plain; charset=UTF-8\n"
"Content-Transfer-Encoding: 8bit\n"
"Plural-Forms: nplurals=2; plural=(n > 1);\n"
"X-Generator: Poedit 1.8.7.1\n"
"X-Generator: Poedit 1.8.8\n"
#: apps.py:7 templates/cas_server/base.html:3
#: apps.py:19 templates/cas_server/base.html:3
#: templates/cas_server/base.html:20
msgid "Central Authentication Service"
msgstr "Service Central d'Authentification"
#: forms.py:23
msgid "login"
msgstr "Identifiant"
#: forms.py:43
msgid "Identity provider"
msgstr "fournisseur d'identité"
#: forms.py:24 forms.py:47
#: forms.py:45 forms.py:55 forms.py:106
msgid "service"
msgstr "service"
#: forms.py:25
msgid "password"
msgstr "mot de passe"
#: forms.py:47
msgid "Remember the identity provider"
msgstr "Se souvenir du fournisseur d'identité"
#: forms.py:28
#: forms.py:48 forms.py:59
msgid "warn"
msgstr "Prévenez-moi avant d'accéder à d'autres services."
#: forms.py:39
#: forms.py:54
msgid "login"
msgstr "Identifiant"
#: forms.py:56
msgid "password"
msgstr "mot de passe"
#: forms.py:71
msgid "Bad user"
msgstr "Les informations transmises n'ont pas permis de vous authentifier."
#: management/commands/cas_clean_sessions.py:9
#: forms.py:96
msgid "User not found in the temporary database, please try to reconnect"
msgstr ""
"Utilisateur non trouvé dans la base de donnée temporaire, essayez de vous "
"reconnecter"
#: management/commands/cas_clean_federate.py:20
msgid "Clean old federated users"
msgstr "Nettoyer les anciens utilisateurs fédéré"
#: management/commands/cas_clean_sessions.py:22
msgid "Clean deleted sessions"
msgstr "Nettoyer les sessions supprimées"
#: management/commands/cas_clean_tickets.py:9
#: management/commands/cas_clean_tickets.py:22
msgid "Clean old trickets"
msgstr "Nettoyer les vieux tickets"
#: models.py:42
msgid "identity provider"
msgstr "fournisseur d'identité"
#: models.py:43
msgid "identity providers"
msgstr "fournisseurs d'identités"
#: models.py:47
msgid "suffix"
msgstr "suffixe"
#: models.py:48
msgid ""
"Suffix append to backend CAS returner username: `returned_username`@`suffix`"
msgstr ""
"Suffixe ajouté au nom d'utilisateur retourné par le CAS du fournisseur "
"d'identité : `nom retourné`@`suffixe`"
#: models.py:50
msgid "server url"
msgstr "url du serveur"
#: models.py:59
msgid "CAS protocol version"
msgstr "Version du protocole CAS"
#: models.py:60
msgid ""
"Version of the CAS protocol to use when sending requests the the backend CAS"
msgstr ""
"Version du protocole CAS à utiliser lorsque l'on envoie des requête au CAS "
"du fournisseur d'identité"
#: models.py:65
msgid "verbose name"
msgstr "Nom du fournisseur"
#: models.py:66
msgid "Name for this identity provider displayed on the login page"
msgstr "Nom affiché pour ce fournisseur d'identité sur la page de connexion"
#: models.py:70 models.py:317
msgid "position"
msgstr "position"
#: models.py:80
msgid "display"
msgstr "afficher"
#: models.py:81
msgid "Display the provider on the login page"
msgstr "Afficher le fournisseur d'identité sur la page de connexion"
#: models.py:164
msgid "User"
msgstr "Utilisateur"
#: models.py:43
#: models.py:165
msgid "Users"
msgstr "Utilisateurs"
#: models.py:101
#: models.py:234
#, python-format
msgid "Error during service logout %s"
msgstr "Une erreur est survenue durant la déconnexion du service %s"
#: models.py:169
#: models.py:312
msgid "Service pattern"
msgstr "Motif de service"
#: models.py:170
#: models.py:313
msgid "Services patterns"
msgstr "Motifs de services"
#: models.py:174
msgid "position"
msgstr "position"
#: models.py:318
msgid "service patterns are sorted using the position attribute"
msgstr "Les motifs de service sont trié selon l'attribut position"
#: models.py:181 models.py:303
#: models.py:325 models.py:449
msgid "name"
msgstr "nom"
#: models.py:182
#: models.py:326
msgid "A name for the service"
msgstr "Un nom pour le service"
#: models.py:187 models.py:331 models.py:349
#: models.py:331 models.py:478 models.py:497
msgid "pattern"
msgstr "motif"
#: models.py:189
#: models.py:333
msgid ""
"A regular expression matching services. Will usually looks like '^https://"
"some\\.server\\.com/path/.*$'.As it is a regular expression, special "
@ -98,55 +171,55 @@ msgstr ""
"expression rationnelle, les caractères spéciaux doivent être échappés avec "
"un '\\'."
#: models.py:198
#: models.py:342
msgid "user field"
msgstr "champ utilisateur"
#: models.py:199
#: models.py:343
msgid "Name of the attribut to transmit as username, empty = login"
msgstr ""
"Nom de l'attribut devant être transmis comme nom d'utilisateur au service. "
"vide = nom de connection"
#: models.py:203
#: models.py:347
msgid "restrict username"
msgstr "limiter les noms d'utilisateurs"
#: models.py:204
#: models.py:348
msgid "Limit username allowed to connect to the list provided bellow"
msgstr ""
"Limiter les noms d'utilisateurs autorisé à se connecter à la liste fournie "
"ci-dessous"
#: models.py:208
#: models.py:352
msgid "proxy"
msgstr "proxy"
#: models.py:209
#: models.py:353
msgid "Proxy tickets can be delivered to the service"
msgstr "des proxy tickets peuvent être délivrés au service"
#: models.py:213
#: models.py:357
msgid "proxy callback"
msgstr ""
#: models.py:214
#: models.py:358
msgid "can be used as a proxy callback to deliver PGT"
msgstr "peut être utilisé comme un callback pour recevoir un PGT"
#: models.py:218
#: models.py:362
msgid "single log out"
msgstr ""
#: models.py:219
#: models.py:363
msgid "Enable SLO for the service"
msgstr "Active le SLO pour le service"
#: models.py:226
#: models.py:370
msgid "single log out callback"
msgstr ""
#: models.py:227
#: models.py:371
msgid ""
"URL where the SLO request will be POST. empty = service url\n"
"This is usefull for non HTTP proxied services."
@ -155,63 +228,54 @@ msgstr ""
"service\n"
"Ceci n'est utilise que pour des services non HTTP proxifiés"
#: models.py:288
#: models.py:433
msgid "username"
msgstr "nom d'utilisateur"
#: models.py:289
#: models.py:434
msgid "username allowed to connect to the service"
msgstr "noms d'utilisateurs autorisé à se connecter au service"
#: models.py:304
#: models.py:450
msgid "name of an attribut to send to the service, use * for all attributes"
msgstr ""
"nom d'un attribut a envoyer au service, utiliser * pour tous les attributs"
#: models.py:309 models.py:355
#: models.py:455 models.py:503
msgid "replace"
msgstr "remplacement"
#: models.py:310
#: models.py:456
msgid ""
"name under which the attribut will be showto the service. empty = default "
"name of the attribut"
msgstr ""
"nom sous lequel l'attribut sera rendu visible au service. vide = inchangé"
#: models.py:326 models.py:344
#: models.py:473 models.py:492
msgid "attribut"
msgstr "attribut"
#: models.py:327
#: models.py:474
msgid "Name of the attribut which must verify pattern"
msgstr "Nom de l'attribut devant vérifier un motif"
#: models.py:332
#: models.py:479
msgid "a regular expression"
msgstr "une expression régulière"
#: models.py:345
#: models.py:493
msgid "Name of the attribut for which the value must be replace"
msgstr "nom de l'attribue pour lequel la valeur doit être remplacé"
#: models.py:350
#: models.py:498
msgid "An regular expression maching whats need to be replaced"
msgstr "une expression régulière reconnaissant ce qui doit être remplacé"
#: models.py:356
#: models.py:504
msgid "replace expression, groups are capture by \\1, \\2 …"
msgstr "expression de remplacement, les groupe sont capturé par \\1, \\2"
#: models.py:463
#, python-format
msgid ""
"Error during service logout %(service)s:\n"
"%(error)s"
msgstr ""
"Une erreur est survenue durant la déconnexion du service %(service)s:"
"%(error)s"
#: templates/cas_server/logged.html:6
msgid "Logged"
msgstr ""
@ -228,19 +292,19 @@ msgstr "Me déconnecter de toutes mes sessions"
msgid "Logout"
msgstr "Se déconnecter"
#: templates/cas_server/login.html:7
#: templates/cas_server/login.html:8
msgid "Please log in"
msgstr "Merci de se connecter"
#: templates/cas_server/login.html:10
#: templates/cas_server/login.html:13
msgid "Login"
msgstr "Connexion"
#: templates/cas_server/warn.html:7
#: templates/cas_server/warn.html:10
msgid "Connect to the service"
msgstr "Se connecter au service"
#: views.py:128
#: views.py:152
msgid ""
"<h3>Logout successful</h3>You have successfully logged out from the Central "
"Authentication Service. For security reasons, exit your web browser."
@ -249,7 +313,7 @@ msgstr ""
"d'Authentification. Pour des raisons de sécurité, veuillez fermer votre "
"navigateur."
#: views.py:134
#: views.py:158
#, python-format
msgid ""
"<h3>Logout successful</h3>You have successfully logged out from %s sessions "
@ -260,7 +324,7 @@ msgstr ""
"Service Central d'Authentification. Pour des raisons de sécurité, veuillez "
"fermer votre navigateur."
#: views.py:141
#: views.py:165
msgid ""
"<h3>Logout successful</h3>You were already logged out from the Central "
"Authentication Service. For security reasons, exit your web browser."
@ -269,50 +333,57 @@ msgstr ""
"d'Authentification. Pour des raisons de sécurité, veuillez fermer votre "
"navigateur."
#: views.py:230
#: views.py:349
msgid "Invalid login ticket"
msgstr "Ticket de connexion invalide, merci de réessayé de vous connecter"
#: views.py:325
#: views.py:470
#, python-format
msgid "Authentication has been required by service %(name)s (%(url)s)"
msgstr ""
"Une demande d'authentification a été émise pour le service %(name)s "
"(%(url)s)."
#: views.py:359
#: views.py:508
#, python-format
msgid "Service %(url)s non allowed."
msgstr "le service %(url)s n'est pas autorisé."
#: views.py:366
#: views.py:515
msgid "Username non allowed"
msgstr "Nom d'utilisateur non authorisé"
#: views.py:373
#: views.py:522
msgid "User charateristics non allowed"
msgstr "Caractéristique utilisateur non autorisée"
#: views.py:380
#: views.py:529
#, python-format
msgid "The attribut %(field)s is needed to use that service"
msgstr "L'attribut %(field)s est nécessaire pour se connecter à ce service"
#: views.py:450
#: views.py:599
#, python-format
msgid "Authentication renewal required by service %(name)s (%(url)s)."
msgstr "Demande de réauthentification pour le service %(name)s (%(url)s)."
#: views.py:457
#: views.py:606
#, python-format
msgid "Authentication required by service %(name)s (%(url)s)."
msgstr "Authentification requise par le service %(name)s (%(url)s)."
#: views.py:464
#: views.py:613
#, python-format
msgid "Service %s non allowed"
msgstr "Le service %s n'est pas autorisé"
#~ msgid ""
#~ "Error during service logout %(service)s:\n"
#~ "%(error)s"
#~ msgstr ""
#~ "Une erreur est survenue durant la déconnexion du service %(service)s:"
#~ "%(error)s"
#~ msgid "Successfully logout"
#~ msgstr ""
#~ "<h3>Déconnexion réussie</h3>\n"

View File

@ -0,0 +1,24 @@
# -*- coding: utf-8 -*-
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License version 3 for
# more details.
#
# You should have received a copy of the GNU General Public License version 3
# along with this program; if not, write to the Free Software Foundation, Inc., 51
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
# (c) 2016 Valentin Samir
from django.core.management.base import BaseCommand
from django.utils.translation import ugettext_lazy as _
from ... import models
class Command(BaseCommand):
args = ''
help = _(u"Clean old federated users")
def handle(self, *args, **options):
models.FederatedUser.clean_old_entries()
models.FederateSLO.clean_deleted_sessions()

View File

@ -1,3 +1,14 @@
# -*- coding: utf-8 -*-
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License version 3 for
# more details.
#
# You should have received a copy of the GNU General Public License version 3
# along with this program; if not, write to the Free Software Foundation, Inc., 51
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
# (c) 2016 Valentin Samir
"""Clean deleted sessions management command"""
from django.core.management.base import BaseCommand
from django.utils.translation import ugettext_lazy as _

View File

@ -1,3 +1,14 @@
# -*- coding: utf-8 -*-
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License version 3 for
# more details.
#
# You should have received a copy of the GNU General Public License version 3
# along with this program; if not, write to the Free Software Foundation, Inc., 51
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
# (c) 2016 Valentin Samir
"""Clean old trickets management command"""
from django.core.management.base import BaseCommand
from django.utils.translation import ugettext_lazy as _

View File

@ -0,0 +1,31 @@
# -*- coding: utf-8 -*-
# Generated by Django 1.9.6 on 2016-06-16 10:18
from __future__ import unicode_literals
from django.db import migrations, models
import picklefield.fields
class Migration(migrations.Migration):
dependencies = [
('cas_server', '0004_auto_20151218_1032'),
]
operations = [
migrations.CreateModel(
name='FederatedUser',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('username', models.CharField(max_length=124)),
('provider', models.CharField(max_length=124)),
('attributs', picklefield.fields.PickledObjectField(editable=False)),
('ticket', models.CharField(max_length=255)),
('last_update', models.DateTimeField(auto_now=True)),
],
),
migrations.AlterUniqueTogether(
name='federateduser',
unique_together=set([('username', 'provider')]),
),
]

View File

@ -0,0 +1,28 @@
# -*- coding: utf-8 -*-
# Generated by Django 1.9.7 on 2016-06-23 15:16
from __future__ import unicode_literals
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('cas_server', '0005_auto_20160616_1018'),
]
operations = [
migrations.CreateModel(
name='FederateSLO',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('username', models.CharField(max_length=30)),
('session_key', models.CharField(blank=True, max_length=40, null=True)),
('ticket', models.CharField(max_length=255)),
],
),
migrations.AlterUniqueTogether(
name='federateslo',
unique_together=set([('username', 'session_key')]),
),
]

View File

@ -0,0 +1,50 @@
# -*- coding: utf-8 -*-
# Generated by Django 1.9.7 on 2016-07-04 15:10
from __future__ import unicode_literals
from django.db import migrations, models
import django.db.models.deletion
class Migration(migrations.Migration):
dependencies = [
('cas_server', '0006_auto_20160623_1516'),
]
operations = [
migrations.CreateModel(
name='FederatedIendityProvider',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('suffix', models.CharField(help_text='Suffix append to backend CAS returner username: `returned_username`@`suffix`', max_length=30, unique=True, verbose_name='suffix')),
('server_url', models.CharField(max_length=255, verbose_name='server url')),
('cas_protocol_version', models.CharField(choices=[(b'1', b'CAS 1.0'), (b'2', b'CAS 2.0'), (b'3', b'CAS 3.0'), (b'CAS_2_SAML_1_0', b'SAML 1.1')], default=b'3', help_text='Version of the CAS protocol to use when sending requests the the backend CAS', max_length=30, verbose_name='CAS protocol version')),
('verbose_name', models.CharField(help_text='Name for this identity provider displayed on the login page', max_length=255, verbose_name='verbose name')),
('pos', models.IntegerField(default=100, help_text='Identity provider are sorted using the (position, verbose name, suffix) attributes', verbose_name='position')),
],
options={
'verbose_name': 'identity provider',
'verbose_name_plural': 'identity providers',
},
),
migrations.AlterField(
model_name='federateduser',
name='provider',
field=models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='cas_server.FederatedIendityProvider'),
),
migrations.AlterField(
model_name='federateslo',
name='ticket',
field=models.CharField(db_index=True, max_length=255),
),
migrations.AlterField(
model_name='servicepattern',
name='pos',
field=models.IntegerField(default=100, help_text='service patterns are sorted using the position attribute', verbose_name='position'),
),
migrations.AlterUniqueTogether(
name='federateslo',
unique_together=set([('username', 'session_key', 'ticket')]),
),
]

View File

@ -0,0 +1,20 @@
# -*- coding: utf-8 -*-
# Generated by Django 1.9.7 on 2016-07-04 15:33
from __future__ import unicode_literals
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('cas_server', '0007_auto_20160704_1510'),
]
operations = [
migrations.AddField(
model_name='federatediendityprovider',
name='display',
field=models.BooleanField(default=True, help_text='Display the provider on the login page', verbose_name='display'),
),
]

View File

@ -1,4 +1,4 @@
# *- coding: utf-8 -*-
# -*- coding: utf-8 -*-
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License version 3 for
@ -17,6 +17,7 @@ from django.db.models import Q
from django.contrib import messages
from django.utils.translation import ugettext_lazy as _
from django.utils import timezone
from django.utils.encoding import python_2_unicode_compatible
from picklefield.fields import PickledObjectField
import re
@ -34,6 +35,128 @@ SessionStore = import_module(settings.SESSION_ENGINE).SessionStore
logger = logging.getLogger(__name__)
@python_2_unicode_compatible
class FederatedIendityProvider(models.Model):
"""An identity provider for the federated mode"""
class Meta:
verbose_name = _("identity provider")
verbose_name_plural = _("identity providers")
suffix = models.CharField(
max_length=30,
unique=True,
verbose_name=_(u"suffix"),
help_text=_("Suffix append to backend CAS returner username: `returned_username`@`suffix`")
)
server_url = models.CharField(max_length=255, verbose_name=_(u"server url"))
cas_protocol_version = models.CharField(
max_length=30,
choices=[
("1", "CAS 1.0"),
("2", "CAS 2.0"),
("3", "CAS 3.0"),
("CAS_2_SAML_1_0", "SAML 1.1")
],
verbose_name=_(u"CAS protocol version"),
help_text=_("Version of the CAS protocol to use when sending requests the the backend CAS"),
default="3"
)
verbose_name = models.CharField(
max_length=255,
verbose_name=_(u"verbose name"),
help_text=_("Name for this identity provider displayed on the login page")
)
pos = models.IntegerField(
default=100,
verbose_name=_(u"position"),
help_text=_(
(
u"Identity provider are sorted using the "
u"(position, verbose name, suffix) attributes"
)
)
)
display = models.BooleanField(
default=True,
verbose_name=_(u"display"),
help_text=_("Display the provider on the login page")
)
def __str__(self):
return self.verbose_name
@staticmethod
def build_username_from_suffix(username, suffix):
"""Transform backend username into federated username using `suffix`"""
return u'%s@%s' % (username, suffix)
def build_username(self, username):
"""Transform backend username into federated username"""
return u'%s@%s' % (username, self.suffix)
@python_2_unicode_compatible
class FederatedUser(models.Model):
"""A federated user as returner by a CAS provider (username and attributes)"""
class Meta:
unique_together = ("username", "provider")
username = models.CharField(max_length=124)
provider = models.ForeignKey(FederatedIendityProvider, on_delete=models.CASCADE)
attributs = PickledObjectField()
ticket = models.CharField(max_length=255)
last_update = models.DateTimeField(auto_now=True)
def __str__(self):
return self.federated_username
@property
def federated_username(self):
"""return the federated username with a suffix"""
return self.provider.build_username(self.username)
@classmethod
def get_from_federated_username(cls, username):
"""return a FederatedUser object from a federated username"""
if username is None:
raise cls.DoesNotExist()
else:
component = username.split('@')
username = '@'.join(component[:-1])
suffix = component[-1]
try:
provider = FederatedIendityProvider.objects.get(suffix=suffix)
return cls.objects.get(username=username, provider=provider)
except FederatedIendityProvider.DoesNotExist:
raise cls.DoesNotExist()
@classmethod
def clean_old_entries(cls):
"""remove old unused federated users"""
federated_users = cls.objects.filter(
last_update__lt=(timezone.now() - timedelta(seconds=settings.CAS_TICKET_TIMEOUT))
)
known_users = {user.username for user in User.objects.all()}
for user in federated_users:
if user.federated_username not in known_users:
user.delete()
class FederateSLO(models.Model):
"""An association between a CAS provider ticket and a (username, session) for processing SLO"""
class Meta:
unique_together = ("username", "session_key", "ticket")
username = models.CharField(max_length=30)
session_key = models.CharField(max_length=40, blank=True, null=True)
ticket = models.CharField(max_length=255, db_index=True)
@classmethod
def clean_deleted_sessions(cls):
"""remove old object for which the session do not exists anymore"""
for federate_slo in cls.objects.all():
if not SessionStore(session_key=federate_slo.session_key).get('authenticated'):
federate_slo.delete()
@python_2_unicode_compatible
class User(models.Model):
"""A user logged into the CAS"""
class Meta:
@ -44,6 +167,15 @@ class User(models.Model):
username = models.CharField(max_length=30)
date = models.DateTimeField(auto_now=True)
def delete(self, *args, **kwargs):
"""remove the User"""
if settings.CAS_FEDERATE:
FederateSLO.objects.filter(
username=self.username,
session_key=self.session_key
).delete()
super(User, self).delete(*args, **kwargs)
@classmethod
def clean_old_entries(cls):
"""Remove users inactive since more that SESSION_COOKIE_AGE"""
@ -67,7 +199,7 @@ class User(models.Model):
"""return a fresh dict for the user attributs"""
return utils.import_attr(settings.CAS_AUTH_CLASS)(self.username).attributs()
def __unicode__(self):
def __str__(self):
return u"%s - %s" % (self.username, self.session_key)
def logout(self, request=None):
@ -172,6 +304,7 @@ class UserFieldNotDefined(ServicePatternException):
pass
@python_2_unicode_compatible
class ServicePattern(models.Model):
"""Allowed services pattern agains services are tested to"""
class Meta:
@ -181,7 +314,8 @@ class ServicePattern(models.Model):
pos = models.IntegerField(
default=100,
verbose_name=_(u"position")
verbose_name=_(u"position"),
help_text=_(u"service patterns are sorted using the position attribute")
)
name = models.CharField(
max_length=255,
@ -238,7 +372,7 @@ class ServicePattern(models.Model):
u"This is usefull for non HTTP proxied services.")
)
def __unicode__(self):
def __str__(self):
return u"%s: %s" % (self.pos, self.pattern)
def check_user(self, user):
@ -291,6 +425,7 @@ class ServicePattern(models.Model):
raise cls.DoesNotExist()
@python_2_unicode_compatible
class Username(models.Model):
"""A list of allowed usernames on a service pattern"""
value = models.CharField(
@ -300,10 +435,11 @@ class Username(models.Model):
)
service_pattern = models.ForeignKey(ServicePattern, related_name="usernames")
def __unicode__(self):
def __str__(self):
return self.value
@python_2_unicode_compatible
class ReplaceAttributName(models.Model):
"""A list of replacement of attributs name for a service pattern"""
class Meta:
@ -322,13 +458,14 @@ class ReplaceAttributName(models.Model):
)
service_pattern = models.ForeignKey(ServicePattern, related_name="attributs")
def __unicode__(self):
def __str__(self):
if not self.replace:
return self.name
else:
return u"%s%s" % (self.name, self.replace)
@python_2_unicode_compatible
class FilterAttributValue(models.Model):
"""A list of filter on attributs for a service pattern"""
attribut = models.CharField(
@ -343,10 +480,11 @@ class FilterAttributValue(models.Model):
)
service_pattern = models.ForeignKey(ServicePattern, related_name="filters")
def __unicode__(self):
def __str__(self):
return u"%s %s" % (self.attribut, self.pattern)
@python_2_unicode_compatible
class ReplaceAttributValue(models.Model):
"""Replacement to apply on attributs values for a service pattern"""
attribut = models.CharField(
@ -367,10 +505,11 @@ class ReplaceAttributValue(models.Model):
)
service_pattern = models.ForeignKey(ServicePattern, related_name="replacements")
def __unicode__(self):
def __str__(self):
return u"%s %s %s" % (self.attribut, self.pattern, self.replace)
@python_2_unicode_compatible
class Ticket(models.Model):
"""Generic class for a Ticket"""
class Meta:
@ -387,7 +526,7 @@ class Ticket(models.Model):
VALIDITY = settings.CAS_TICKET_VALIDITY
TIMEOUT = settings.CAS_TICKET_TIMEOUT
def __unicode__(self):
def __str__(self):
return u"Ticket-%s" % self.pk
@classmethod
@ -457,34 +596,38 @@ class Ticket(models.Model):
)
@python_2_unicode_compatible
class ServiceTicket(Ticket):
"""A Service Ticket"""
PREFIX = settings.CAS_SERVICE_TICKET_PREFIX
value = models.CharField(max_length=255, default=utils.gen_st, unique=True)
def __unicode__(self):
def __str__(self):
return u"ServiceTicket-%s" % self.pk
@python_2_unicode_compatible
class ProxyTicket(Ticket):
"""A Proxy Ticket"""
PREFIX = settings.CAS_PROXY_TICKET_PREFIX
value = models.CharField(max_length=255, default=utils.gen_pt, unique=True)
def __unicode__(self):
def __str__(self):
return u"ProxyTicket-%s" % self.pk
@python_2_unicode_compatible
class ProxyGrantingTicket(Ticket):
"""A Proxy Granting Ticket"""
PREFIX = settings.CAS_PROXY_GRANTING_TICKET_PREFIX
VALIDITY = settings.CAS_PGT_VALIDITY
value = models.CharField(max_length=255, default=utils.gen_pgt, unique=True)
def __unicode__(self):
def __str__(self):
return u"ProxyGrantingTicket-%s" % self.pk
@python_2_unicode_compatible
class Proxy(models.Model):
"""A list of proxies on `ProxyTicket`"""
class Meta:
@ -492,5 +635,5 @@ class Proxy(models.Model):
url = models.CharField(max_length=255)
proxy_ticket = models.ForeignKey(ProxyTicket, related_name="proxies")
def __unicode__(self):
def __str__(self):
return self.url

View File

@ -12,6 +12,7 @@
{% block bootstrap3_content %}
<div class="container">
{% if auto_submit %}<noscript>{% endif %}
<div class="row">
<div class="col-lg-12 col-md-12 col-sm-12 col-xs-12">
<h1 id="app-name">
@ -19,10 +20,13 @@
{% trans "Central Authentication Service" %}</h1>
</div>
</div>
{% if auto_submit %}</noscript>{% endif %}
<div class="row">
<div class="col-lg-3 col-md-3 col-sm-2 col-xs-12"></div>
<div class="col-lg-6 col-md-6 col-sm-8 col-xs-12">
{% if auto_submit %}<noscript>{% endif %}
{% bootstrap_messages %}
{% if auto_submit %}</noscript>{% endif %}
{% block content %}
{% endblock %}
</div>

View File

@ -3,11 +3,20 @@
{% load staticfiles %}
{% load i18n %}
{% block content %}
<form class="form-signin" method="post">
<form class="form-signin" method="post" id="login_form"{% if post_url %} action="{{post_url}}"{% endif %}>
{% if auto_submit %}<noscript>{% endif %}
<h2 class="form-signin-heading">{% trans "Please log in" %}</h2>
{% if auto_submit %}</noscript>{% endif %}
{% csrf_token %}
{% bootstrap_form form %}
{% if auto_submit %}<noscript>{% endif %}
{% bootstrap_button _('Login') size='lg' button_type="submit" button_class="btn-primary btn-block"%}
{% if auto_submit %}</noscript>{% endif %}
</form>
{% if auto_submit %}
<script type="text/javascript">
document.getElementById('login_form').submit(); // SUBMIT FORM
</script>
{% endif %}
{% endblock %}

View File

@ -1,4 +1,4 @@
# *- coding: utf-8 -*-
# -*- coding: utf-8 -*-
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License version 3 for
@ -23,27 +23,28 @@ from cas_server.tests.utils import get_auth_client
class BaseServicePattern(object):
"""Mixing for setting up service pattern for testing"""
def setup_service_patterns(self, proxy=False):
@classmethod
def setup_service_patterns(cls, proxy=False):
"""setting up service pattern"""
# For general purpose testing
self.service = "https://www.example.com"
self.service_pattern = models.ServicePattern.objects.create(
cls.service = "https://www.example.com"
cls.service_pattern = models.ServicePattern.objects.create(
name="example",
pattern="^https://www\.example\.com(/.*)?$",
proxy=proxy,
)
models.ReplaceAttributName.objects.create(name="*", service_pattern=self.service_pattern)
models.ReplaceAttributName.objects.create(name="*", service_pattern=cls.service_pattern)
# For testing the restrict_users attributes
self.service_restrict_user_fail = "https://restrict_user_fail.example.com"
self.service_pattern_restrict_user_fail = models.ServicePattern.objects.create(
cls.service_restrict_user_fail = "https://restrict_user_fail.example.com"
cls.service_pattern_restrict_user_fail = models.ServicePattern.objects.create(
name="restrict_user_fail",
pattern="^https://restrict_user_fail\.example\.com(/.*)?$",
restrict_users=True,
proxy=proxy,
)
self.service_restrict_user_success = "https://restrict_user_success.example.com"
self.service_pattern_restrict_user_success = models.ServicePattern.objects.create(
cls.service_restrict_user_success = "https://restrict_user_success.example.com"
cls.service_pattern_restrict_user_success = models.ServicePattern.objects.create(
name="restrict_user_success",
pattern="^https://restrict_user_success\.example\.com(/.*)?$",
restrict_users=True,
@ -51,12 +52,12 @@ class BaseServicePattern(object):
)
models.Username.objects.create(
value=settings.CAS_TEST_USER,
service_pattern=self.service_pattern_restrict_user_success
service_pattern=cls.service_pattern_restrict_user_success
)
# For testing the user attributes filtering conditions
self.service_filter_fail = "https://filter_fail.example.com"
self.service_pattern_filter_fail = models.ServicePattern.objects.create(
cls.service_filter_fail = "https://filter_fail.example.com"
cls.service_pattern_filter_fail = models.ServicePattern.objects.create(
name="filter_fail",
pattern="^https://filter_fail\.example\.com(/.*)?$",
proxy=proxy,
@ -64,10 +65,10 @@ class BaseServicePattern(object):
models.FilterAttributValue.objects.create(
attribut="right",
pattern="^admin$",
service_pattern=self.service_pattern_filter_fail
service_pattern=cls.service_pattern_filter_fail
)
self.service_filter_fail_alt = "https://filter_fail_alt.example.com"
self.service_pattern_filter_fail_alt = models.ServicePattern.objects.create(
cls.service_filter_fail_alt = "https://filter_fail_alt.example.com"
cls.service_pattern_filter_fail_alt = models.ServicePattern.objects.create(
name="filter_fail_alt",
pattern="^https://filter_fail_alt\.example\.com(/.*)?$",
proxy=proxy,
@ -75,10 +76,10 @@ class BaseServicePattern(object):
models.FilterAttributValue.objects.create(
attribut="nom",
pattern="^toto$",
service_pattern=self.service_pattern_filter_fail_alt
service_pattern=cls.service_pattern_filter_fail_alt
)
self.service_filter_success = "https://filter_success.example.com"
self.service_pattern_filter_success = models.ServicePattern.objects.create(
cls.service_filter_success = "https://filter_success.example.com"
cls.service_pattern_filter_success = models.ServicePattern.objects.create(
name="filter_success",
pattern="^https://filter_success\.example\.com(/.*)?$",
proxy=proxy,
@ -86,26 +87,26 @@ class BaseServicePattern(object):
models.FilterAttributValue.objects.create(
attribut="email",
pattern="^%s$" % re.escape(settings.CAS_TEST_ATTRIBUTES['email']),
service_pattern=self.service_pattern_filter_success
service_pattern=cls.service_pattern_filter_success
)
# For testing the user_field attributes
self.service_field_needed_fail = "https://field_needed_fail.example.com"
self.service_pattern_field_needed_fail = models.ServicePattern.objects.create(
cls.service_field_needed_fail = "https://field_needed_fail.example.com"
cls.service_pattern_field_needed_fail = models.ServicePattern.objects.create(
name="field_needed_fail",
pattern="^https://field_needed_fail\.example\.com(/.*)?$",
user_field="uid",
proxy=proxy,
)
self.service_field_needed_success = "https://field_needed_success.example.com"
self.service_pattern_field_needed_success = models.ServicePattern.objects.create(
cls.service_field_needed_success = "https://field_needed_success.example.com"
cls.service_pattern_field_needed_success = models.ServicePattern.objects.create(
name="field_needed_success",
pattern="^https://field_needed_success\.example\.com(/.*)?$",
user_field="alias",
proxy=proxy,
)
self.service_field_needed_success_alt = "https://field_needed_success_alt.example.com"
self.service_pattern_field_needed_success = models.ServicePattern.objects.create(
cls.service_field_needed_success_alt = "https://field_needed_success_alt.example.com"
cls.service_pattern_field_needed_success = models.ServicePattern.objects.create(
name="field_needed_success_alt",
pattern="^https://field_needed_success_alt\.example\.com(/.*)?$",
user_field="nom",
@ -191,3 +192,65 @@ class UserModels(object):
username=settings.CAS_TEST_USER,
session_key=client.session.session_key
)
class CanLogin(object):
"""Assertion about login"""
def assert_logged(
self, client, response, warn=False,
code=200, username=settings.CAS_TEST_USER
):
"""Assertions testing that client is well authenticated"""
self.assertEqual(response.status_code, code)
# this message is displayed to the user upon successful authentication
self.assertIn(
(
b"You have successfully logged into "
b"the Central Authentication Service"
),
response.content
)
# these session variables a set if usccessfully authenticated
self.assertEqual(client.session["username"], username)
self.assertIs(client.session["warn"], warn)
self.assertIs(client.session["authenticated"], True)
# on successfull authentication, a corresponding user object is created
self.assertTrue(
models.User.objects.get(
username=username,
session_key=client.session.session_key
)
)
def assert_login_failed(self, client, response, code=200):
"""Assertions testing a failed login attempt"""
self.assertEqual(response.status_code, code)
# this message is displayed to the user upon successful authentication, so it should not
# appear
self.assertNotIn(
(
b"You have successfully logged into "
b"the Central Authentication Service"
),
response.content
)
# if authentication has failed, these session variables should not be set
self.assertTrue(client.session.get("username") is None)
self.assertTrue(client.session.get("warn") is None)
self.assertTrue(client.session.get("authenticated") is None)
class FederatedIendityProviderModel(object):
"""Mixin for test classes using the FederatedIendityProvider model"""
@staticmethod
def setup_federated_identity_provider(providers):
"""setting up federated identity providers"""
for suffix, (server_url, cas_protocol_version, verbose_name) in providers.items():
models.FederatedIendityProvider.objects.create(
suffix=suffix,
server_url=server_url,
cas_protocol_version=cas_protocol_version,
verbose_name=verbose_name
)

View File

@ -0,0 +1,360 @@
# -*- coding: utf-8 -*-
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License version 3 for
# more details.
#
# You should have received a copy of the GNU General Public License version 3
# along with this program; if not, write to the Free Software Foundation, Inc., 51
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
# (c) 2016 Valentin Samir
"""tests for the CAS federate mode"""
from cas_server import default_settings
from cas_server.default_settings import settings
import django
from django.test import TestCase, Client
from django.test.utils import override_settings
from six.moves import reload_module
from cas_server import utils, models
from cas_server.tests.mixin import BaseServicePattern, CanLogin, FederatedIendityProviderModel
from cas_server.tests import utils as tests_utils
PROVIDERS = {
"example.com": ("http://127.0.0.1:8080", '1', "Example dot com"),
"example.org": ("http://127.0.0.1:8081", '2', "Example dot org"),
"example.net": ("http://127.0.0.1:8082", '3', "Example dot net"),
"example.test": ("http://127.0.0.1:8083", 'CAS_2_SAML_1_0', 'Example fot test'),
}
@override_settings(
CAS_FEDERATE=True,
CAS_AUTH_CLASS="cas_server.auth.CASFederateAuth",
# test with a non ascii username
CAS_TEST_USER=u"dédé"
)
class FederateAuthLoginLogoutTestCase(
TestCase, BaseServicePattern, CanLogin, FederatedIendityProviderModel
):
"""tests for the views login logout and federate then the federated mode is enabled"""
def setUp(self):
"""Prepare the test context"""
self.setup_service_patterns()
self.setup_federated_identity_provider(PROVIDERS)
def test_default_settings(self):
"""default settings should populated some default variable then CAS_FEDERATE is True"""
del settings.CAS_AUTH_CLASS
reload_module(default_settings)
self.assertEqual(settings.CAS_AUTH_CLASS, "cas_server.auth.CASFederateAuth")
def test_login_get_provider(self):
"""some assertion about the login page in federated mode"""
client = Client()
response = client.get("/login")
self.assertEqual(response.status_code, 200)
for provider in models.FederatedIendityProvider.objects.all():
self.assertTrue('<option value="%s">%s</option>' % (
provider.suffix,
provider.verbose_name
) in response.content.decode("utf-8"))
self.assertEqual(response.context['post_url'], '/federate')
def test_login_post_provider(self, remember=False):
"""test a successful login wrokflow"""
tickets = []
# choose the example.com provider
for (suffix, cas_port) in [
("example.com", 8080), ("example.org", 8081),
("example.net", 8082), ("example.test", 8083)
]:
provider = models.FederatedIendityProvider.objects.get(suffix=suffix)
# get a bare client
client = Client()
# fetch the login page
response = client.get("/login")
# in federated mode, we shoudl POST do /federate on the login page
self.assertEqual(response.context['post_url'], '/federate')
# get current form parameter
params = tests_utils.copy_form(response.context["form"])
params['provider'] = provider.suffix
if remember:
params['remember'] = 'on'
# post the choosed provider
response = client.post('/federate', params)
# we are redirected to the provider CAS client url
self.assertEqual(response.status_code, 302)
if remember:
self.assertEqual(response["Location"], '%s/federate/%s?remember=on' % (
'http://testserver' if django.VERSION < (1, 9) else "",
provider.suffix
))
else:
self.assertEqual(response["Location"], '%s/federate/%s' % (
'http://testserver' if django.VERSION < (1, 9) else "",
provider.suffix
))
# let's follow the redirect
response = client.get('/federate/%s' % provider.suffix)
# we are redirected to the provider CAS for authentication
self.assertEqual(response.status_code, 302)
self.assertEqual(
response["Location"],
"%s/login?service=http%%3A%%2F%%2Ftestserver%%2Ffederate%%2F%s" % (
provider.server_url,
provider.suffix
)
)
# let's generate a ticket
ticket = utils.gen_st()
# we lauch a dummy CAS server that only validate once for the service
# http://testserver/federate/example.com with `ticket`
tests_utils.DummyCAS.run(
("http://testserver/federate/%s" % provider.suffix).encode("ascii"),
ticket.encode("ascii"),
settings.CAS_TEST_USER.encode("utf8"),
[],
cas_port
)
# we normally provide a good ticket and should be redirected to /login as the ticket
# get successfully validated again the dummy CAS
response = client.get('/federate/%s' % provider.suffix, {'ticket': ticket})
self.assertEqual(response.status_code, 302)
self.assertEqual(response["Location"], "%s/login" % (
'http://testserver' if django.VERSION < (1, 9) else ""
))
# follow the redirect
response = client.get("/login")
# we should get a page with a from with all widget hidden that auto POST to /login using
# javascript. If javascript is disabled, a "connect" button is showed
self.assertTrue(response.context['auto_submit'])
self.assertEqual(response.context['post_url'], '/login')
params = tests_utils.copy_form(response.context["form"])
# POST ge prefiled from parameters
response = client.post("/login", params)
# the user should now being authenticated using username test@`provider`
self.assert_logged(
client, response, username=provider.build_username(settings.CAS_TEST_USER)
)
tickets.append((provider, ticket, client))
# try to get a ticket
response = client.get("/login", {'service': self.service})
self.assertEqual(response.status_code, 302)
self.assertTrue(response["Location"].startswith("%s?ticket=" % self.service))
return tickets
def test_login_twice(self):
"""Test that user id db is used for the second login (cf coverage)"""
self.test_login_post_provider()
self.test_login_post_provider()
@override_settings(CAS_FEDERATE=False)
def test_auth_federate_false(self):
"""federated view should redirect to /login then CAS_FEDERATE is False"""
provider = "example.com"
client = Client()
response = client.get("/federate/%s" % provider)
self.assertEqual(response.status_code, 302)
self.assertEqual(response["Location"], "%s/login" % (
'http://testserver' if django.VERSION < (1, 9) else ""
))
response = client.post("%s/federate/%s" % (
'http://testserver' if django.VERSION < (1, 9) else "",
provider
))
self.assertEqual(response.status_code, 302)
self.assertEqual(response["Location"], "%s/login" % (
'http://testserver' if django.VERSION < (1, 9) else ""
))
def test_auth_federate_errors(self):
"""
The federated view should redirect to /login if the provider is unknown or not provided,
try to fetch a new ticket if the provided ticket validation fail
(network error or bad ticket)
"""
good_provider = "example.com"
bad_provider = "exemple.fr"
client = Client()
response = client.get("/federate/%s" % bad_provider)
self.assertEqual(response.status_code, 302)
self.assertEqual(response["Location"], "%s/login" % (
'http://testserver' if django.VERSION < (1, 9) else ""
))
# test CAS not avaible
response = client.get("/federate/%s" % good_provider, {'ticket': utils.gen_st()})
self.assertEqual(response.status_code, 302)
self.assertEqual(
response["Location"],
"%s/login?service=http%%3A%%2F%%2Ftestserver%%2Ffederate%%2F%s" % (
models.FederatedIendityProvider.objects.get(suffix=good_provider).server_url,
good_provider
)
)
# test CAS avaible but bad ticket
tests_utils.DummyCAS.run(
("http://testserver/federate/%s" % good_provider).encode("ascii"),
utils.gen_st().encode("ascii"),
settings.CAS_TEST_USER.encode("utf-8"),
[],
8080
)
response = client.get("/federate/%s" % good_provider, {'ticket': utils.gen_st()})
self.assertEqual(response.status_code, 302)
self.assertEqual(
response["Location"],
"%s/login?service=http%%3A%%2F%%2Ftestserver%%2Ffederate%%2F%s" % (
models.FederatedIendityProvider.objects.get(suffix=good_provider).server_url,
good_provider
)
)
response = client.post("/federate")
self.assertEqual(response.status_code, 302)
self.assertEqual(response["Location"], "%s/login" % (
'http://testserver' if django.VERSION < (1, 9) else ""
))
def test_auth_federate_slo(self):
"""test that SLO receive from backend CAS log out the users"""
# get tickets and connected clients
tickets = self.test_login_post_provider()
for (provider, ticket, client) in tickets:
# SLO for an unkown ticket should do nothing
response = client.post(
"/federate/%s" % provider.suffix,
{'logoutRequest': tests_utils.logout_request(utils.gen_st())}
)
self.assertEqual(response.status_code, 200)
self.assertEqual(response.content, b"ok")
# Bad SLO format should do nothing
response = client.post(
"/federate/%s" % provider.suffix,
{'logoutRequest': ""}
)
self.assertEqual(response.status_code, 200)
self.assertEqual(response.content, b"ok")
# Bad SLO format should do nothing
response = client.post(
"/federate/%s" % provider.suffix,
{'logoutRequest': "<root></root>"}
)
self.assertEqual(response.status_code, 200)
self.assertEqual(response.content, b"ok")
response = client.get("/login")
self.assert_logged(
client, response, username=provider.build_username(settings.CAS_TEST_USER)
)
# SLO for a previously logged ticket should log out the user if CAS version is
# 3 or 'CAS_2_SAML_1_0'
response = client.post(
"/federate/%s" % provider.suffix,
{'logoutRequest': tests_utils.logout_request(ticket)}
)
self.assertEqual(response.status_code, 200)
self.assertEqual(response.content, b"ok")
response = client.get("/login")
if provider.cas_protocol_version in {'3', 'CAS_2_SAML_1_0'}: # support SLO
self.assert_login_failed(client, response)
else:
self.assert_logged(
client, response, username=provider.build_username(settings.CAS_TEST_USER)
)
def test_federate_logout(self):
"""
test the logout function: the user should be log out
and redirected to his CAS logout page
"""
# get tickets and connected clients, then follow normal logout
tickets = self.test_login_post_provider()
for (provider, _, client) in tickets:
response = client.get("/logout")
self.assertEqual(response.status_code, 302)
self.assertEqual(
response["Location"],
"%s/logout" % provider.server_url,
)
response = client.get("/login")
self.assert_login_failed(client, response)
# test if the user is already logged out
response = client.get("/logout")
# no redirection
self.assertEqual(response.status_code, 200)
self.assertTrue(
(
b"You were already logged out from the Central Authentication Service."
) in response.content
)
tickets = self.test_login_post_provider()
if django.VERSION >= (1, 8):
# assume the username session variable has been tempered (should not happend)
for (provider, _, client) in tickets:
session = client.session
session["username"] = settings.CAS_TEST_USER
session.save()
response = client.get("/logout")
self.assertEqual(response.status_code, 200)
response = client.get("/login")
self.assert_login_failed(client, response)
def test_remember_provider(self):
"""
If the user check remember, next login should not offer the chose of the backend CAS
and use the one store in the cookie
"""
tickets = self.test_login_post_provider(remember=True)
for (provider, _, client) in tickets:
client.get("/logout")
response = client.get("/login")
self.assertEqual(response.status_code, 302)
self.assertEqual(response["Location"], "%s/federate/%s" % (
'http://testserver' if django.VERSION < (1, 9) else "",
provider.suffix
))
def test_login_bad_ticket(self):
"""
Try login with a bad ticket:
login should fail and the main login page should be displayed to the user
"""
provider = "example.com"
# get a bare client
client = Client()
session = client.session
session["federate_username"] = models.FederatedIendityProvider.build_username_from_suffix(
settings.CAS_TEST_USER,
provider
)
session["federate_ticket"] = utils.gen_st()
if django.VERSION >= (1, 8):
session.save()
response = client.get("/login")
# we should get a page with a from with all widget hidden that auto POST to /login using
# javascript. If javascript is disabled, a "connect" button is showed
self.assertTrue(response.context['auto_submit'])
self.assertEqual(response.context['post_url'], '/login')
params = tests_utils.copy_form(response.context["form"])
# POST, as (username, ticket) are not valid, we should get the federate login page
response = client.post("/login", params)
self.assertEqual(response.status_code, 200)
for provider in models.FederatedIendityProvider.objects.all():
self.assertIn(
'<option value="%s">%s</option>' % (
provider.suffix,
provider.verbose_name
),
response.content.decode("utf-8")
)
self.assertEqual(response.context['post_url'], '/federate')

View File

@ -1,4 +1,4 @@
# *- coding: utf-8 -*-
# -*- coding: utf-8 -*-
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License version 3 for
@ -12,20 +12,87 @@
"""Tests module for models"""
from cas_server.default_settings import settings
from django.test import TestCase
import django
from django.test import TestCase, Client
from django.test.utils import override_settings
from django.utils import timezone
from datetime import timedelta
from importlib import import_module
from cas_server import models
from cas_server import models, utils
from cas_server.tests.utils import get_auth_client, HttpParamsHandler
from cas_server.tests.mixin import UserModels, BaseServicePattern
from cas_server.tests.mixin import UserModels, BaseServicePattern, FederatedIendityProviderModel
from cas_server.tests.test_federate import PROVIDERS
SessionStore = import_module(settings.SESSION_ENGINE).SessionStore
class FederatedUserTestCase(TestCase, UserModels, FederatedIendityProviderModel):
"""test for the federated user model"""
def setUp(self):
"""Prepare the test context"""
self.setup_federated_identity_provider(PROVIDERS)
def test_clean_old_entries(self):
"""tests for clean_old_entries that should delete federated user no longer used"""
client = Client()
client.get("/login")
provider = models.FederatedIendityProvider.objects.get(suffix="example.com")
models.FederatedUser.objects.create(
username="test1", provider=provider, attributs={}, ticket=""
)
models.FederatedUser.objects.create(
username="test2", provider=provider, attributs={}, ticket=""
)
models.FederatedUser.objects.all().update(
last_update=(timezone.now() - timedelta(seconds=settings.CAS_TICKET_TIMEOUT + 10))
)
models.FederatedUser.objects.create(
username="test3", provider=provider, attributs={}, ticket=""
)
models.User.objects.create(
username="test1@example.com", session_key=client.session.session_key
)
self.assertEqual(len(models.FederatedUser.objects.all()), 3)
models.FederatedUser.clean_old_entries()
self.assertEqual(len(models.FederatedUser.objects.all()), 2)
with self.assertRaises(models.FederatedUser.DoesNotExist):
models.FederatedUser.objects.get(username="test2")
class FederateSLOTestCase(TestCase, UserModels):
"""test for the federated SLO model"""
def test_clean_deleted_sessions(self):
"""
tests for clean_deleted_sessions that should delete object for which matching session
do not exists anymore
"""
if django.VERSION >= (1, 8):
client1 = Client()
client2 = Client()
client1.get("/login")
client2.get("/login")
session = client2.session
session['authenticated'] = True
session.save()
models.FederateSLO.objects.create(
username="test1@example.com",
session_key=client1.session.session_key,
ticket=utils.gen_st()
)
models.FederateSLO.objects.create(
username="test2@example.com",
session_key=client2.session.session_key,
ticket=utils.gen_st()
)
self.assertEqual(len(models.FederateSLO.objects.all()), 2)
models.FederateSLO.clean_deleted_sessions()
self.assertEqual(len(models.FederateSLO.objects.all()), 1)
with self.assertRaises(models.FederateSLO.DoesNotExist):
models.FederateSLO.objects.get(username="test1@example.com")
@override_settings(CAS_AUTH_CLASS='cas_server.auth.TestAuthUser')
class UserTestCase(TestCase, UserModels):
"""tests for the user models"""

View File

@ -1,4 +1,4 @@
# *- coding: utf-8 -*-
# -*- coding: utf-8 -*-
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License version 3 for
@ -10,7 +10,7 @@
#
# (c) 2016 Valentin Samir
"""Tests module for utils"""
from django.test import TestCase
from django.test import TestCase, RequestFactory
import six
@ -189,3 +189,22 @@ class UtilsTestCase(TestCase):
self.assertFalse(utils.crypt_salt_is_valid("$$")) # start with $ followed by $
self.assertFalse(utils.crypt_salt_is_valid("$toto")) # start with $ but no secondary $
self.assertFalse(utils.crypt_salt_is_valid("$toto$toto")) # algorithm toto not known
def test_get_current_url(self):
"""test the function get_current_url"""
factory = RequestFactory()
request = factory.get('/truc/muche?test=1')
self.assertEqual(utils.get_current_url(request), 'http://testserver/truc/muche?test=1')
self.assertEqual(
utils.get_current_url(request, ignore_params={'test'}),
'http://testserver/truc/muche'
)
def test_get_tuple(self):
"""test the function get_tuple"""
test_tuple = (1, 2, 3)
for index, value in enumerate(test_tuple):
self.assertEqual(utils.get_tuple(test_tuple, index), value)
self.assertEqual(utils.get_tuple(test_tuple, 3), None)
self.assertEqual(utils.get_tuple(test_tuple, 3, 'toto'), 'toto')
self.assertEqual(utils.get_tuple(None, 3), None)

View File

@ -1,4 +1,4 @@
# *- coding: utf-8 -*-
# -*- coding: utf-8 -*-
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License version 3 for
@ -36,57 +36,17 @@ from cas_server.tests.utils import (
HttpParamsHandler,
Http404Handler
)
from cas_server.tests.mixin import BaseServicePattern, XmlContent
from cas_server.tests.mixin import BaseServicePattern, XmlContent, CanLogin
@override_settings(CAS_AUTH_CLASS='cas_server.auth.TestAuthUser')
class LoginTestCase(TestCase, BaseServicePattern):
class LoginTestCase(TestCase, BaseServicePattern, CanLogin):
"""Tests for the login view"""
def setUp(self):
"""Prepare the test context:"""
# we prepare a bunch a service url and service patterns for tests
self.setup_service_patterns()
def assert_logged(self, client, response, warn=False, code=200):
"""Assertions testing that client is well authenticated"""
self.assertEqual(response.status_code, code)
# this message is displayed to the user upon successful authentication
self.assertTrue(
(
b"You have successfully logged into "
b"the Central Authentication Service"
) in response.content
)
# these session variables a set if usccessfully authenticated
self.assertTrue(client.session["username"] == settings.CAS_TEST_USER)
self.assertTrue(client.session["warn"] is warn)
self.assertTrue(client.session["authenticated"] is True)
# on successfull authentication, a corresponding user object is created
self.assertTrue(
models.User.objects.get(
username=settings.CAS_TEST_USER,
session_key=client.session.session_key
)
)
def assert_login_failed(self, client, response, code=200):
"""Assertions testing a failed login attempt"""
self.assertEqual(response.status_code, code)
# this message is displayed to the user upon successful authentication, so it should not
# appear
self.assertFalse(
(
b"You have successfully logged into "
b"the Central Authentication Service"
) in response.content
)
# if authentication has failed, these session variables should not be set
self.assertTrue(client.session.get("username") is None)
self.assertTrue(client.session.get("warn") is None)
self.assertTrue(client.session.get("authenticated") is None)
def test_login_view_post_goodpass_goodlt(self):
"""Test a successul login"""
# we get a client who fetch a frist time the login page and the login form default
@ -471,7 +431,7 @@ class LoginTestCase(TestCase, BaseServicePattern):
data = json.loads(response.content.decode("utf8"))
self.assertEqual(data["status"], "error")
self.assertEqual(data["detail"], "login required")
self.assertEqual(data["url"], "/login?")
self.assertEqual(data["url"], "/login")
@override_settings(CAS_ENABLE_AJAX_AUTH=True)
def test_ajax_logged_user_deleted(self):
@ -491,7 +451,7 @@ class LoginTestCase(TestCase, BaseServicePattern):
data = json.loads(response.content.decode("utf8"))
self.assertEqual(data["status"], "error")
self.assertEqual(data["detail"], "login required")
self.assertEqual(data["url"], "/login?")
self.assertEqual(data["url"], "/login")
@override_settings(CAS_ENABLE_AJAX_AUTH=True)
def test_ajax_logged(self):

View File

@ -1,4 +1,4 @@
# *- coding: utf-8 -*-
# -*- coding: utf-8 -*-
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License version 3 for
@ -13,14 +13,38 @@
from cas_server.default_settings import settings
from django.test import Client
from django.template import loader, Context
from django.utils import timezone
import cgi
import six
from threading import Thread
from lxml import etree
from six.moves import BaseHTTPServer
from six.moves.urllib.parse import urlparse, parse_qsl
from datetime import timedelta
from cas_server import models
from cas_server import utils
def return_unicode(string, charset):
"""make `string` a unicode if `string` is a unicode or bytes encoded with `charset`"""
if not isinstance(string, six.text_type):
return string.decode(charset)
else:
return string
def return_bytes(string, charset):
"""
make `string` a bytes encoded with `charset` if `string` is a unicode
or bytes encoded with `charset`
"""
if isinstance(string, six.text_type):
return string.encode(charset)
else:
return string
def copy_form(form):
@ -149,10 +173,10 @@ class HttpParamsHandler(BaseHTTPServer.BaseHTTPRequestHandler):
return
@classmethod
def run(cls):
def run(cls, port=0):
"""Run a BaseHTTPServer using this class as handler"""
server_class = BaseHTTPServer.HTTPServer
httpd = server_class(("127.0.0.1", 0), cls)
httpd = server_class(("127.0.0.1", port), cls)
(host, port) = httpd.socket.getsockname()
def lauch():
@ -178,3 +202,139 @@ class Http404Handler(HttpParamsHandler):
def do_POST(self):
"""Called on a POST request on the BaseHTTPServer"""
return self.do_GET()
class DummyCAS(BaseHTTPServer.BaseHTTPRequestHandler):
"""A dummy CAS that validate for only one (service, ticket) used in federated mode tests"""
def test_params(self):
"""check that internal and provided (service, ticket) matches"""
if (
self.server.ticket is not None and
self.params.get("service").encode("ascii") == self.server.service and
self.params.get("ticket").encode("ascii") == self.server.ticket
):
self.server.ticket = None
return True
else:
return False
def send_headers(self, code, content_type):
"""send http headers"""
self.send_response(code)
self.send_header("Content-type", content_type)
self.end_headers()
def do_GET(self):
"""Called on a GET request on the BaseHTTPServer"""
url = urlparse(self.path)
self.params = dict(parse_qsl(url.query))
if url.path == "/validate":
self.send_headers(200, "text/plain; charset=utf-8")
if self.test_params():
self.wfile.write(b"yes\n" + self.server.username + b"\n")
self.server.ticket = None
else:
self.wfile.write(b"no\n")
elif url.path in {
'/serviceValidate', '/serviceValidate',
'/p3/serviceValidate', '/p3/proxyValidate'
}:
self.send_headers(200, "text/xml; charset=utf-8")
if self.test_params():
template = loader.get_template('cas_server/serviceValidate.xml')
context = Context({
'username': self.server.username,
'attributes': self.server.attributes
})
self.wfile.write(return_bytes(template.render(context), "utf8"))
else:
template = loader.get_template('cas_server/serviceValidateError.xml')
context = Context({
'code': 'BAD_SERVICE_TICKET',
'msg': 'Valids are (%r, %r)' % (self.server.service, self.server.ticket)
})
self.wfile.write(return_bytes(template.render(context), "utf8"))
else:
self.return_404()
def do_POST(self):
"""Called on a POST request on the BaseHTTPServer"""
url = urlparse(self.path)
self.params = dict(parse_qsl(url.query))
if url.path == "/samlValidate":
self.send_headers(200, "text/xml; charset=utf-8")
length = int(self.headers.get('content-length'))
root = etree.fromstring(self.rfile.read(length))
auth_req = root.getchildren()[1].getchildren()[0]
ticket = auth_req.getchildren()[0].text.encode("ascii")
if (
self.server.ticket is not None and
self.params.get("TARGET").encode("ascii") == self.server.service and
ticket == self.server.ticket
):
self.server.ticket = None
template = loader.get_template('cas_server/samlValidate.xml')
context = Context({
'IssueInstant': timezone.now().isoformat(),
'expireInstant': (timezone.now() + timedelta(seconds=60)).isoformat(),
'Recipient': self.server.service,
'ResponseID': utils.gen_saml_id(),
'username': self.server.username,
'attributes': self.server.attributes,
})
self.wfile.write(return_bytes(template.render(context), "utf8"))
else:
template = loader.get_template('cas_server/samlValidateError.xml')
context = Context({
'IssueInstant': timezone.now().isoformat(),
'ResponseID': utils.gen_saml_id(),
'code': 'BAD_SERVICE_TICKET',
'msg': 'Valids are (%r, %r)' % (self.server.service, self.server.ticket)
})
self.wfile.write(return_bytes(template.render(context), "utf8"))
else:
self.return_404()
def return_404(self):
"""return a 404 error"""
self.send_headers(404, "text/plain; charset=utf-8")
self.wfile.write("not found")
def log_message(self, *args):
"""silent any log message"""
return
@classmethod
def run(cls, service, ticket, username, attributes, port=0):
"""Run a BaseHTTPServer using this class as handler"""
server_class = BaseHTTPServer.HTTPServer
httpd = server_class(("127.0.0.1", port), cls)
httpd.service = service
httpd.ticket = ticket
httpd.username = username
httpd.attributes = attributes
(host, port) = httpd.socket.getsockname()
def lauch():
"""routine to lauch in a background thread"""
httpd.handle_request()
httpd.server_close()
httpd_thread = Thread(target=lauch)
httpd_thread.daemon = True
httpd_thread.start()
return (httpd, host, port)
def logout_request(ticket):
"""build a SLO request XML, ready to be send"""
return u"""<samlp:LogoutRequest xmlns:samlp="urn:oasis:names:tc:SAML:2.0:protocol"
ID="%(id)s" Version="2.0" IssueInstant="%(datetime)s">
<saml:NameID xmlns:saml="urn:oasis:names:tc:SAML:2.0:assertion"></saml:NameID>
<samlp:SessionIndex>%(ticket)s</samlp:SessionIndex>
</samlp:LogoutRequest>""" % \
{
'id': utils.gen_saml_id(),
'datetime': timezone.now().isoformat(),
'ticket': ticket
}

View File

@ -1,4 +1,4 @@
# *- coding: utf-8 -*-
# -*- coding: utf-8 -*-
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License version 3 for
@ -59,4 +59,5 @@ urlpatterns = patterns(
),
name='auth'
),
url("^federate(?:/(?P<provider>([^/]+)))?$", views.FederateAuth.as_view(), name='federateAuth'),
)

View File

@ -25,6 +25,7 @@ import base64
import six
from importlib import import_module
from datetime import datetime, timedelta
from six.moves.urllib.parse import urlparse, urlunparse, parse_qsl, urlencode
@ -68,7 +69,50 @@ def reverse_params(url_name, params=None, **kwargs):
"""compule the reverse url or `url_name` and add GET parameters from `params` to it"""
url = reverse(url_name, **kwargs)
params = urlencode(params if params else {})
return url + "?%s" % params
if params:
return url + "?%s" % params
else:
return url
def copy_params(get_or_post_params, ignore=None):
"""copy from a dictionnary like `get_or_post_params` ignoring keys in the set `ignore`"""
if ignore is None:
ignore = set()
params = {}
for key in get_or_post_params:
if key not in ignore and get_or_post_params[key]:
params[key] = get_or_post_params[key]
return params
def set_cookie(response, key, value, max_age):
"""Set the cookie `key` on `response` with value `value` valid for `max_age` secondes"""
expires = datetime.strftime(
datetime.utcnow() + timedelta(seconds=max_age),
"%a, %d-%b-%Y %H:%M:%S GMT"
)
response.set_cookie(
key,
value,
max_age=max_age,
expires=expires,
domain=settings.SESSION_COOKIE_DOMAIN,
secure=settings.SESSION_COOKIE_SECURE or None
)
def get_current_url(request, ignore_params=None):
"""Giving a django request, return the current http url, possibly ignoring some GET params"""
if ignore_params is None:
ignore_params = set()
protocol = 'https' if request.is_secure() else "http"
service_url = "%s://%s%s" % (protocol, request.get_host(), request.path)
if request.GET:
params = copy_params(request.GET, ignore_params)
if params:
service_url += "?%s" % urlencode(params)
return service_url
def update_url(url, params):
@ -152,6 +196,19 @@ def gen_saml_id():
return _gen_ticket('_')
def get_tuple(nuplet, index, default=None):
"""
return the value in index `index` of the tuple `nuplet` if it exists,
else return `default`
"""
if nuplet is None:
return default
try:
return nuplet[index]
except IndexError:
return default
def crypt_salt_is_valid(salt):
"""Return True is salt is valid has a crypt salt, False otherwise"""
if len(salt) < 2:

View File

@ -1,4 +1,4 @@
# *- coding: utf-8 -*-
# -*- coding: utf-8 -*-
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License version 3 for
@ -20,8 +20,9 @@ from django.utils.decorators import method_decorator
from django.utils.translation import ugettext as _
from django.utils import timezone
from django.views.decorators.csrf import csrf_exempt
from django.middleware.csrf import CsrfViewMiddleware
from django.views.generic import View
from django.utils.encoding import python_2_unicode_compatible
import re
import logging
@ -37,7 +38,8 @@ import cas_server.models as models
from .utils import json_response
from .models import ServiceTicket, ProxyTicket, ProxyGrantingTicket
from .models import ServicePattern
from .models import ServicePattern, FederatedIendityProvider, FederatedUser
from .federate import CASFederateValidateUser
SessionStore = import_module(settings.SESSION_ENGINE).SessionStore
@ -78,6 +80,11 @@ class LogoutMixin(object):
username=username,
session_key=self.request.session.session_key
)
if settings.CAS_FEDERATE:
models.FederateSLO.objects.filter(
username=username,
session_key=self.request.session.session_key
).delete()
self.request.session.flush()
user.logout(self.request)
user.delete()
@ -115,7 +122,22 @@ class LogoutView(View, LogoutMixin):
"""methode called on GET request on this view"""
logger.info("logout requested")
self.init_get(request)
# if CAS federation mode is enable, bakup the provider before flushing the sessions
if settings.CAS_FEDERATE:
try:
user = FederatedUser.get_from_federated_username(
self.request.session.get("username")
)
auth = CASFederateValidateUser(user.provider, service_url="")
except FederatedUser.DoesNotExist:
auth = None
session_nb = self.logout(self.request.GET.get("all"))
# if CAS federation mode is enable, redirect to user CAS logout page
if settings.CAS_FEDERATE:
if auth is not None:
params = utils.copy_params(request.GET)
url = auth.get_logout_url()
return HttpResponseRedirect(utils.update_url(url, params))
# if service is set, redirect to service after logout
if self.service:
list(messages.get_messages(request)) # clean messages before leaving the django app
@ -170,6 +192,105 @@ class LogoutView(View, LogoutMixin):
)
class FederateAuth(View):
"""view to authenticated user agains a backend CAS then CAS_FEDERATE is True"""
@method_decorator(csrf_exempt)
def dispatch(self, request, *args, **kwargs):
"""dispatch different http request to the methods of the same name"""
return super(FederateAuth, self).dispatch(request, *args, **kwargs)
@staticmethod
def get_cas_client(request, provider):
"""return a CAS client object matching provider"""
service_url = utils.get_current_url(request, {"ticket", "provider"})
return CASFederateValidateUser(provider, service_url)
def post(self, request, provider=None):
"""method called on POST request"""
if not settings.CAS_FEDERATE:
logger.warning("CAS_FEDERATE is False, set it to True to use the federated mode")
return redirect("cas_server:login")
# POST with a provider, this is probably an SLO request
try:
provider = FederatedIendityProvider.objects.get(suffix=provider)
auth = self.get_cas_client(request, provider)
try:
auth.clean_sessions(request.POST['logoutRequest'])
except (KeyError, AttributeError):
pass
return HttpResponse("ok")
# else, a User is trying to log in using an identity provider
except FederatedIendityProvider.DoesNotExist:
# Manually checking for csrf to protect the code below
reason = CsrfViewMiddleware().process_view(request, None, (), {})
if reason is not None: # pragma: no cover (csrf checks are disabled during tests)
return reason # Failed the test, stop here.
form = forms.FederateSelect(request.POST)
if form.is_valid():
params = utils.copy_params(
request.POST,
ignore={"provider", "csrfmiddlewaretoken", "ticket"}
)
url = utils.reverse_params(
"cas_server:federateAuth",
kwargs=dict(provider=form.cleaned_data["provider"].suffix),
params=params
)
response = HttpResponseRedirect(url)
if form.cleaned_data["remember"]:
max_age = settings.CAS_FEDERATE_REMEMBER_TIMEOUT
utils.set_cookie(
response,
"_remember_provider",
form.cleaned_data["provider"].suffix,
max_age
)
return response
else:
return redirect("cas_server:login")
def get(self, request, provider=None):
"""method called on GET request"""
if not settings.CAS_FEDERATE:
logger.warning("CAS_FEDERATE is False, set it to True to use the federated mode")
return redirect("cas_server:login")
if self.request.session.get("authenticated"):
logger.warning("User already authenticated, dropping federate authentication request")
return redirect("cas_server:login")
try:
provider = FederatedIendityProvider.objects.get(suffix=provider)
auth = self.get_cas_client(request, provider)
if 'ticket' not in request.GET:
logger.info("Trying to authenticate again %s" % auth.provider.server_url)
return HttpResponseRedirect(auth.get_login_url())
else:
ticket = request.GET['ticket']
if auth.verify_ticket(ticket):
logger.info(
"Got a valid ticket for %s from %s" % (
auth.username,
auth.provider.server_url
)
)
params = utils.copy_params(request.GET, ignore={"ticket"})
request.session["federate_username"] = auth.federated_username
request.session["federate_ticket"] = ticket
auth.register_slo(auth.federated_username, request.session.session_key, ticket)
url = utils.reverse_params("cas_server:login", params)
return HttpResponseRedirect(url)
else:
logger.info(
"Got a invalid ticket for %s from %s. Retrying to authenticate" % (
auth.username,
auth.provider.server_url
)
)
return HttpResponseRedirect(auth.get_login_url())
except FederatedIendityProvider.DoesNotExist:
logger.warning("Identity provider suffix %s not found" % provider)
return redirect("cas_server:login")
class LoginView(View, LogoutMixin):
"""credential requestor / acceptor"""
@ -189,6 +310,10 @@ class LoginView(View, LogoutMixin):
renewed = False
warned = False
# used if CAS_FEDERATE is True
username = None
ticket = None
INVALID_LOGIN_TICKET = 1
USER_LOGIN_OK = 2
USER_LOGIN_FAILURE = 3
@ -207,6 +332,9 @@ class LoginView(View, LogoutMixin):
if request.POST.get('warned') and request.POST['warned'] != "False":
self.warned = True
self.warn = request.POST.get('warn')
if settings.CAS_FEDERATE:
self.username = request.POST.get('username')
self.ticket = request.POST.get('ticket')
def gen_lt(self):
"""Generate a new LoginTicket and add it to the list of valid LT for the user"""
@ -240,19 +368,16 @@ class LoginView(View, LogoutMixin):
_(u"Invalid login ticket")
)
elif ret == self.USER_LOGIN_OK:
try:
self.user = models.User.objects.get(
username=self.request.session['username'],
session_key=self.request.session.session_key
)
self.user.save() # pragma: no cover (should not happend)
except models.User.DoesNotExist:
self.user = models.User.objects.create(
username=self.request.session['username'],
session_key=self.request.session.session_key
)
self.user.save()
self.user = models.User.objects.get_or_create(
username=self.request.session['username'],
session_key=self.request.session.session_key
)[0]
self.user.save()
elif ret == self.USER_LOGIN_FAILURE: # bad user login
if settings.CAS_FEDERATE:
self.ticket = None
self.username = None
self.init_form()
self.logout()
elif ret == self.USER_ALREADY_LOGGED:
pass
@ -300,6 +425,13 @@ class LoginView(View, LogoutMixin):
self.method = request.GET.get('method')
self.ajax = settings.CAS_ENABLE_AJAX_AUTH and 'HTTP_X_AJAX' in request.META
self.warn = request.GET.get('warn')
if settings.CAS_FEDERATE:
self.username = request.session.get("federate_username")
self.ticket = request.session.get("federate_ticket")
if self.username:
del request.session["federate_username"]
if self.ticket:
del request.session["federate_ticket"]
def get(self, request, *args, **kwargs):
"""methode called on GET request on this view"""
@ -318,16 +450,29 @@ class LoginView(View, LogoutMixin):
def init_form(self, values=None):
"""Initialization of the good form depending of POST and GET parameters"""
self.form = forms.UserCredential(
values,
initial={
'service': self.service,
'method': self.method,
'warn': self.request.session.get("warn"),
'lt': self.request.session['lt'][-1],
'renew': self.renew
}
)
form_initial = {
'service': self.service,
'method': self.method,
'warn': self.warn or self.request.session.get("warn"),
'lt': self.request.session['lt'][-1],
'renew': self.renew
}
if settings.CAS_FEDERATE:
if self.username and self.ticket:
form_initial['username'] = self.username
form_initial['password'] = self.ticket
form_initial['ticket'] = self.ticket
self.form = forms.FederateUserCredential(
values,
initial=form_initial
)
else:
self.form = forms.FederateSelect(values, initial=form_initial)
else:
self.form = forms.UserCredential(
values,
initial=form_initial
)
def service_login(self):
"""Perform login agains a service"""
@ -494,11 +639,46 @@ class LoginView(View, LogoutMixin):
}
return json_response(self.request, data)
else:
return render(
self.request,
settings.CAS_LOGIN_TEMPLATE,
utils.context({'form': self.form})
)
if settings.CAS_FEDERATE:
if self.username and self.ticket:
return render(
self.request,
settings.CAS_LOGIN_TEMPLATE,
utils.context({
'form': self.form,
'auto_submit': True,
'post_url': reverse("cas_server:login")
})
)
else:
if (
self.request.COOKIES.get('_remember_provider') and
FederatedIendityProvider.objects.filter(
suffix=self.request.COOKIES['_remember_provider']
)
):
params = utils.copy_params(self.request.GET)
url = utils.reverse_params(
"cas_server:federateAuth",
params=params,
kwargs=dict(provider=self.request.COOKIES['_remember_provider'])
)
return HttpResponseRedirect(url)
else:
return render(
self.request,
settings.CAS_LOGIN_TEMPLATE,
utils.context({
'form': self.form,
'post_url': reverse("cas_server:federateAuth")
})
)
else:
return render(
self.request,
settings.CAS_LOGIN_TEMPLATE,
utils.context({'form': self.form})
)
def common(self):
"""Part execute uppon GET and POST request"""
@ -525,11 +705,14 @@ class Auth(View):
secret = request.POST.get('secret')
if not settings.CAS_AUTH_SHARED_SECRET:
return HttpResponse("no\nplease set CAS_AUTH_SHARED_SECRET", content_type="text/plain")
return HttpResponse(
"no\nplease set CAS_AUTH_SHARED_SECRET",
content_type="text/plain; charset=utf-8"
)
if secret != settings.CAS_AUTH_SHARED_SECRET:
return HttpResponse("no\n", content_type="text/plain")
return HttpResponse(u"no\n", content_type="text/plain; charset=utf-8")
if not username or not password or not service:
return HttpResponse("no\n", content_type="text/plain")
return HttpResponse(u"no\n", content_type="text/plain; charset=utf-8")
form = forms.UserCredential(
request.POST,
initial={
@ -540,16 +723,10 @@ class Auth(View):
)
if form.is_valid():
try:
try:
user = models.User.objects.get(
username=form.cleaned_data['username'],
session_key=request.session.session_key
)
except models.User.DoesNotExist:
user = models.User.objects.create(
username=form.cleaned_data['username'],
session_key=request.session.session_key
)
user = models.User.objects.get_or_create(
username=form.cleaned_data['username'],
session_key=request.session.session_key
)[0]
user.save()
# is the service allowed
service_pattern = ServicePattern.validate(service)
@ -557,11 +734,11 @@ class Auth(View):
service_pattern.check_user(user)
if not request.session.get("authenticated"):
user.delete()
return HttpResponse("yes\n", content_type="text/plain")
return HttpResponse(u"yes\n", content_type="text/plain; charset=utf-8")
except (ServicePattern.DoesNotExist, models.ServicePatternException):
return HttpResponse("no\n", content_type="text/plain")
return HttpResponse(u"no\n", content_type="text/plain; charset=utf-8")
else:
return HttpResponse("no\n", content_type="text/plain")
return HttpResponse(u"no\n", content_type="text/plain; charset=utf-8")
class Validate(View):
@ -601,7 +778,10 @@ class Validate(View):
username = username[0]
else:
username = ticket.user.username
return HttpResponse("yes\n%s\n" % username, content_type="text/plain")
return HttpResponse(
u"yes\n%s\n" % username,
content_type="text/plain; charset=utf-8"
)
except ServiceTicket.DoesNotExist:
logger.warning(
(
@ -612,12 +792,13 @@ class Validate(View):
service
)
)
return HttpResponse("no\n", content_type="text/plain")
return HttpResponse(u"no\n", content_type="text/plain; charset=utf-8")
else:
logger.warning("Validate: service or ticket missing")
return HttpResponse("no\n", content_type="text/plain")
return HttpResponse(u"no\n", content_type="text/plain; charset=utf-8")
@python_2_unicode_compatible
class ValidateError(Exception):
"""handle service validation error"""
def __init__(self, code, msg=""):
@ -625,7 +806,7 @@ class ValidateError(Exception):
self.msg = msg
super(ValidateError, self).__init__(code)
def __unicode__(self):
def __str__(self):
return u"%s" % self.msg
def render(self, request):
@ -658,8 +839,8 @@ class ValidateService(View, AttributesMixin):
if not self.service or not self.ticket:
logger.warning("ValidateService: missing ticket or service")
return ValidateError(
'INVALID_REQUEST',
"you must specify a service and a ticket"
u'INVALID_REQUEST',
u"you must specify a service and a ticket"
).render(request)
else:
try:
@ -729,14 +910,14 @@ class ValidateService(View, AttributesMixin):
for prox in ticket.proxies.all():
proxies.append(prox.url)
else:
raise ValidateError('INVALID_TICKET', self.ticket)
raise ValidateError(u'INVALID_TICKET', self.ticket)
ticket.validate = True
ticket.save()
if ticket.service != self.service:
raise ValidateError('INVALID_SERVICE', self.service)
raise ValidateError(u'INVALID_SERVICE', self.service)
return ticket, proxies
except (ServiceTicket.DoesNotExist, ProxyTicket.DoesNotExist):
raise ValidateError('INVALID_TICKET', 'ticket not found')
raise ValidateError(u'INVALID_TICKET', 'ticket not found')
def process_pgturl(self, params):
"""Handle PGT request"""
@ -782,18 +963,18 @@ class ValidateService(View, AttributesMixin):
except requests.exceptions.RequestException as error:
error = utils.unpack_nested_exception(error)
raise ValidateError(
'INVALID_PROXY_CALLBACK',
"%s: %s" % (type(error), str(error))
u'INVALID_PROXY_CALLBACK',
u"%s: %s" % (type(error), str(error))
)
else:
raise ValidateError(
'INVALID_PROXY_CALLBACK',
"callback url not allowed by configuration"
u'INVALID_PROXY_CALLBACK',
u"callback url not allowed by configuration"
)
except ServicePattern.DoesNotExist:
raise ValidateError(
'INVALID_PROXY_CALLBACK',
'callback url not allowed by configuration'
u'INVALID_PROXY_CALLBACK',
u'callback url not allowed by configuration'
)
@ -814,8 +995,8 @@ class Proxy(View):
return self.process_proxy()
else:
raise ValidateError(
'INVALID_REQUEST',
"you must specify and pgt and targetService"
u'INVALID_REQUEST',
u"you must specify and pgt and targetService"
)
except ValidateError as error:
logger.warning("Proxy: validation error: %s %s" % (error.code, error.msg))
@ -828,8 +1009,8 @@ class Proxy(View):
pattern = ServicePattern.validate(self.target_service)
if not pattern.proxy:
raise ValidateError(
'UNAUTHORIZED_SERVICE',
'the service %s do not allow proxy ticket' % self.target_service
u'UNAUTHORIZED_SERVICE',
u'the service %s do not allow proxy ticket' % self.target_service
)
# is the proxy granting ticket valid
ticket = ProxyGrantingTicket.objects.get(
@ -858,16 +1039,17 @@ class Proxy(View):
content_type="text/xml; charset=utf-8"
)
except ProxyGrantingTicket.DoesNotExist:
raise ValidateError('INVALID_TICKET', 'PGT %s not found' % self.pgt)
raise ValidateError(u'INVALID_TICKET', u'PGT %s not found' % self.pgt)
except ServicePattern.DoesNotExist:
raise ValidateError('UNAUTHORIZED_SERVICE', self.target_service)
raise ValidateError(u'UNAUTHORIZED_SERVICE', self.target_service)
except (models.BadUsername, models.BadFilter, models.UserFieldNotDefined):
raise ValidateError(
'UNAUTHORIZED_USER',
'User %s not allowed on %s' % (ticket.user.username, self.target_service)
u'UNAUTHORIZED_USER',
u'User %s not allowed on %s' % (ticket.user.username, self.target_service)
)
@python_2_unicode_compatible
class SamlValidateError(Exception):
"""handle saml validation error"""
def __init__(self, code, msg=""):
@ -875,7 +1057,7 @@ class SamlValidateError(Exception):
self.msg = msg
super(SamlValidateError, self).__init__(code)
def __unicode__(self):
def __str__(self):
return u"%s" % self.msg
def render(self, request):
@ -972,18 +1154,18 @@ class SamlValidate(View, AttributesMixin):
)
else:
raise SamlValidateError(
'AuthnFailed',
'ticket %s should begin with PT- or ST-' % ticket
u'AuthnFailed',
u'ticket %s should begin with PT- or ST-' % ticket
)
ticket.validate = True
ticket.save()
if ticket.service != self.target:
raise SamlValidateError(
'AuthnFailed',
'TARGET %s do not match ticket service' % self.target
u'AuthnFailed',
u'TARGET %s do not match ticket service' % self.target
)
return ticket
except (IndexError, KeyError):
raise SamlValidateError('VersionMismatch')
raise SamlValidateError(u'VersionMismatch')
except (ServiceTicket.DoesNotExist, ProxyTicket.DoesNotExist):
raise SamlValidateError('AuthnFailed', 'ticket %s not found' % ticket)
raise SamlValidateError(u'AuthnFailed', u'ticket %s not found' % ticket)