django-cas-server/cas_server/federate.py

112 lines
4.0 KiB
Python
Raw Normal View History

2016-07-03 15:54:11 +00:00
# -*- coding: utf-8 -*-
2016-06-17 17:28:49 +00:00
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License version 3 for
# more details.
#
# You should have received a copy of the GNU General Public License version 3
# along with this program; if not, write to the Free Software Foundation, Inc., 51
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
2016-07-03 16:11:48 +00:00
# (c) 2016 Valentin Samir
2016-07-03 15:54:11 +00:00
"""federated mode helper classes"""
2016-06-17 17:28:49 +00:00
from .default_settings import settings
from django.db import IntegrityError
2016-06-17 17:28:49 +00:00
from .cas import CASClient
2016-06-23 15:18:53 +00:00
from .models import FederatedUser, FederateSLO, User
import logging
2016-06-23 15:18:53 +00:00
from importlib import import_module
from six.moves import urllib
2016-06-23 15:18:53 +00:00
SessionStore = import_module(settings.SESSION_ENGINE).SessionStore
2016-06-17 17:28:49 +00:00
logger = logging.getLogger(__name__)
2016-06-17 17:28:49 +00:00
class CASFederateValidateUser(object):
2016-07-03 15:54:11 +00:00
"""Class CAS client used to authenticate the user again a CAS provider"""
2016-06-17 17:28:49 +00:00
username = None
attributs = {}
client = None
def __init__(self, provider, service_url):
self.provider = provider
self.client = CASClient(
service_url=service_url,
version=provider.cas_protocol_version,
server_url=provider.server_url,
renew=False,
)
2016-06-17 17:28:49 +00:00
def get_login_url(self):
2016-07-03 15:54:11 +00:00
"""return the CAS provider login url"""
return self.client.get_login_url()
2016-06-17 17:28:49 +00:00
def get_logout_url(self, redirect_url=None):
2016-07-03 15:54:11 +00:00
"""return the CAS provider logout url"""
return self.client.get_logout_url(redirect_url)
2016-06-17 17:28:49 +00:00
def verify_ticket(self, ticket):
2016-07-03 15:54:11 +00:00
"""test `ticket` agains the CAS provider, if valid, create the local federated user"""
try:
username, attributs = self.client.verify_ticket(ticket)[:2]
except urllib.error.URLError:
2016-06-17 17:28:49 +00:00
return False
if username is not None:
if attributs is None:
attributs = {}
2016-06-17 17:28:49 +00:00
attributs["provider"] = self.provider
self.username = username
self.attributs = attributs
user = FederatedUser.objects.update_or_create(
username=username,
provider=self.provider,
defaults=dict(attributs=attributs, ticket=ticket)
)[0]
user.save()
self.federated_username = user.federated_username
2016-06-17 17:28:49 +00:00
return True
else:
return False
2016-06-23 15:18:53 +00:00
2016-06-27 22:48:48 +00:00
@staticmethod
def register_slo(username, session_key, ticket):
2016-07-03 15:54:11 +00:00
"""association a ticket with a (username, session) for processing later SLO request"""
try:
FederateSLO.objects.create(
username=username,
session_key=session_key,
ticket=ticket
)
except IntegrityError: # pragma: no cover (ignore if the FederateSLO already exists)
pass
2016-06-23 15:18:53 +00:00
def clean_sessions(self, logout_request):
2016-07-03 15:54:11 +00:00
"""process a SLO request"""
2016-06-23 15:18:53 +00:00
try:
slos = self.client.get_saml_slos(logout_request) or []
except NameError: # pragma: no cover (should not happen)
2016-06-27 22:48:48 +00:00
slos = []
for slo in slos:
for federate_slo in FederateSLO.objects.filter(ticket=slo.text):
logger.info(
"Got an SLO requests for ticket %s, logging out user %s" % (
federate_slo.username,
federate_slo.ticket
)
)
session = SessionStore(session_key=federate_slo.session_key)
session.flush()
try:
user = User.objects.get(
username=federate_slo.username,
session_key=federate_slo.session_key
)
user.logout()
user.delete()
except User.DoesNotExist: # pragma: no cover (should not happen)
pass
federate_slo.delete()