mirror of
https://github.com/jumpserver/jumpserver.git
synced 2026-07-02 23:23:21 +00:00
169 lines
6.1 KiB
Python
169 lines
6.1 KiB
Python
# -*- coding: utf-8 -*-
|
|
#
|
|
import secrets
|
|
from urllib.parse import urlencode
|
|
|
|
from django.conf import settings
|
|
from django.contrib.auth import authenticate
|
|
from django.core.cache import cache
|
|
from django.utils.decorators import method_decorator
|
|
from django.utils.translation import gettext as _
|
|
from django.views.decorators.cache import never_cache
|
|
from django.views.decorators.csrf import csrf_protect
|
|
from django.views.decorators.debug import sensitive_post_parameters
|
|
from django.views.generic.edit import FormView
|
|
from django.shortcuts import redirect
|
|
|
|
from authentication.mixins import AuthMixin
|
|
from authentication.errors import (
|
|
AuthFailedError, NeedRedirectError
|
|
)
|
|
from .forms import UKeyLoginForm
|
|
from users.utils import LoginBlockUtil, LoginIpBlockUtil
|
|
from .sdk import ukey_sdk_config
|
|
|
|
|
|
__all__ = ['UKeyLoginView']
|
|
|
|
_CHALLENGE_CACHE_KEY_PREFIX = 'ukey_login_challenge'
|
|
_UKEY_ERROR_SESSION_KEY = 'ukey_login_error'
|
|
|
|
@method_decorator(sensitive_post_parameters(), name='dispatch')
|
|
@method_decorator(csrf_protect, name='dispatch')
|
|
@method_decorator(never_cache, name='dispatch')
|
|
class UKeyLoginView(AuthMixin, FormView):
|
|
template_name = 'authentication/login_ukey.html'
|
|
form_class = UKeyLoginForm
|
|
redirect_field_name = 'next'
|
|
|
|
# ------------------------------------------------------------------
|
|
# Challenge helpers
|
|
# ------------------------------------------------------------------
|
|
|
|
def _ensure_session(self):
|
|
if not self.request.session.session_key:
|
|
self.request.session.create()
|
|
|
|
def _challenge_cache_key(self):
|
|
self._ensure_session()
|
|
return f'{_CHALLENGE_CACHE_KEY_PREFIX}_{self.request.session.session_key}'
|
|
|
|
def _generate_and_store_challenge(self):
|
|
challenge = secrets.token_hex(16)
|
|
ttl = ukey_sdk_config.challenge_ttl
|
|
cache.set(self._challenge_cache_key(), challenge, ttl)
|
|
return challenge
|
|
|
|
def _get_stored_challenge(self):
|
|
return cache.get(self._challenge_cache_key(), '')
|
|
|
|
def _delete_stored_challenge(self):
|
|
cache.delete(self._challenge_cache_key())
|
|
|
|
def _get_next_url(self):
|
|
next = self.request.GET.get(self.redirect_field_name)
|
|
next = next or self.request.POST.get(self.redirect_field_name)
|
|
return next
|
|
|
|
def _build_login_redirect_url(self):
|
|
next_url = self._get_next_url()
|
|
if not next_url:
|
|
return self.request.path
|
|
query = urlencode({self.redirect_field_name: next_url})
|
|
return f'{self.request.path}?{query}'
|
|
|
|
# ------------------------------------------------------------------
|
|
# Views
|
|
# ------------------------------------------------------------------
|
|
|
|
def get_context_data(self, **kwargs):
|
|
context = super().get_context_data(**kwargs)
|
|
context['challenge'] = self._generate_and_store_challenge()
|
|
context['error_msg'] = self.request.session.pop(_UKEY_ERROR_SESSION_KEY, '')
|
|
return context
|
|
|
|
def form_valid(self, form):
|
|
username = form.cleaned_data['username']
|
|
cert = form.cleaned_data['cert']
|
|
signature = form.cleaned_data['signature']
|
|
ukey_sn = form.cleaned_data['ukey_sn']
|
|
|
|
challenge = self._get_stored_challenge()
|
|
if not challenge:
|
|
error = _('Authentication challenge expired, please refresh the page and try again.')
|
|
return self.get_failed_response(form, username, error)
|
|
|
|
error_msg = None
|
|
ip = self.get_request_ip()
|
|
|
|
try:
|
|
self._check_is_block(username, True)
|
|
self._check_only_allow_exists_user_auth(username)
|
|
|
|
user = authenticate(
|
|
self.request, username=username, cert=cert, signature=signature,
|
|
challenge=challenge, ukey_sn=ukey_sn,
|
|
)
|
|
if user is None:
|
|
error_msg = getattr(self.request, 'error_message', None) or _('Invalid credentials')
|
|
return self.get_failed_response(form, username, error_msg)
|
|
|
|
username = user.username
|
|
self._check_login_acl(user, ip)
|
|
|
|
LoginIpBlockUtil(ip).clean_block_if_need()
|
|
LoginBlockUtil(username, ip).clean_failed_count()
|
|
except AuthFailedError as e:
|
|
error_msg = e.msg
|
|
except NeedRedirectError as e:
|
|
return redirect(e.url)
|
|
except Exception as e:
|
|
error_msg = str(e)
|
|
finally:
|
|
self._delete_stored_challenge()
|
|
|
|
if error_msg:
|
|
return self.get_failed_response(form, username, error_msg)
|
|
else:
|
|
return self.get_success_response(self.request, user)
|
|
|
|
def form_invalid(self, form):
|
|
error_msg = self._get_form_error_message(form)
|
|
username = (form.data.get('username') or '').strip()
|
|
return self.get_failed_response(form, username, error_msg)
|
|
|
|
@staticmethod
|
|
def _get_form_error_message(form):
|
|
non_field_errors = list(form.non_field_errors())
|
|
if non_field_errors:
|
|
return ' '.join(non_field_errors)
|
|
|
|
field_errors = []
|
|
for field_name, errors in form.errors.items():
|
|
if field_name == '__all__':
|
|
continue
|
|
field_label = UKeyLoginView._get_field_label(form, field_name)
|
|
field_errors.append(f"{field_label}: {' '.join(errors)}")
|
|
if field_errors:
|
|
return ' '.join(field_errors)
|
|
return _('Unknown')
|
|
|
|
@staticmethod
|
|
def _get_field_label(form, field_name):
|
|
field = form.fields.get(field_name)
|
|
if field and field.label:
|
|
return field.label
|
|
return field_name
|
|
|
|
def get_failed_response(self, form, username, error_msg):
|
|
self.request.session[_UKEY_ERROR_SESSION_KEY] = str(error_msg or _('Unknown'))
|
|
self.send_auth_signal(success=False, reason=error_msg, username=username)
|
|
return redirect(self._build_login_redirect_url())
|
|
|
|
def get_success_response(self, request, user):
|
|
self.mark_ukey_ok(user, auth_backend=settings.AUTH_BACKEND_UKEY)
|
|
if not settings.SAFE_MODE:
|
|
self.mark_mfa_ok('ukey-pass-mfa', user)
|
|
return self.redirect_to_guard_view()
|
|
|