1
0
mirror of https://github.com/haiwen/seahub.git synced 2025-04-27 11:01:14 +00:00

improve shibboleth login (#5534)

* improve shibboleth login

* fix test
This commit is contained in:
WJH 2023-07-07 17:33:40 +08:00 committed by GitHub
parent c777a4bc6d
commit f7495902db
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 77 additions and 42 deletions

View File

@ -206,17 +206,19 @@ class UserManager(object):
return self.get(email=email)
def create_shib_user(self, email, password=None, is_staff=False, is_active=False):
def create_shib_user(self, is_staff=False, is_active=False):
"""
Creates and saves a SHIB user with given username.
"""
user = User(email=email)
virtual_id = gen_user_virtual_id()
user = User(email=virtual_id)
user.is_staff = is_staff
user.is_active = is_active
user.set_password(password)
user.set_unusable_password()
user.save()
return self.get(email=email)
return self.get(email=virtual_id)
def create_superuser(self, email, password):
u = self.create_user(email, password, is_staff=True, is_active=True)

View File

@ -5,6 +5,7 @@ from shibboleth import backends
from seahub.base.accounts import User
from seahub.auth import authenticate
from seahub.auth.models import SocialAuthUser
from seahub.test_utils import BaseTestCase
import importlib
@ -48,6 +49,8 @@ settings.MIDDLEWARE.append(
'shibboleth.middleware.ShibbolethRemoteUserMiddleware',
)
SHIBBOLETH_PROVIDER_IDENTIFIER = getattr(settings, 'SHIBBOLETH_PROVIDER_IDENTIFIER', 'shibboleth')
class ShibbolethRemoteUserBackendTest(BaseTestCase):
def setUp(self):
@ -61,9 +64,10 @@ class ShibbolethRemoteUserBackendTest(BaseTestCase):
user = authenticate(remote_user=self.remote_user,
shib_meta=SAMPLE_HEADERS)
assert user.is_active is True
self.assertEqual(user.username, 'sampledeveloper@school.edu')
self.assertEqual(User.objects.get(self.remote_user).username,
'sampledeveloper@school.edu')
shib_user = SocialAuthUser.objects.get_by_provider_and_uid(SHIBBOLETH_PROVIDER_IDENTIFIER, self.remote_user)
self.assertEqual(shib_user.uid, 'sampledeveloper@school.edu')
self.assertEqual(User.objects.get(shib_user.username).username,
shib_user.username)
def test_notify_admins_on_activate_request(self):
self.assertEqual(len(mail.outbox), 0)
@ -75,7 +79,8 @@ class ShibbolethRemoteUserBackendTest(BaseTestCase):
importlib.reload(backends)
user = authenticate(remote_user=self.remote_user,
shib_meta=SAMPLE_HEADERS)
self.assertEqual(user.username, 'sampledeveloper@school.edu')
shib_user = SocialAuthUser.objects.get_by_provider_and_uid(SHIBBOLETH_PROVIDER_IDENTIFIER, self.remote_user)
self.assertEqual(shib_user.uid, 'sampledeveloper@school.edu')
assert user.is_active is False
assert len(mail.outbox) != 0

View File

@ -6,6 +6,7 @@ from django.conf import settings
from django.test import RequestFactory, override_settings
from seahub.base.accounts import User
from seahub.auth.models import SocialAuthUser
from seahub.profile.models import Profile
from seahub.test_utils import BaseTestCase
from shibboleth import backends
@ -22,6 +23,8 @@ settings.MIDDLEWARE.append(
'shibboleth.middleware.ShibbolethRemoteUserMiddleware',
)
SHIBBOLETH_PROVIDER_IDENTIFIER = getattr(settings, 'SHIBBOLETH_PROVIDER_IDENTIFIER', 'shibboleth')
class ShibbolethRemoteUserMiddlewareTest(BaseTestCase):
@ -66,11 +69,13 @@ class ShibbolethRemoteUserMiddlewareTest(BaseTestCase):
assert len(Profile.objects.all()) == 0
self.middleware.process_request(self.request)
assert self.request.user.username == 'sampledeveloper@school.edu'
shib_user = SocialAuthUser.objects.get_by_provider_and_uid(
SHIBBOLETH_PROVIDER_IDENTIFIER, 'sampledeveloper@school.edu')
assert self.request.user.username == shib_user.username
assert len(Profile.objects.all()) == 1
assert self.request.shib_login is True
assert Profile.objects.all()[0].user == 'sampledeveloper@school.edu'
assert Profile.objects.all()[0].user == shib_user.username
assert Profile.objects.all()[0].nickname == 'Sample Developer'
@override_settings(SHIBBOLETH_AFFILIATION_ROLE_MAP={
@ -91,11 +96,13 @@ class ShibbolethRemoteUserMiddlewareTest(BaseTestCase):
assert len(Profile.objects.all()) == 0
self.middleware.process_request(self.request)
assert self.request.user.username == 'sampledeveloper@school.edu'
shib_user = SocialAuthUser.objects.get_by_provider_and_uid(
SHIBBOLETH_PROVIDER_IDENTIFIER, 'sampledeveloper@school.edu')
assert self.request.user.username == shib_user.username
assert len(Profile.objects.all()) == 1
assert self.request.shib_login is True
assert Profile.objects.all()[0].user == 'sampledeveloper@school.edu'
assert Profile.objects.all()[0].user == shib_user.username
assert Profile.objects.all()[0].nickname == 'Sample Developer'
assert User.objects.get(self.request.user.username).role == 'staff'

View File

@ -1,12 +1,19 @@
import logging
from django.conf import settings
from seaserv import ccnet_api
from seahub.auth.backends import RemoteUserBackend
from seahub.auth.models import SocialAuthUser
from seahub.base.accounts import User
from registration.models import (
notify_admins_on_activate_request, notify_admins_on_register_complete)
logger = logging.getLogger(__name__)
SHIBBOLETH_PROVIDER_IDENTIFIER = getattr(settings, 'SHIBBOLETH_PROVIDER_IDENTIFIER', 'shibboleth')
class ShibbolethRemoteUserBackend(RemoteUserBackend):
"""
This backend is to be used in conjunction with the ``RemoteUserMiddleware``
@ -43,29 +50,39 @@ class ShibbolethRemoteUserBackend(RemoteUserBackend):
if not remote_user:
return
username = self.clean_username(remote_user)
local_ccnet_users = ccnet_api.search_emailusers('DB', username, -1, -1)
if not local_ccnet_users:
local_ccnet_users = ccnet_api.search_emailusers('LDAP', username, -1, -1)
if username.lower() not in [item.email for item in local_ccnet_users]:
local_ccnet_users = []
if not local_ccnet_users:
if self.create_unknown_user:
user = User.objects.create_shib_user(
email=username, is_active=self.activate_after_creation)
if user and self.activate_after_creation is False:
notify_admins_on_activate_request(user.email)
# Do not send follwing registration finished email (if any)
# which will cause confusion.
return user
if user and settings.NOTIFY_ADMIN_AFTER_REGISTRATION is True:
notify_admins_on_register_complete(user.email)
else:
remote_user = self.clean_username(remote_user)
shib_user = SocialAuthUser.objects.get_by_provider_and_uid(SHIBBOLETH_PROVIDER_IDENTIFIER, remote_user)
if shib_user:
try:
user = User.objects.get(email=shib_user.username)
except User.DoesNotExist:
user = None
if not user:
# Means found user in social_auth_usersocialauth but not found user in EmailUser,
# delete it and recreate one.
logger.warning('The DB data is invalid, delete it and recreate one.')
SocialAuthUser.objects.filter(provider=SHIBBOLETH_PROVIDER_IDENTIFIER, uid=remote_user).delete()
else:
user = User.objects.get(email=username)
# compatible with old users via SHIB_USER_HEADER
try:
user = User.objects.get_old_user(remote_user, SHIBBOLETH_PROVIDER_IDENTIFIER, remote_user)
except User.DoesNotExist:
user = None
if not user and self.create_unknown_user:
try:
user = User.objects.create_shib_user(is_active=self.activate_after_creation)
SocialAuthUser.objects.add(user.username, SHIBBOLETH_PROVIDER_IDENTIFIER, remote_user)
except Exception as e:
logger.error(f'create saml user failed. {e}')
return None
if user and self.activate_after_creation is False:
notify_admins_on_activate_request(user.email)
# Do not send follwing registration finished email (if any)
# which will cause confusion.
return user
if user and settings.NOTIFY_ADMIN_AFTER_REGISTRATION is True:
notify_admins_on_register_complete(user.email)
return user

View File

@ -15,6 +15,7 @@ from shibboleth.app_settings import SHIB_ATTRIBUTE_MAP, LOGOUT_SESSION_KEY, SHIB
from seahub import auth
from seahub.base.accounts import User
from seahub.auth.models import SocialAuthUser
from seahub.base.sudo_mode import update_sudo_mode_ts
from seahub.profile.models import Profile
from seahub.utils.file_size import get_quota_from_string
@ -34,6 +35,8 @@ try:
except KeyError:
CUSTOM_SHIBBOLETH_GET_USER_ROLE = False
SHIBBOLETH_PROVIDER_IDENTIFIER = getattr(settings, 'SHIBBOLETH_PROVIDER_IDENTIFIER', 'shibboleth')
class ShibbolethRemoteUserMiddleware(RemoteUserMiddleware):
"""
@ -64,22 +67,23 @@ class ShibbolethRemoteUserMiddleware(RemoteUserMiddleware):
# Locate the remote user header.
# import pprint; pprint.pprint(request.META)
try:
username = request.META[SHIB_USER_HEADER]
remote_user = request.META[SHIB_USER_HEADER]
except KeyError:
# If specified header doesn't exist then return (leaving
# request.user set to AnonymousUser by the
# AuthenticationMiddleware).
return
p_id = ccnet_api.get_primary_id(username)
if p_id is not None:
username = p_id
# If the user is already authenticated and that user is the user we are
# getting passed in the headers, then the correct user is already
# persisted in the session and we don't need to continue.
if request.user.is_authenticated:
if request.user.username == username:
# If user is already authenticated, the value of request.user.username should be random ID of user,
# not the SHIB_USER_HEADER in the request header
shib_user = SocialAuthUser.objects.get_by_provider_and_uid(SHIBBOLETH_PROVIDER_IDENTIFIER, remote_user)
if shib_user:
remote_user = shib_user.username
if request.user.username == remote_user:
if request.user.is_staff:
update_sudo_mode_ts(request)
return
@ -94,7 +98,7 @@ class ShibbolethRemoteUserMiddleware(RemoteUserMiddleware):
# We are seeing this user for the first time in this session, attempt
# to authenticate the user.
user = auth.authenticate(remote_user=username, shib_meta=shib_meta)
user = auth.authenticate(remote_user=remote_user, shib_meta=shib_meta)
if user:
if not user.is_active:
return HttpResponseRedirect(reverse('shib_complete'))