mirror of
				https://github.com/jumpserver/jumpserver.git
				synced 2025-11-04 07:55:39 +00:00 
			
		
		
		
	* feat: OAuth2.0登录方式加上用户登录规则校验 * fix: 修复第三方用户登录规则(复核)问题 * fix: 增加上了第三方用户登录失败的原因 * fix: 修改变量名称 Co-authored-by: huangzhiwen <zhiwen.huang@fit2cloud.com>
		
			
				
	
	
		
			87 lines
		
	
	
		
			3.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			87 lines
		
	
	
		
			3.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
from importlib import import_module
 | 
						||
 | 
						||
from django.conf import settings
 | 
						||
from django.contrib.auth import user_logged_in
 | 
						||
from django.core.cache import cache
 | 
						||
from django.dispatch import receiver
 | 
						||
from django_cas_ng.signals import cas_user_authenticated
 | 
						||
 | 
						||
from apps.jumpserver.settings.auth import AUTHENTICATION_BACKENDS_THIRD_PARTY
 | 
						||
from authentication.backends.oidc.signals import (
 | 
						||
    openid_user_login_failed, openid_user_login_success
 | 
						||
)
 | 
						||
from authentication.backends.saml2.signals import (
 | 
						||
    saml2_user_authenticated, saml2_user_authentication_failed
 | 
						||
)
 | 
						||
from authentication.backends.oauth2.signals import (
 | 
						||
    oauth2_user_login_failed, oauth2_user_login_success
 | 
						||
)
 | 
						||
from .signals import post_auth_success, post_auth_failed
 | 
						||
 | 
						||
 | 
						||
@receiver(user_logged_in)
def on_user_auth_login_success(sender, user, request, **kwargs):
    """Flag follow-up work in the session after a user has logged in.

    Connected to Django's ``user_logged_in`` signal. The handler:

    * expires the user's cached RBAC permissions;
    * sets ``auth_mfa_required`` when third-party MFA is enabled in settings,
      the user has MFA enabled, and the session has no ``auth_mfa`` marker;
    * sets ``auth_third_party_required`` when the session's ``auth_backend``
      is one of ``AUTHENTICATION_BACKENDS_THIRD_PARTY`` and the session has
      no ``auth_third_party_done`` marker;
    * when single-machine login is enabled, deletes the session previously
      recorded for this user and records the current one;
    * always sets ``auth_session_expiration_required``.

    :param sender: the signal sender (unused here).
    :param user: the user who logged in.
    :param request: the current request; its ``session`` is read and updated.
    :param kwargs: extra signal arguments (ignored).
    """
    # Invalidate the cached permissions first.
    user.expire_rbac_perms_cache()

    session = request.session

    # MFA is enabled but has not been verified yet. Per the original note,
    # a middleware handles MFA globally for OIDC and other third-party logins.
    mfa_still_needed = (
        settings.SECURITY_MFA_AUTH_ENABLED_FOR_THIRD_PARTY
        and user.mfa_enabled
        and not session.get('auth_mfa')
    )
    if mfa_still_needed:
        session['auth_mfa_required'] = 1

    # Third-party backend whose extra check has not been completed yet.
    if not session.get("auth_third_party_done"):
        if session.get('auth_backend') in AUTHENTICATION_BACKENDS_THIRD_PARTY:
            session['auth_third_party_required'] = 1

    # Single-machine login: a newer login removes the previously stored session.
    if settings.USER_LOGIN_SINGLE_MACHINE_ENABLED:
        lock_key = f'single_machine_login_{user.id!s}'
        previous_key = cache.get(lock_key)
        current_key = session.session_key
        if previous_key and previous_key != current_key:
            store_cls = import_module(settings.SESSION_ENGINE).SessionStore
            store_cls(previous_key).delete()
        # A timeout of None stores the entry without expiry.
        cache.set(lock_key, current_key, None)

    # Mark the login so a cookie can be set and the frontend can control
    # refreshing; per the original note, a middleware picks up this flag
    # and generates the cookie.
    session['auth_session_expiration_required'] = 1
 | 
						||
 | 
						||
 | 
						||
@receiver(openid_user_login_success)
def on_oidc_user_login_success(sender, request, user, create=False, **kwargs):
    """Record the OIDC backend in the session and re-emit ``post_auth_success``.

    :param sender: forwarded unchanged as the sender of ``post_auth_success``.
    :param request: the current request; its session gets ``auth_backend``.
    :param user: the user that logged in.
    :param create: accepted from the signal but not used here.
    :param kwargs: extra signal arguments (ignored).
    """
    session = request.session
    session['auth_backend'] = settings.AUTH_BACKEND_OIDC_CODE
    post_auth_success.send(
        sender,
        user=user,
        request=request,
    )
 | 
						||
 | 
						||
 | 
						||
@receiver(openid_user_login_failed)
def on_oidc_user_login_failed(sender, username, request, reason, **kwargs):
    """Record the OIDC backend in the session and re-emit ``post_auth_failed``.

    :param sender: forwarded unchanged as the sender of ``post_auth_failed``.
    :param username: the username of the failed login attempt.
    :param request: the current request; its session gets ``auth_backend``.
    :param reason: the failure reason, forwarded as-is.
    :param kwargs: extra signal arguments (ignored).
    """
    session = request.session
    session['auth_backend'] = settings.AUTH_BACKEND_OIDC_CODE
    post_auth_failed.send(
        sender,
        username=username,
        request=request,
        reason=reason,
    )
 | 
						||
 | 
						||
 | 
						||
@receiver(cas_user_authenticated)
def on_cas_user_login_success(sender, request, user, **kwargs):
    """Record the CAS backend in the session and re-emit ``post_auth_success``.

    :param sender: forwarded unchanged as the sender of ``post_auth_success``.
    :param request: the current request; its session gets ``auth_backend``.
    :param user: the authenticated user.
    :param kwargs: extra signal arguments (ignored).
    """
    session = request.session
    session['auth_backend'] = settings.AUTH_BACKEND_CAS
    post_auth_success.send(
        sender,
        user=user,
        request=request,
    )
 | 
						||
 | 
						||
 | 
						||
@receiver(saml2_user_authenticated)
def on_saml2_user_login_success(sender, request, user, **kwargs):
    """Record the SAML2 backend in the session and re-emit ``post_auth_success``.

    :param sender: forwarded unchanged as the sender of ``post_auth_success``.
    :param request: the current request; its session gets ``auth_backend``.
    :param user: the authenticated user.
    :param kwargs: extra signal arguments (ignored).
    """
    session = request.session
    session['auth_backend'] = settings.AUTH_BACKEND_SAML2
    post_auth_success.send(
        sender,
        user=user,
        request=request,
    )
 | 
						||
 | 
						||
 | 
						||
@receiver(saml2_user_authentication_failed)
def on_saml2_user_login_failed(sender, request, username, reason, **kwargs):
    """Record the SAML2 backend in the session and re-emit ``post_auth_failed``.

    :param sender: forwarded unchanged as the sender of ``post_auth_failed``.
    :param request: the current request; its session gets ``auth_backend``.
    :param username: the username of the failed login attempt.
    :param reason: the failure reason, forwarded as-is.
    :param kwargs: extra signal arguments (ignored).
    """
    session = request.session
    session['auth_backend'] = settings.AUTH_BACKEND_SAML2
    post_auth_failed.send(
        sender,
        username=username,
        request=request,
        reason=reason,
    )
 | 
						||
 | 
						||
 | 
						||
@receiver(oauth2_user_login_success)
def on_oauth2_user_login_success(sender, request, user, **kwargs):
    """Record the OAuth2 backend in the session and re-emit ``post_auth_success``.

    :param sender: forwarded unchanged as the sender of ``post_auth_success``.
    :param request: the current request; its session gets ``auth_backend``.
    :param user: the user that logged in.
    :param kwargs: extra signal arguments (ignored).
    """
    session = request.session
    session['auth_backend'] = settings.AUTH_BACKEND_OAUTH2
    post_auth_success.send(
        sender,
        user=user,
        request=request,
    )
 | 
						||
 | 
						||
 | 
						||
@receiver(oauth2_user_login_failed)
def on_oauth2_user_login_failed(sender, username, request, reason, **kwargs):
    """Record the OAuth2 backend in the session and re-emit ``post_auth_failed``.

    :param sender: forwarded unchanged as the sender of ``post_auth_failed``.
    :param username: the username of the failed login attempt.
    :param request: the current request; its session gets ``auth_backend``.
    :param reason: the failure reason, forwarded as-is.
    :param kwargs: extra signal arguments (ignored).
    """
    session = request.session
    session['auth_backend'] = settings.AUTH_BACKEND_OAUTH2
    post_auth_failed.send(
        sender,
        username=username,
        request=request,
        reason=reason,
    )
 |