mirror of
https://github.com/haiwen/seahub.git
synced 2025-04-28 03:10:45 +00:00
parent
ac1c00442e
commit
92445b16ef
@ -1,17 +1,12 @@
|
||||
# Copyright (c) 2012-2016 Seafile Ltd.
|
||||
import datetime
|
||||
import hashlib
|
||||
import urllib.request, urllib.parse, urllib.error
|
||||
import logging
|
||||
|
||||
# import auth
|
||||
from django.core.exceptions import ImproperlyConfigured
|
||||
from registration.signals import user_deleted
|
||||
from django.db import models
|
||||
from django.db.models.manager import EmptyManager
|
||||
from django.contrib.contenttypes.models import ContentType
|
||||
from django.utils.encoding import smart_str
|
||||
from django.utils.translation import gettext_lazy as _
|
||||
from django.conf import settings
|
||||
from django.dispatch import receiver
|
||||
from django.utils.encoding import smart_str
|
||||
from django.db.models.manager import EmptyManager
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
UNUSABLE_PASSWORD = '!' # This will never be a valid hash
|
||||
@ -130,15 +125,26 @@ class AnonymousUser(object):
|
||||
|
||||
|
||||
class SocialAuthUserManager(models.Manager):
|
||||
|
||||
def add(self, username, provider, uid, extra_data=''):
|
||||
try:
|
||||
social_auth_user = self.model(username=username, provider=provider, uid=uid, extra_data=extra_data)
|
||||
social_auth_user = self.model(username=username, provider=provider,
|
||||
uid=uid, extra_data=extra_data)
|
||||
social_auth_user.save()
|
||||
return social_auth_user
|
||||
except Exception as e:
|
||||
logger.error(e)
|
||||
return None
|
||||
|
||||
def add_if_not_exists(self, username, provider, uid, extra_data=''):
|
||||
|
||||
social_auth_user = self.get_by_provider_and_uid(provider, uid)
|
||||
if not social_auth_user:
|
||||
social_auth_user = self.add(username, provider,
|
||||
uid, extra_data=extra_data)
|
||||
|
||||
return social_auth_user
|
||||
|
||||
def get_by_provider_and_uid(self, provider, uid):
|
||||
try:
|
||||
social_auth_user = self.get(provider=provider, uid=uid)
|
||||
@ -186,11 +192,7 @@ class ExternalDepartment(models.Model):
|
||||
db_table = 'external_department'
|
||||
|
||||
|
||||
# # handle signals
|
||||
from django.dispatch import receiver
|
||||
from registration.signals import user_deleted
|
||||
|
||||
|
||||
# handle signals
|
||||
@receiver(user_deleted)
|
||||
def user_deleted_cb(sender, **kwargs):
|
||||
username = kwargs['username']
|
||||
|
@ -48,7 +48,7 @@ class ShibbolethRemoteUserMiddlewareTest(BaseTestCase):
|
||||
|
||||
self.request.META = {}
|
||||
self.request.META['Shibboleth-eppn'] = 'sampledeveloper@school.edu'
|
||||
self.request.META['REMOTE_USER'] = 'sampledeveloper@school.edu'
|
||||
self.request.META['HTTP_REMOTE_USER'] = 'sampledeveloper@school.edu'
|
||||
self.request.META['givenname'] = 'test_gname'
|
||||
self.request.META['surname'] = 'test_sname'
|
||||
self.request.META['Shibboleth-displayName'] = 'Sample Developer'
|
||||
@ -68,6 +68,12 @@ class ShibbolethRemoteUserMiddlewareTest(BaseTestCase):
|
||||
def test_can_process(self):
|
||||
assert len(Profile.objects.all()) == 0
|
||||
|
||||
# logout first
|
||||
from seahub.auth.models import AnonymousUser
|
||||
self.request.session.flush()
|
||||
self.request.user = AnonymousUser()
|
||||
|
||||
# then login user via thibboleth
|
||||
self.middleware.process_request(self.request)
|
||||
shib_user = SocialAuthUser.objects.get_by_provider_and_uid(
|
||||
SHIBBOLETH_PROVIDER_IDENTIFIER, 'sampledeveloper@school.edu')
|
||||
@ -95,6 +101,12 @@ class ShibbolethRemoteUserMiddlewareTest(BaseTestCase):
|
||||
def test_can_process_user_role(self):
|
||||
assert len(Profile.objects.all()) == 0
|
||||
|
||||
# logout first
|
||||
from seahub.auth.models import AnonymousUser
|
||||
self.request.session.flush()
|
||||
self.request.user = AnonymousUser()
|
||||
|
||||
# then login user via thibboleth
|
||||
self.middleware.process_request(self.request)
|
||||
shib_user = SocialAuthUser.objects.get_by_provider_and_uid(
|
||||
SHIBBOLETH_PROVIDER_IDENTIFIER, 'sampledeveloper@school.edu')
|
||||
@ -191,4 +203,3 @@ class ShibbolethRemoteUserMiddlewareTest(BaseTestCase):
|
||||
assert obj._get_role_by_affiliation('student1@school.edu') == 'student'
|
||||
assert obj._get_role_by_affiliation('a@x.edu') == 'aaa'
|
||||
assert obj._get_role_by_affiliation('a@x.com') == 'guest'
|
||||
|
||||
|
@ -2,30 +2,31 @@
|
||||
from django.conf import settings
|
||||
from django.core.exceptions import ImproperlyConfigured
|
||||
|
||||
#At a minimum you will need username,
|
||||
# At a minimum you will need username,
|
||||
default_shib_attributes = {
|
||||
"Shibboleth-eppn": (True, "username"),
|
||||
}
|
||||
}
|
||||
|
||||
SHIB_ATTRIBUTE_MAP = getattr(settings, 'SHIBBOLETH_ATTRIBUTE_MAP', default_shib_attributes)
|
||||
#Set to true if you are testing and want to insert sample headers.
|
||||
# Set to true if you are testing and want to insert sample headers.
|
||||
SHIB_MOCK_HEADERS = getattr(settings, 'SHIBBOLETH_MOCK_HEADERS', False)
|
||||
|
||||
SHIB_USER_HEADER = getattr(settings, 'SHIBBOLETH_USER_HEADER', "REMOTE_USER")
|
||||
SHIB_USER_HEADER = getattr(settings, 'SHIBBOLETH_USER_HEADER', "HTTP_REMOTE_USER")
|
||||
SHIB_USER_HEADER_SECOND_UID = getattr(settings,
|
||||
'SHIBBOLETH_USER_HEADER_SECOND_UID',
|
||||
'HTTP_REMOTE_USER_SUBJECT_ID')
|
||||
|
||||
LOGIN_URL = getattr(settings, 'LOGIN_URL', None)
|
||||
|
||||
if not LOGIN_URL:
|
||||
raise ImproperlyConfigured("A LOGIN_URL is required. Specify in settings.py")
|
||||
|
||||
#Optional logout parameters
|
||||
#This should look like: https://sso.school.edu/idp/logout.jsp?return=%s
|
||||
#The return url variable will be replaced in the LogoutView.
|
||||
# Optional logout parameters
|
||||
# This should look like: https://sso.school.edu/idp/logout.jsp?return=%s
|
||||
# The return url variable will be replaced in the LogoutView.
|
||||
LOGOUT_URL = getattr(settings, 'SHIBBOLETH_LOGOUT_URL', None)
|
||||
#LOGOUT_REDIRECT_URL specifies a default logout page that will always be used when
|
||||
#users logout from Shibboleth.
|
||||
# LOGOUT_REDIRECT_URL specifies a default logout page that will always be used when
|
||||
# users logout from Shibboleth.
|
||||
LOGOUT_REDIRECT_URL = getattr(settings, 'SHIBBOLETH_LOGOUT_REDIRECT_URL', None)
|
||||
#Name of key. Probably no need to change this.
|
||||
# Name of key. Probably no need to change this.
|
||||
LOGOUT_SESSION_KEY = getattr(settings, 'SHIBBOLETH_FORCE_REAUTH_SESSION_KEY', 'shib_force_reauth')
|
||||
|
||||
|
||||
|
@ -38,7 +38,7 @@ class ShibbolethRemoteUserBackend(RemoteUserBackend):
|
||||
user = None
|
||||
return user
|
||||
|
||||
def authenticate(self, remote_user, shib_meta):
|
||||
def authenticate(self, remote_user, shib_meta, second_uid=''):
|
||||
"""
|
||||
The username passed as ``remote_user`` is considered trusted. This
|
||||
method simply returns the ``User`` object with the given username,
|
||||
@ -69,10 +69,21 @@ class ShibbolethRemoteUserBackend(RemoteUserBackend):
|
||||
except User.DoesNotExist:
|
||||
user = None
|
||||
|
||||
if user and second_uid:
|
||||
SocialAuthUser.objects.add_if_not_exists(user.username,
|
||||
SHIBBOLETH_PROVIDER_IDENTIFIER,
|
||||
second_uid)
|
||||
|
||||
if not user and self.create_unknown_user:
|
||||
try:
|
||||
user = User.objects.create_shib_user(is_active=self.activate_after_creation)
|
||||
SocialAuthUser.objects.add(user.username, SHIBBOLETH_PROVIDER_IDENTIFIER, remote_user)
|
||||
SocialAuthUser.objects.add_if_not_exists(user.username,
|
||||
SHIBBOLETH_PROVIDER_IDENTIFIER,
|
||||
remote_user)
|
||||
if second_uid:
|
||||
SocialAuthUser.objects.add_if_not_exists(user.username,
|
||||
SHIBBOLETH_PROVIDER_IDENTIFIER,
|
||||
second_uid)
|
||||
except Exception as e:
|
||||
logger.error('create shib user failed: %s' % e)
|
||||
return None
|
||||
|
@ -11,7 +11,8 @@ from django.urls import reverse
|
||||
from django.http import HttpResponseRedirect
|
||||
from seaserv import seafile_api, ccnet_api
|
||||
|
||||
from shibboleth.app_settings import SHIB_ATTRIBUTE_MAP, LOGOUT_SESSION_KEY, SHIB_USER_HEADER
|
||||
from shibboleth.app_settings import SHIB_ATTRIBUTE_MAP, LOGOUT_SESSION_KEY, \
|
||||
SHIB_USER_HEADER, SHIB_USER_HEADER_SECOND_UID
|
||||
|
||||
from seahub import auth
|
||||
from seahub.base.accounts import User
|
||||
@ -40,8 +41,7 @@ SHIBBOLETH_PROVIDER_IDENTIFIER = getattr(settings, 'SHIBBOLETH_PROVIDER_IDENTIFI
|
||||
|
||||
class ShibbolethRemoteUserMiddleware(RemoteUserMiddleware):
|
||||
"""
|
||||
Authentication Middleware for use with Shibboleth. Uses the recommended pattern
|
||||
for remote authentication from: http://code.djangoproject.com/svn/django/tags/releases/1.3/django/contrib/auth/middleware.py
|
||||
Authentication Middleware for use with Shibboleth.
|
||||
"""
|
||||
def process_request(self, request):
|
||||
if request.path.rstrip('/') != settings.SITE_ROOT + 'sso':
|
||||
@ -74,19 +74,13 @@ class ShibbolethRemoteUserMiddleware(RemoteUserMiddleware):
|
||||
# AuthenticationMiddleware).
|
||||
return
|
||||
|
||||
second_uid = request.META.get(SHIB_USER_HEADER_SECOND_UID, '')
|
||||
|
||||
# If the user is already authenticated and that user is the user we are
|
||||
# getting passed in the headers, then the correct user is already
|
||||
# persisted in the session and we don't need to continue.
|
||||
if request.user.is_authenticated:
|
||||
# If user is already authenticated, the value of request.user.username should be random ID of user,
|
||||
# not the SHIB_USER_HEADER in the request header
|
||||
shib_user = SocialAuthUser.objects.get_by_provider_and_uid(SHIBBOLETH_PROVIDER_IDENTIFIER, remote_user)
|
||||
if shib_user:
|
||||
remote_user = shib_user.username
|
||||
if request.user.username == remote_user:
|
||||
if request.user.is_staff:
|
||||
update_sudo_mode_ts(request)
|
||||
return
|
||||
return
|
||||
|
||||
# Make sure we have all required Shiboleth elements before proceeding.
|
||||
shib_meta, error = self.parse_attributes(request)
|
||||
@ -98,7 +92,9 @@ class ShibbolethRemoteUserMiddleware(RemoteUserMiddleware):
|
||||
|
||||
# We are seeing this user for the first time in this session, attempt
|
||||
# to authenticate the user.
|
||||
user = auth.authenticate(remote_user=remote_user, shib_meta=shib_meta)
|
||||
user = auth.authenticate(remote_user=remote_user,
|
||||
shib_meta=shib_meta,
|
||||
second_uid=second_uid)
|
||||
if user:
|
||||
if not user.is_active:
|
||||
return HttpResponseRedirect(reverse('shib_complete'))
|
||||
|
Loading…
Reference in New Issue
Block a user