mirror of
https://github.com/jumpserver/jumpserver.git
synced 2026-07-03 07:39:39 +00:00
* feat: add auth cert config * feat: add auth cert api driver.js * feat: add auth cert enroll api - draft * feat: add auth cert demo config yaml * feat: finished gmssl sign user csr to cert * feat: support auth cert login * feat: support auth cert login * perf: user login via cert, and driver config * feat: user profile api add can_cert_auth field * feat: add cert auth log * feat: add cert auth support check acl, ip_block etc. * feat: cert auth support mfa check * feat: cert auth support mfa check * feat: little perf * feat: cert config add i18n * feat: cert login html add i18n * feat: add i18n lina * feat: add driver config demo * feat: add cert auth to settings * feat: add gmssl dockerfile-ee * feat: add user source choices * feat: remove gmssl-python sdk
130 lines
4.7 KiB
Python
130 lines
4.7 KiB
Python
# -*- coding: utf-8 -*-
|
|
#
|
|
import secrets
|
|
|
|
from django.conf import settings
|
|
from django.contrib.auth import authenticate, login as auth_login
|
|
from django.core.cache import cache
|
|
from django.utils.decorators import method_decorator
|
|
from django.utils.translation import gettext as _
|
|
from django.views.decorators.cache import never_cache
|
|
from django.views.decorators.csrf import csrf_protect
|
|
from django.views.decorators.debug import sensitive_post_parameters
|
|
from django.views.generic.edit import FormView
|
|
from django.shortcuts import redirect
|
|
from django.http.response import HttpResponseRedirect
|
|
|
|
from common.utils import reverse, safe_next_url
|
|
from users.utils import redirect_user_first_login_or_index
|
|
from authentication.mixins import AuthMixin
|
|
from authentication.errors import ACLError
|
|
from authentication.errors import (
|
|
AuthFailedError, LoginConfirmBaseError, NeedRedirectError
|
|
)
|
|
from .forms import CertLoginForm
|
|
from users.utils import LoginBlockUtil, LoginIpBlockUtil
|
|
|
|
|
|
__all__ = ['CertLoginView']
|
|
|
|
_CHALLENGE_CACHE_KEY_PREFIX = 'cert_login_challenge'
|
|
NEXT_URL = 'next'
|
|
|
|
@method_decorator(sensitive_post_parameters(), name='dispatch')
|
|
@method_decorator(csrf_protect, name='dispatch')
|
|
@method_decorator(never_cache, name='dispatch')
|
|
class CertLoginView(AuthMixin, FormView):
|
|
template_name = 'authentication/cert_login.html'
|
|
form_class = CertLoginForm
|
|
redirect_field_name = 'next'
|
|
|
|
# ------------------------------------------------------------------
|
|
# Challenge helpers
|
|
# ------------------------------------------------------------------
|
|
|
|
def _ensure_session(self):
|
|
if not self.request.session.session_key:
|
|
self.request.session.create()
|
|
|
|
def _challenge_cache_key(self):
|
|
self._ensure_session()
|
|
return f'{_CHALLENGE_CACHE_KEY_PREFIX}_{self.request.session.session_key}'
|
|
|
|
def _generate_and_store_challenge(self):
|
|
challenge = secrets.token_hex(16)
|
|
ttl = getattr(settings, 'AUTH_CERT_CHALLENGE_TTL', 300)
|
|
cache.set(self._challenge_cache_key(), challenge, ttl)
|
|
return challenge
|
|
|
|
def _get_stored_challenge(self):
|
|
return cache.get(self._challenge_cache_key(), '')
|
|
|
|
def _delete_stored_challenge(self):
|
|
cache.delete(self._challenge_cache_key())
|
|
|
|
# ------------------------------------------------------------------
|
|
# Views
|
|
# ------------------------------------------------------------------
|
|
|
|
def get(self, request, *args, **kwargs):
|
|
challenge = self._generate_and_store_challenge()
|
|
context = self.get_context_data(form=self.get_form(), challenge=challenge)
|
|
return self.render_to_response(context)
|
|
|
|
def get_context_data(self, **kwargs):
|
|
context = super().get_context_data(**kwargs)
|
|
if 'challenge' not in context:
|
|
context['challenge'] = self._get_stored_challenge()
|
|
return context
|
|
|
|
def form_valid(self, form):
|
|
username = form.cleaned_data['username']
|
|
cert = form.cleaned_data['cert']
|
|
signature = form.cleaned_data['signature']
|
|
challenge = self._get_stored_challenge()
|
|
|
|
error_msg = None
|
|
ip = self.get_request_ip()
|
|
|
|
try:
|
|
self._check_is_block(username, True)
|
|
self._check_only_allow_exists_user_auth(username)
|
|
|
|
user = authenticate(
|
|
self.request, username=username, cert=cert, signature=signature, challenge=challenge
|
|
)
|
|
if user is None:
|
|
error_msg = _('Invalid credentials')
|
|
return self.get_failed_response(form, username, error_msg)
|
|
|
|
username = user.username
|
|
self._check_login_acl(user, ip)
|
|
|
|
LoginIpBlockUtil(ip).clean_block_if_need()
|
|
LoginBlockUtil(username, ip).clean_failed_count()
|
|
except AuthFailedError as e:
|
|
error_msg = e.msg
|
|
except NeedRedirectError as e:
|
|
return redirect(e.url)
|
|
except Exception as e:
|
|
error_msg = str(e)
|
|
finally:
|
|
self._delete_stored_challenge()
|
|
|
|
if error_msg:
|
|
return self.get_failed_response(form, username, error_msg)
|
|
else:
|
|
return self.get_success_response(self.request, user)
|
|
|
|
def get_failed_response(self, form, username, error_msg):
|
|
form.add_error(None, error_msg)
|
|
# Refresh the challenge so it cannot be replayed
|
|
challenge = self._generate_and_store_challenge()
|
|
context = self.get_context_data(form=form, challenge=challenge)
|
|
self.send_auth_signal(success=False, reason=error_msg, username=username)
|
|
return self.render_to_response(context)
|
|
|
|
def get_success_response(self, request, user):
|
|
self.mark_cert_ok(user, auth_backend=settings.AUTH_BACKEND_CERT)
|
|
return self.redirect_to_guard_view()
|