Files
jumpserver/apps/authentication/backends/cert/views.py
Jiangjie Bai 0d15c50e1f Feat authcert (#16856)
* feat: add auth cert config

* feat: add auth cert api driver.js

* feat: add auth cert enroll api - draft

* feat: add auth cert demo config yaml

* feat: finished gmssl sign user csr to cert

* feat: support auth cert login

* feat: support auth cert login

* perf: user login via cert, and driver config

* feat: user profile api add can_cert_auth field

* feat: add cert auth log

* feat: add cert auth support check acl, ip_block etc.

* feat: cert auth support mfa check

* feat: cert auth support mfa check

* feat: little perf

* feat: cert config add i18n

* feat: cert login html add i18n

* feat: add i18n lina

* feat: add driver config demo

* feat: add cert auth to settings

* feat: add gmssl dockerfile-ee

* feat: add user source choices

* feat: remove gmssl-python sdk
2026-05-25 16:41:47 +08:00

130 lines
4.7 KiB
Python

# -*- coding: utf-8 -*-
#
import secrets
from django.conf import settings
from django.contrib.auth import authenticate, login as auth_login
from django.core.cache import cache
from django.utils.decorators import method_decorator
from django.utils.translation import gettext as _
from django.views.decorators.cache import never_cache
from django.views.decorators.csrf import csrf_protect
from django.views.decorators.debug import sensitive_post_parameters
from django.views.generic.edit import FormView
from django.shortcuts import redirect
from django.http.response import HttpResponseRedirect
from common.utils import reverse, safe_next_url
from users.utils import redirect_user_first_login_or_index
from authentication.mixins import AuthMixin
from authentication.errors import ACLError
from authentication.errors import (
AuthFailedError, LoginConfirmBaseError, NeedRedirectError
)
from .forms import CertLoginForm
from users.utils import LoginBlockUtil, LoginIpBlockUtil
__all__ = ['CertLoginView']
_CHALLENGE_CACHE_KEY_PREFIX = 'cert_login_challenge'
NEXT_URL = 'next'
@method_decorator(sensitive_post_parameters(), name='dispatch')
@method_decorator(csrf_protect, name='dispatch')
@method_decorator(never_cache, name='dispatch')
class CertLoginView(AuthMixin, FormView):
template_name = 'authentication/cert_login.html'
form_class = CertLoginForm
redirect_field_name = 'next'
# ------------------------------------------------------------------
# Challenge helpers
# ------------------------------------------------------------------
def _ensure_session(self):
if not self.request.session.session_key:
self.request.session.create()
def _challenge_cache_key(self):
self._ensure_session()
return f'{_CHALLENGE_CACHE_KEY_PREFIX}_{self.request.session.session_key}'
def _generate_and_store_challenge(self):
challenge = secrets.token_hex(16)
ttl = getattr(settings, 'AUTH_CERT_CHALLENGE_TTL', 300)
cache.set(self._challenge_cache_key(), challenge, ttl)
return challenge
def _get_stored_challenge(self):
return cache.get(self._challenge_cache_key(), '')
def _delete_stored_challenge(self):
cache.delete(self._challenge_cache_key())
# ------------------------------------------------------------------
# Views
# ------------------------------------------------------------------
def get(self, request, *args, **kwargs):
challenge = self._generate_and_store_challenge()
context = self.get_context_data(form=self.get_form(), challenge=challenge)
return self.render_to_response(context)
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
if 'challenge' not in context:
context['challenge'] = self._get_stored_challenge()
return context
def form_valid(self, form):
username = form.cleaned_data['username']
cert = form.cleaned_data['cert']
signature = form.cleaned_data['signature']
challenge = self._get_stored_challenge()
error_msg = None
ip = self.get_request_ip()
try:
self._check_is_block(username, True)
self._check_only_allow_exists_user_auth(username)
user = authenticate(
self.request, username=username, cert=cert, signature=signature, challenge=challenge
)
if user is None:
error_msg = _('Invalid credentials')
return self.get_failed_response(form, username, error_msg)
username = user.username
self._check_login_acl(user, ip)
LoginIpBlockUtil(ip).clean_block_if_need()
LoginBlockUtil(username, ip).clean_failed_count()
except AuthFailedError as e:
error_msg = e.msg
except NeedRedirectError as e:
return redirect(e.url)
except Exception as e:
error_msg = str(e)
finally:
self._delete_stored_challenge()
if error_msg:
return self.get_failed_response(form, username, error_msg)
else:
return self.get_success_response(self.request, user)
def get_failed_response(self, form, username, error_msg):
form.add_error(None, error_msg)
# Refresh the challenge so it cannot be replayed
challenge = self._generate_and_store_challenge()
context = self.get_context_data(form=form, challenge=challenge)
self.send_auth_signal(success=False, reason=error_msg, username=username)
return self.render_to_response(context)
def get_success_response(self, request, user):
self.mark_cert_ok(user, auth_backend=settings.AUTH_BACKEND_CERT)
return self.redirect_to_guard_view()