mirror of
https://github.com/jumpserver/jumpserver.git
synced 2025-09-24 21:08:30 +00:00
[Update] 基本完成登录逻辑
This commit is contained in:
@@ -1,4 +1,4 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
#
|
||||
|
||||
from .login import *
|
||||
from .mfa import *
|
||||
|
@@ -16,22 +16,20 @@ from django.views.decorators.debug import sensitive_post_parameters
|
||||
from django.views.generic.base import TemplateView, RedirectView
|
||||
from django.views.generic.edit import FormView
|
||||
from django.conf import settings
|
||||
from django.urls import reverse_lazy
|
||||
|
||||
from common.utils import get_request_ip, get_object_or_none
|
||||
from users.models import User
|
||||
from users.utils import (
|
||||
check_otp_code, is_block_login, clean_failed_count, get_user_or_tmp_user,
|
||||
set_tmp_user_to_cache, increase_login_failed_count,
|
||||
get_user_or_tmp_user, increase_login_failed_count,
|
||||
redirect_user_first_login_or_index
|
||||
)
|
||||
from ..models import LoginConfirmSetting
|
||||
from ..signals import post_auth_success, post_auth_failed
|
||||
from .. import forms
|
||||
from .. import errors
|
||||
from .. import forms, mixins, errors
|
||||
|
||||
|
||||
__all__ = [
|
||||
'UserLoginView', 'UserLoginOtpView', 'UserLogoutView',
|
||||
'UserLoginView', 'UserLogoutView',
|
||||
'UserLoginGuardView', 'UserLoginWaitConfirmView',
|
||||
]
|
||||
|
||||
@@ -39,10 +37,11 @@ __all__ = [
|
||||
@method_decorator(sensitive_post_parameters(), name='dispatch')
|
||||
@method_decorator(csrf_protect, name='dispatch')
|
||||
@method_decorator(never_cache, name='dispatch')
|
||||
class UserLoginView(FormView):
|
||||
class UserLoginView(mixins.AuthMixin, FormView):
|
||||
form_class = forms.UserLoginForm
|
||||
form_class_captcha = forms.UserLoginCaptchaForm
|
||||
key_prefix_captcha = "_LOGIN_INVALID_{}"
|
||||
redirect_field_name = 'next'
|
||||
|
||||
def get_template_names(self):
|
||||
template_name = 'authentication/login.html'
|
||||
@@ -69,54 +68,25 @@ class UserLoginView(FormView):
|
||||
request.session.set_test_cookie()
|
||||
return super().get(request, *args, **kwargs)
|
||||
|
||||
def post(self, request, *args, **kwargs):
|
||||
# limit login authentication
|
||||
ip = get_request_ip(request)
|
||||
username = self.request.POST.get('username')
|
||||
if is_block_login(username, ip):
|
||||
return self.render_to_response(self.get_context_data(block_login=True))
|
||||
return super().post(request, *args, **kwargs)
|
||||
|
||||
def form_valid(self, form):
|
||||
if not self.request.session.test_cookie_worked():
|
||||
return HttpResponse(_("Please enable cookies and try again."))
|
||||
user = form.get_user()
|
||||
# user password expired
|
||||
if user.password_has_expired:
|
||||
reason = errors.password_expired
|
||||
self.send_auth_signal(success=False, username=user.username, reason=reason)
|
||||
return self.render_to_response(self.get_context_data(password_expired=True))
|
||||
|
||||
set_tmp_user_to_cache(self.request, user)
|
||||
username = form.cleaned_data.get('username')
|
||||
ip = get_request_ip(self.request)
|
||||
# 登陆成功,清除缓存计数
|
||||
clean_failed_count(username, ip)
|
||||
self.request.session['auth_password'] = '1'
|
||||
try:
|
||||
self.check_user_auth()
|
||||
except errors.AuthFailedError as e:
|
||||
form.add_error(None, e.msg)
|
||||
ip = self.get_request_ip()
|
||||
cache.set(self.key_prefix_captcha.format(ip), 1, 3600)
|
||||
context = self.get_context_data(form=form)
|
||||
return self.render_to_response(context)
|
||||
return self.redirect_to_guard_view()
|
||||
|
||||
def form_invalid(self, form):
|
||||
# write login failed log
|
||||
username = form.cleaned_data.get('username')
|
||||
exist = User.objects.filter(username=username).first()
|
||||
reason = errors.password_failed if exist else errors.user_not_exist
|
||||
# limit user login failed count
|
||||
ip = get_request_ip(self.request)
|
||||
increase_login_failed_count(username, ip)
|
||||
form.add_limit_login_error(username, ip)
|
||||
# show captcha
|
||||
cache.set(self.key_prefix_captcha.format(ip), 1, 3600)
|
||||
self.send_auth_signal(success=False, username=username, reason=reason)
|
||||
|
||||
old_form = form
|
||||
form = self.form_class_captcha(data=form.data)
|
||||
form._errors = old_form.errors
|
||||
return super().form_invalid(form)
|
||||
|
||||
@staticmethod
|
||||
def redirect_to_guard_view():
|
||||
continue_url = reverse('authentication:login-guard')
|
||||
return redirect(continue_url)
|
||||
def redirect_to_guard_view(self):
|
||||
guard_url = reverse('authentication:login-guard')
|
||||
args = self.request.META.get('QUERY_STRING', '')
|
||||
if args and self.query_string:
|
||||
guard_url = "%s?%s" % (guard_url, args)
|
||||
return redirect(guard_url)
|
||||
|
||||
def get_form_class(self):
|
||||
ip = get_request_ip(self.request)
|
||||
@@ -134,58 +104,34 @@ class UserLoginView(FormView):
|
||||
return super().get_context_data(**kwargs)
|
||||
|
||||
|
||||
class UserLoginOtpView(FormView):
|
||||
template_name = 'authentication/login_otp.html'
|
||||
form_class = forms.UserCheckOtpCodeForm
|
||||
class UserLoginGuardView(mixins.AuthMixin, RedirectView):
|
||||
redirect_field_name = 'next'
|
||||
login_url = reverse_lazy('authentication:login')
|
||||
login_otp_url = reverse_lazy('authentication:login-otp')
|
||||
login_confirm_url = reverse_lazy('authentication:login-wait-confirm')
|
||||
|
||||
def form_valid(self, form):
|
||||
user = get_user_or_tmp_user(self.request)
|
||||
otp_code = form.cleaned_data.get('otp_code')
|
||||
otp_secret_key = user.otp_secret_key
|
||||
|
||||
if check_otp_code(otp_secret_key, otp_code):
|
||||
self.request.session['auth_otp'] = '1'
|
||||
return UserLoginView.redirect_to_guard_view()
|
||||
else:
|
||||
self.send_auth_signal(
|
||||
success=False, username=user.username,
|
||||
reason=errors.mfa_failed
|
||||
)
|
||||
form.add_error(
|
||||
'otp_code', _('MFA code invalid, or ntp sync server time')
|
||||
)
|
||||
return super().form_invalid(form)
|
||||
|
||||
def send_auth_signal(self, success=True, user=None, username='', reason=''):
|
||||
if success:
|
||||
post_auth_success.send(sender=self.__class__, user=user, request=self.request)
|
||||
else:
|
||||
post_auth_failed.send(
|
||||
sender=self.__class__, username=username,
|
||||
request=self.request, reason=reason
|
||||
)
|
||||
|
||||
|
||||
class UserLoginGuardView(RedirectView):
|
||||
redirect_field_name = 'next'
|
||||
def format_redirect_url(self, url):
|
||||
args = self.request.META.get('QUERY_STRING', '')
|
||||
if args and self.query_string:
|
||||
url = "%s?%s" % (url, args)
|
||||
return url
|
||||
|
||||
def get_redirect_url(self, *args, **kwargs):
|
||||
if not self.request.session.get('auth_password'):
|
||||
return reverse('authentication:login')
|
||||
|
||||
user = get_user_or_tmp_user(self.request)
|
||||
return self.format_redirect_url(self.login_url)
|
||||
user = self.get_user_from_session()
|
||||
# 启用并设置了otp
|
||||
if user.otp_enabled and user.otp_secret_key and \
|
||||
not self.request.session.get('auth_otp'):
|
||||
return reverse('authentication:login-otp')
|
||||
not self.request.session.get('auth_mfa'):
|
||||
return self.format_redirect_url(self.login_otp_url)
|
||||
confirm_setting = user.get_login_confirm_setting()
|
||||
if confirm_setting and not self.request.session.get('auth_confirm'):
|
||||
order = confirm_setting.create_confirm_order(self.request)
|
||||
self.request.session['auth_order_id'] = str(order.id)
|
||||
url = reverse('authentication:login-wait-confirm')
|
||||
url = self.format_redirect_url(self.login_confirm_url)
|
||||
return url
|
||||
self.login_success(user)
|
||||
self.clear_auth_mark()
|
||||
# 启用但是没有设置otp
|
||||
if user.otp_enabled and not user.otp_secret_key:
|
||||
# 1,2,mfa_setting & F
|
||||
|
25
apps/authentication/views/mfa.py
Normal file
25
apps/authentication/views/mfa.py
Normal file
@@ -0,0 +1,25 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
#
|
||||
|
||||
from __future__ import unicode_literals
|
||||
from django.views.generic.edit import FormView
|
||||
from .. import forms, errors, mixins
|
||||
from .utils import redirect_to_guard_view
|
||||
|
||||
__all__ = ['UserLoginOtpView']
|
||||
|
||||
|
||||
class UserLoginOtpView(mixins.AuthMixin, FormView):
|
||||
template_name = 'authentication/login_otp.html'
|
||||
form_class = forms.UserCheckOtpCodeForm
|
||||
redirect_field_name = 'next'
|
||||
|
||||
def form_valid(self, form):
|
||||
otp_code = form.cleaned_data.get('otp_code')
|
||||
try:
|
||||
self.check_user_mfa(otp_code)
|
||||
return redirect_to_guard_view()
|
||||
except errors.MFAFailedError as e:
|
||||
form.add_error('otp_code', e.reason)
|
||||
return super().form_invalid(form)
|
||||
|
8
apps/authentication/views/utils.py
Normal file
8
apps/authentication/views/utils.py
Normal file
@@ -0,0 +1,8 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
#
|
||||
from django.shortcuts import reverse, redirect
|
||||
|
||||
|
||||
def redirect_to_guard_view():
|
||||
continue_url = reverse('authentication:login-guard')
|
||||
return redirect(continue_url)
|
Reference in New Issue
Block a user