150 lines
5.5 KiB
Python
150 lines
5.5 KiB
Python
# -*- coding: utf-8 -*-
|
|
# This program is distributed in the hope that it will be useful, but WITHOUT
|
|
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
|
|
# FOR A PARTICULAR PURPOSE. See the GNU General Public License version 3 for
|
|
# more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License version 3
|
|
# along with this program; if not, write to the Free Software Foundation, Inc., 51
|
|
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
|
|
#
|
|
# (c) 2016 Valentin Samir
|
|
"""federated mode helper classes"""
|
|
from .default_settings import SessionStore
|
|
from django.db import IntegrityError
|
|
|
|
from .cas import CASClient
|
|
from .models import FederatedUser, FederateSLO, User
|
|
|
|
import logging
|
|
from six.moves import urllib
|
|
|
|
#: logger facility
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class CASFederateValidateUser(object):
|
|
"""
|
|
Class CAS client used to authenticate the user again a CAS provider
|
|
|
|
:param cas_server.models.FederatedIendityProvider provider: The provider to use for
|
|
authenticate the user.
|
|
:param unicode service_url: The service url to transmit to the ``provider``.
|
|
"""
|
|
#: the provider returned username
|
|
username = None
|
|
#: the provider returned attributes
|
|
attributs = {}
|
|
#: the CAS client instance
|
|
client = None
|
|
|
|
def __init__(self, provider, service_url):
|
|
self.provider = provider
|
|
self.client = CASClient(
|
|
service_url=service_url,
|
|
version=provider.cas_protocol_version,
|
|
server_url=provider.server_url,
|
|
renew=False,
|
|
)
|
|
|
|
def get_login_url(self):
|
|
"""
|
|
:return: the CAS provider login url
|
|
:rtype: unicode
|
|
"""
|
|
return self.client.get_login_url()
|
|
|
|
def get_logout_url(self, redirect_url=None):
|
|
"""
|
|
:param redirect_url: The url to redirect to after logout from the provider, if provided.
|
|
:type redirect_url: :obj:`unicode` or :obj:`NoneType<types.NoneType>`
|
|
:return: the CAS provider logout url
|
|
:rtype: unicode
|
|
"""
|
|
return self.client.get_logout_url(redirect_url)
|
|
|
|
def verify_ticket(self, ticket):
|
|
"""
|
|
test ``ticket`` agains the CAS provider, if valid, create a
|
|
:class:`FederatedUser<cas_server.models.FederatedUser>` matching provider returned
|
|
username and attributes.
|
|
|
|
:param unicode ticket: The ticket to validate against the provider CAS
|
|
:return: ``True`` if the validation succeed, else ``False``.
|
|
:rtype: bool
|
|
"""
|
|
try:
|
|
username, attributs = self.client.verify_ticket(ticket)[:2]
|
|
except urllib.error.URLError:
|
|
return False
|
|
if username is not None:
|
|
if attributs is None:
|
|
attributs = {}
|
|
attributs["provider"] = self.provider
|
|
self.username = username
|
|
self.attributs = attributs
|
|
user = FederatedUser.objects.update_or_create(
|
|
username=username,
|
|
provider=self.provider,
|
|
defaults=dict(attributs=attributs, ticket=ticket)
|
|
)[0]
|
|
user.save()
|
|
self.federated_username = user.federated_username
|
|
return True
|
|
else:
|
|
return False
|
|
|
|
@staticmethod
|
|
def register_slo(username, session_key, ticket):
|
|
"""
|
|
association a ``ticket`` with a (``username``, ``session_key``) for processing later SLO
|
|
request by creating a :class:`cas_server.models.FederateSLO` object.
|
|
|
|
:param unicode username: A logged user username, with the ``@`` component.
|
|
:param unicode session_key: A logged user session_key matching ``username``.
|
|
:param unicode ticket: A ticket used to authentication ``username`` for the session
|
|
``session_key``.
|
|
"""
|
|
try:
|
|
FederateSLO.objects.create(
|
|
username=username,
|
|
session_key=session_key,
|
|
ticket=ticket
|
|
)
|
|
except IntegrityError: # pragma: no cover (ignore if the FederateSLO already exists)
|
|
pass
|
|
|
|
def clean_sessions(self, logout_request):
|
|
"""
|
|
process a SLO request: Search for ticket values in ``logout_request``. For each
|
|
ticket value matching a :class:`cas_server.models.FederateSLO`, disconnect the
|
|
corresponding user.
|
|
|
|
:param unicode logout_request: An XML document contening one or more Single Log Out
|
|
requests.
|
|
"""
|
|
try:
|
|
slos = self.client.get_saml_slos(logout_request) or []
|
|
except NameError: # pragma: no cover (should not happen)
|
|
slos = []
|
|
for slo in slos:
|
|
for federate_slo in FederateSLO.objects.filter(ticket=slo.text):
|
|
logger.info(
|
|
"Got an SLO requests for ticket %s, logging out user %s" % (
|
|
federate_slo.username,
|
|
federate_slo.ticket
|
|
)
|
|
)
|
|
session = SessionStore(session_key=federate_slo.session_key)
|
|
session.flush()
|
|
try:
|
|
user = User.objects.get(
|
|
username=federate_slo.username,
|
|
session_key=federate_slo.session_key
|
|
)
|
|
user.logout()
|
|
user.delete()
|
|
except User.DoesNotExist: # pragma: no cover (should not happen)
|
|
pass
|
|
federate_slo.delete()
|