mirror of
https://github.com/jumpserver/jumpserver.git
synced 2025-09-16 07:18:22 +00:00
pref: 优化MFA (#7153)
* perf: 优化mfa 和登录 * perf: stash * stash * pref: 基本完成 * perf: remove init function * perf: 优化命名 * perf: 优化backends * perf: 基本完成优化 * perf: 修复首页登录时没有 toastr 的问题 Co-authored-by: ibuler <ibuler@qq.com> Co-authored-by: Jiangjie.Bai <32935519+BaiJiangJie@users.noreply.github.com>
This commit is contained in:
5
apps/authentication/mfa/__init__.py
Normal file
5
apps/authentication/mfa/__init__.py
Normal file
@@ -0,0 +1,5 @@
|
||||
from .otp import MFAOtp, otp_failed_msg
|
||||
from .sms import MFASms
|
||||
from .radius import MFARadius
|
||||
|
||||
MFA_BACKENDS = [MFAOtp, MFASms, MFARadius]
|
72
apps/authentication/mfa/base.py
Normal file
72
apps/authentication/mfa/base.py
Normal file
@@ -0,0 +1,72 @@
|
||||
import abc
|
||||
|
||||
from django.utils.translation import ugettext_lazy as _
|
||||
|
||||
|
||||
class BaseMFA(abc.ABC):
|
||||
placeholder = _('Please input security code')
|
||||
|
||||
def __init__(self, user):
|
||||
"""
|
||||
:param user: Authenticated user, Anonymous or None
|
||||
因为首页登录时,可能没法获取到一些状态
|
||||
"""
|
||||
self.user = user
|
||||
|
||||
def is_authenticated(self):
|
||||
return self.user and self.user.is_authenticated
|
||||
|
||||
@property
|
||||
@abc.abstractmethod
|
||||
def name(self):
|
||||
return ''
|
||||
|
||||
@property
|
||||
@abc.abstractmethod
|
||||
def display_name(self):
|
||||
return ''
|
||||
|
||||
@staticmethod
|
||||
def challenge_required():
|
||||
return False
|
||||
|
||||
def send_challenge(self):
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
def check_code(self, code) -> tuple:
|
||||
return False, 'Error msg'
|
||||
|
||||
@abc.abstractmethod
|
||||
def is_active(self):
|
||||
return False
|
||||
|
||||
@staticmethod
|
||||
@abc.abstractmethod
|
||||
def global_enabled():
|
||||
return False
|
||||
|
||||
@abc.abstractmethod
|
||||
def get_enable_url(self) -> str:
|
||||
return ''
|
||||
|
||||
@abc.abstractmethod
|
||||
def get_disable_url(self) -> str:
|
||||
return ''
|
||||
|
||||
@abc.abstractmethod
|
||||
def disable(self):
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
def can_disable(self) -> bool:
|
||||
return True
|
||||
|
||||
@staticmethod
|
||||
def help_text_of_enable():
|
||||
return ''
|
||||
|
||||
@staticmethod
|
||||
def help_text_of_disable():
|
||||
return ''
|
||||
|
51
apps/authentication/mfa/otp.py
Normal file
51
apps/authentication/mfa/otp.py
Normal file
@@ -0,0 +1,51 @@
|
||||
from django.utils.translation import gettext_lazy as _
|
||||
from django.shortcuts import reverse
|
||||
|
||||
from .base import BaseMFA
|
||||
|
||||
|
||||
otp_failed_msg = _("OTP code invalid, or server time error")
|
||||
|
||||
|
||||
class MFAOtp(BaseMFA):
|
||||
name = 'otp'
|
||||
display_name = _('OTP')
|
||||
|
||||
def check_code(self, code):
|
||||
from users.utils import check_otp_code
|
||||
assert self.is_authenticated()
|
||||
|
||||
ok = check_otp_code(self.user.otp_secret_key, code)
|
||||
msg = '' if ok else otp_failed_msg
|
||||
return ok, msg
|
||||
|
||||
def is_active(self):
|
||||
if not self.is_authenticated():
|
||||
return True
|
||||
return self.user.otp_secret_key
|
||||
|
||||
@staticmethod
|
||||
def global_enabled():
|
||||
return True
|
||||
|
||||
def get_enable_url(self) -> str:
|
||||
return reverse('authentication:user-otp-enable-start')
|
||||
|
||||
def disable(self):
|
||||
assert self.is_authenticated()
|
||||
self.user.otp_secret_key = ''
|
||||
self.user.save(update_fields=['otp_secret_key'])
|
||||
|
||||
def can_disable(self) -> bool:
|
||||
return True
|
||||
|
||||
def get_disable_url(self):
|
||||
return reverse('authentication:user-otp-disable')
|
||||
|
||||
@staticmethod
|
||||
def help_text_of_enable():
|
||||
return _("Virtual OTP based MFA")
|
||||
|
||||
def help_text_of_disable(self):
|
||||
return ''
|
||||
|
46
apps/authentication/mfa/radius.py
Normal file
46
apps/authentication/mfa/radius.py
Normal file
@@ -0,0 +1,46 @@
|
||||
from django.utils.translation import ugettext_lazy as _
|
||||
from django.conf import settings
|
||||
|
||||
from .base import BaseMFA
|
||||
from ..backends.radius import RadiusBackend
|
||||
|
||||
mfa_failed_msg = _("Radius verify code invalid")
|
||||
|
||||
|
||||
class MFARadius(BaseMFA):
|
||||
name = 'otp_radius'
|
||||
display_name = _('Radius MFA')
|
||||
|
||||
def check_code(self, code):
|
||||
assert self.is_authenticated()
|
||||
backend = RadiusBackend()
|
||||
username = self.user.username
|
||||
user = backend.authenticate(
|
||||
None, username=username, password=code
|
||||
)
|
||||
ok = user is not None
|
||||
msg = '' if ok else mfa_failed_msg
|
||||
return ok, msg
|
||||
|
||||
def is_active(self):
|
||||
return True
|
||||
|
||||
@staticmethod
|
||||
def global_enabled():
|
||||
return settings.OTP_IN_RADIUS
|
||||
|
||||
def get_enable_url(self) -> str:
|
||||
return ''
|
||||
|
||||
def can_disable(self):
|
||||
return False
|
||||
|
||||
def disable(self):
|
||||
return ''
|
||||
|
||||
@staticmethod
|
||||
def help_text_of_disable():
|
||||
return _("Radius global enabled, cannot disable")
|
||||
|
||||
def get_disable_url(self) -> str:
|
||||
return ''
|
60
apps/authentication/mfa/sms.py
Normal file
60
apps/authentication/mfa/sms.py
Normal file
@@ -0,0 +1,60 @@
|
||||
from django.utils.translation import ugettext_lazy as _
|
||||
from django.conf import settings
|
||||
|
||||
from .base import BaseMFA
|
||||
from common.sdk.sms import SendAndVerifySMSUtil
|
||||
|
||||
sms_failed_msg = _("SMS verify code invalid")
|
||||
|
||||
|
||||
class MFASms(BaseMFA):
|
||||
name = 'sms'
|
||||
display_name = _("SMS")
|
||||
placeholder = _("SMS verification code")
|
||||
|
||||
def __init__(self, user):
|
||||
super().__init__(user)
|
||||
phone = user.phone if self.is_authenticated() else ''
|
||||
self.sms = SendAndVerifySMSUtil(phone)
|
||||
|
||||
def check_code(self, code):
|
||||
assert self.is_authenticated()
|
||||
ok = self.sms.verify(code)
|
||||
msg = '' if ok else sms_failed_msg
|
||||
return ok, msg
|
||||
|
||||
def is_active(self):
|
||||
if not self.is_authenticated():
|
||||
return True
|
||||
return self.user.phone
|
||||
|
||||
@staticmethod
|
||||
def challenge_required():
|
||||
return True
|
||||
|
||||
def send_challenge(self):
|
||||
self.sms.gen_and_send()
|
||||
|
||||
@staticmethod
|
||||
def global_enabled():
|
||||
return settings.SMS_ENABLED
|
||||
|
||||
def get_enable_url(self) -> str:
|
||||
return '/ui/#/users/profile/?activeTab=ProfileUpdate'
|
||||
|
||||
def can_disable(self) -> bool:
|
||||
return True
|
||||
|
||||
def disable(self):
|
||||
return '/ui/#/users/profile/?activeTab=ProfileUpdate'
|
||||
|
||||
@staticmethod
|
||||
def help_text_of_enable():
|
||||
return _("Set phone number to enable")
|
||||
|
||||
@staticmethod
|
||||
def help_text_of_disable():
|
||||
return _("Clear phone number to disable")
|
||||
|
||||
def get_disable_url(self) -> str:
|
||||
return '/ui/#/users/profile/?activeTab=ProfileUpdate'
|
Reference in New Issue
Block a user