From 92445b16eff8e5575b7244d97fe5291fb9debcba Mon Sep 17 00:00:00 2001 From: lian Date: Wed, 12 Jun 2024 17:24:24 +0800 Subject: [PATCH] shib second uid (#6175) * second uid for shibboleth * update test --- seahub/auth/models.py | 32 ++++++++++--------- .../thirdpart/shibboleth/test_middleware.py | 15 +++++++-- thirdpart/shibboleth/app_settings.py | 25 ++++++++------- thirdpart/shibboleth/backends.py | 15 +++++++-- thirdpart/shibboleth/middleware.py | 22 ++++++------- 5 files changed, 65 insertions(+), 44 deletions(-) diff --git a/seahub/auth/models.py b/seahub/auth/models.py index abd5edca275..9ae0026a614 100644 --- a/seahub/auth/models.py +++ b/seahub/auth/models.py @@ -1,17 +1,12 @@ # Copyright (c) 2012-2016 Seafile Ltd. -import datetime import hashlib -import urllib.request, urllib.parse, urllib.error import logging - -# import auth -from django.core.exceptions import ImproperlyConfigured +from registration.signals import user_deleted from django.db import models -from django.db.models.manager import EmptyManager -from django.contrib.contenttypes.models import ContentType -from django.utils.encoding import smart_str -from django.utils.translation import gettext_lazy as _ from django.conf import settings +from django.dispatch import receiver +from django.utils.encoding import smart_str +from django.db.models.manager import EmptyManager logger = logging.getLogger(__name__) UNUSABLE_PASSWORD = '!' # This will never be a valid hash @@ -130,15 +125,26 @@ def is_authenticated(self): class SocialAuthUserManager(models.Manager): + def add(self, username, provider, uid, extra_data=''): try: - social_auth_user = self.model(username=username, provider=provider, uid=uid, extra_data=extra_data) + social_auth_user = self.model(username=username, provider=provider, + uid=uid, extra_data=extra_data) social_auth_user.save() return social_auth_user except Exception as e: logger.error(e) return None + def add_if_not_exists(self, username, provider, uid, extra_data=''): + + social_auth_user = self.get_by_provider_and_uid(provider, uid) + if not social_auth_user: + social_auth_user = self.add(username, provider, + uid, extra_data=extra_data) + + return social_auth_user + def get_by_provider_and_uid(self, provider, uid): try: social_auth_user = self.get(provider=provider, uid=uid) @@ -186,11 +192,7 @@ class Meta: db_table = 'external_department' -# # handle signals -from django.dispatch import receiver -from registration.signals import user_deleted - - +# handle signals @receiver(user_deleted) def user_deleted_cb(sender, **kwargs): username = kwargs['username'] diff --git a/tests/seahub/thirdpart/shibboleth/test_middleware.py b/tests/seahub/thirdpart/shibboleth/test_middleware.py index bb933beac33..279857674aa 100644 --- a/tests/seahub/thirdpart/shibboleth/test_middleware.py +++ b/tests/seahub/thirdpart/shibboleth/test_middleware.py @@ -48,7 +48,7 @@ def setUp(self): self.request.META = {} self.request.META['Shibboleth-eppn'] = 'sampledeveloper@school.edu' - self.request.META['REMOTE_USER'] = 'sampledeveloper@school.edu' + self.request.META['HTTP_REMOTE_USER'] = 'sampledeveloper@school.edu' self.request.META['givenname'] = 'test_gname' self.request.META['surname'] = 'test_sname' self.request.META['Shibboleth-displayName'] = 'Sample Developer' @@ -68,6 +68,12 @@ def setUp(self): def test_can_process(self): assert len(Profile.objects.all()) == 0 + # logout first + from seahub.auth.models import AnonymousUser + self.request.session.flush() + self.request.user = AnonymousUser() + + # then login user via thibboleth self.middleware.process_request(self.request) shib_user = SocialAuthUser.objects.get_by_provider_and_uid( SHIBBOLETH_PROVIDER_IDENTIFIER, 'sampledeveloper@school.edu') @@ -95,6 +101,12 @@ def test_can_process(self): def test_can_process_user_role(self): assert len(Profile.objects.all()) == 0 + # logout first + from seahub.auth.models import AnonymousUser + self.request.session.flush() + self.request.user = AnonymousUser() + + # then login user via thibboleth self.middleware.process_request(self.request) shib_user = SocialAuthUser.objects.get_by_provider_and_uid( SHIBBOLETH_PROVIDER_IDENTIFIER, 'sampledeveloper@school.edu') @@ -191,4 +203,3 @@ def test_get_role_by_affiliation(self): assert obj._get_role_by_affiliation('student1@school.edu') == 'student' assert obj._get_role_by_affiliation('a@x.edu') == 'aaa' assert obj._get_role_by_affiliation('a@x.com') == 'guest' - diff --git a/thirdpart/shibboleth/app_settings.py b/thirdpart/shibboleth/app_settings.py index 57bed1f8cac..ecb1fbbbbc8 100755 --- a/thirdpart/shibboleth/app_settings.py +++ b/thirdpart/shibboleth/app_settings.py @@ -2,30 +2,31 @@ from django.conf import settings from django.core.exceptions import ImproperlyConfigured -#At a minimum you will need username, +# At a minimum you will need username, default_shib_attributes = { "Shibboleth-eppn": (True, "username"), -} +} SHIB_ATTRIBUTE_MAP = getattr(settings, 'SHIBBOLETH_ATTRIBUTE_MAP', default_shib_attributes) -#Set to true if you are testing and want to insert sample headers. +# Set to true if you are testing and want to insert sample headers. SHIB_MOCK_HEADERS = getattr(settings, 'SHIBBOLETH_MOCK_HEADERS', False) -SHIB_USER_HEADER = getattr(settings, 'SHIBBOLETH_USER_HEADER', "REMOTE_USER") +SHIB_USER_HEADER = getattr(settings, 'SHIBBOLETH_USER_HEADER', "HTTP_REMOTE_USER") +SHIB_USER_HEADER_SECOND_UID = getattr(settings, + 'SHIBBOLETH_USER_HEADER_SECOND_UID', + 'HTTP_REMOTE_USER_SUBJECT_ID') LOGIN_URL = getattr(settings, 'LOGIN_URL', None) if not LOGIN_URL: raise ImproperlyConfigured("A LOGIN_URL is required. Specify in settings.py") -#Optional logout parameters -#This should look like: https://sso.school.edu/idp/logout.jsp?return=%s -#The return url variable will be replaced in the LogoutView. +# Optional logout parameters +# This should look like: https://sso.school.edu/idp/logout.jsp?return=%s +# The return url variable will be replaced in the LogoutView. LOGOUT_URL = getattr(settings, 'SHIBBOLETH_LOGOUT_URL', None) -#LOGOUT_REDIRECT_URL specifies a default logout page that will always be used when -#users logout from Shibboleth. +# LOGOUT_REDIRECT_URL specifies a default logout page that will always be used when +# users logout from Shibboleth. LOGOUT_REDIRECT_URL = getattr(settings, 'SHIBBOLETH_LOGOUT_REDIRECT_URL', None) -#Name of key. Probably no need to change this. +# Name of key. Probably no need to change this. LOGOUT_SESSION_KEY = getattr(settings, 'SHIBBOLETH_FORCE_REAUTH_SESSION_KEY', 'shib_force_reauth') - - diff --git a/thirdpart/shibboleth/backends.py b/thirdpart/shibboleth/backends.py index 70cd3d77921..da2c306cc42 100644 --- a/thirdpart/shibboleth/backends.py +++ b/thirdpart/shibboleth/backends.py @@ -38,7 +38,7 @@ def get_user(self, username): user = None return user - def authenticate(self, remote_user, shib_meta): + def authenticate(self, remote_user, shib_meta, second_uid=''): """ The username passed as ``remote_user`` is considered trusted. This method simply returns the ``User`` object with the given username, @@ -69,10 +69,21 @@ def authenticate(self, remote_user, shib_meta): except User.DoesNotExist: user = None + if user and second_uid: + SocialAuthUser.objects.add_if_not_exists(user.username, + SHIBBOLETH_PROVIDER_IDENTIFIER, + second_uid) + if not user and self.create_unknown_user: try: user = User.objects.create_shib_user(is_active=self.activate_after_creation) - SocialAuthUser.objects.add(user.username, SHIBBOLETH_PROVIDER_IDENTIFIER, remote_user) + SocialAuthUser.objects.add_if_not_exists(user.username, + SHIBBOLETH_PROVIDER_IDENTIFIER, + remote_user) + if second_uid: + SocialAuthUser.objects.add_if_not_exists(user.username, + SHIBBOLETH_PROVIDER_IDENTIFIER, + second_uid) except Exception as e: logger.error('create shib user failed: %s' % e) return None diff --git a/thirdpart/shibboleth/middleware.py b/thirdpart/shibboleth/middleware.py index 8f272990bff..ea1e2a7d492 100755 --- a/thirdpart/shibboleth/middleware.py +++ b/thirdpart/shibboleth/middleware.py @@ -11,7 +11,8 @@ from django.http import HttpResponseRedirect from seaserv import seafile_api, ccnet_api -from shibboleth.app_settings import SHIB_ATTRIBUTE_MAP, LOGOUT_SESSION_KEY, SHIB_USER_HEADER +from shibboleth.app_settings import SHIB_ATTRIBUTE_MAP, LOGOUT_SESSION_KEY, \ + SHIB_USER_HEADER, SHIB_USER_HEADER_SECOND_UID from seahub import auth from seahub.base.accounts import User @@ -40,8 +41,7 @@ class ShibbolethRemoteUserMiddleware(RemoteUserMiddleware): """ - Authentication Middleware for use with Shibboleth. Uses the recommended pattern - for remote authentication from: http://code.djangoproject.com/svn/django/tags/releases/1.3/django/contrib/auth/middleware.py + Authentication Middleware for use with Shibboleth. """ def process_request(self, request): if request.path.rstrip('/') != settings.SITE_ROOT + 'sso': @@ -74,19 +74,13 @@ def process_request(self, request): # AuthenticationMiddleware). return + second_uid = request.META.get(SHIB_USER_HEADER_SECOND_UID, '') + # If the user is already authenticated and that user is the user we are # getting passed in the headers, then the correct user is already # persisted in the session and we don't need to continue. if request.user.is_authenticated: - # If user is already authenticated, the value of request.user.username should be random ID of user, - # not the SHIB_USER_HEADER in the request header - shib_user = SocialAuthUser.objects.get_by_provider_and_uid(SHIBBOLETH_PROVIDER_IDENTIFIER, remote_user) - if shib_user: - remote_user = shib_user.username - if request.user.username == remote_user: - if request.user.is_staff: - update_sudo_mode_ts(request) - return + return # Make sure we have all required Shiboleth elements before proceeding. shib_meta, error = self.parse_attributes(request) @@ -98,7 +92,9 @@ def process_request(self, request): # We are seeing this user for the first time in this session, attempt # to authenticate the user. - user = auth.authenticate(remote_user=remote_user, shib_meta=shib_meta) + user = auth.authenticate(remote_user=remote_user, + shib_meta=shib_meta, + second_uid=second_uid) if user: if not user.is_active: return HttpResponseRedirect(reverse('shib_complete'))