Feat authcert (#16856)

* feat: add auth cert config

* feat: add auth cert api driver.js

* feat: add auth cert enroll api - draft

* feat: add auth cert demo config yaml

* feat: finished gmssl sign user csr to cert

* feat: support auth cert login

* feat: support auth cert login

* perf: user login via cert, and driver config

* feat: user profile api add can_cert_auth field

* feat: add cert auth log

* feat: add cert auth support check acl, ip_block etc.

* feat: cert auth support mfa check

* feat: cert auth support mfa check

* feat: little perf

* feat: cert config add i18n

* feat: cert login html add i18n

* feat: add i18n lina

* feat: add driver config demo

* feat: add cert auth to settings

* feat: add gmssl dockerfile-ee

* feat: add user source choices

* feat: remove gmssl-python sdk
This commit is contained in:
Jiangjie Bai
2026-05-25 16:41:47 +08:00
committed by GitHub
parent 7f80560306
commit 0d15c50e1f
40 changed files with 5914 additions and 1974 deletions

View File

@@ -21,6 +21,17 @@ WORKDIR /opt/jumpserver
ARG PIP_MIRROR=https://pypi.org/simple
ARG GMSSL_VERSION=3.1.1
RUN set -ex \
&& uv pip install -i${PIP_MIRROR} --group xpack \
&& rm -rf /root/.cache/
&& rm -rf /root/.cache/
RUN set -ex \
&& wget -q -O /tmp/gmssl-install.sh \
"https://github.com/guanzhi/GmSSL/releases/download/v${GMSSL_VERSION}/GmSSL-${GMSSL_VERSION}-Linux.sh" \
&& chmod +x /tmp/gmssl-install.sh \
&& /tmp/gmssl-install.sh --prefix=/usr/local \
&& echo "/usr/local/GmSSL-${GMSSL_VERSION}-Linux/lib" > /etc/ld.so.conf.d/gmssl.conf \
&& ldconfig \
&& rm -f /tmp/gmssl-install.sh

View File

@@ -0,0 +1 @@
from .backends import *

View File

@@ -0,0 +1,193 @@
import base64
import os
import subprocess
import tempfile
from django.utils.translation import gettext_lazy as _
import yaml
from django.conf import settings
from django.http import FileResponse, Http404
from django.utils.decorators import method_decorator
from django.views.decorators.cache import cache_control
from rest_framework.response import Response
from rest_framework.views import APIView
from rest_framework.permissions import AllowAny
from common.permissions import OnlySuperUser
from common.utils import get_logger
from .driver import cert_vd_cfg
__all__ = ['VendorDriverFileAPIView', 'CertVendorDriverConfigAPIView']
logger = get_logger(__name__)
class VendorDriverFileAPIView(APIView):
permission_classes = (AllowAny,)
@method_decorator(cache_control(public=True, max_age=3600))
def get(self, request):
js_file = cert_vd_cfg.driver_js_file
if not js_file or not os.path.isfile(js_file):
raise Http404
response = FileResponse(open(js_file, 'rb'), content_type='application/javascript')
response['Cache-Control'] = 'public, max-age=3600'
return response
class CertVendorDriverConfigAPIView(APIView):
permission_classes = (AllowAny,)
def get(self, request):
lang = request.COOKIES.get(settings.LANGUAGE_COOKIE_NAME) or settings.LANGUAGE_CODE
data = cert_vd_cfg.get_vendor_sdk_data(lang=lang)
return Response(data)
class CertEnrollAPIView(APIView):
permission_classes = (OnlySuperUser,)
# SM2 曲线 OID1.2.156.10197.1.301
# DER 编码06 08 2a 81 1c cf 55 01 82 2d
_SM2_OID_DER = bytes([0x06, 0x08, 0x2a, 0x81, 0x1c, 0xcf, 0x55, 0x01, 0x82, 0x2d])
def post(self, request):
if not cert_vd_cfg.enroll_enabled:
data = {'error': _('Certificate enrollment is not enabled')}
return Response(data=data, status=400)
csr_raw = request.data.get('csr')
if not csr_raw:
data = {'error': _('CSR is required')}
return Response(data=data, status=400)
try:
singed_cert = self.sign_cert(csr_raw)
except Exception as e:
error = '{}: {}'.format(_('Certificate signing failed'), str(e))
logger.error(error, exc_info=True)
return Response(data={'error': error}, status=400)
data = {'signed_cert': singed_cert}
return Response(data=data, status=200)
def sign_cert(self, csr_raw):
# 记录输入是否含 PEM 头,用于决定输出格式
if isinstance(csr_raw, bytes):
has_pem_header = csr_raw.lstrip().startswith(b'-----BEGIN')
else:
has_pem_header = csr_raw.strip().startswith('-----BEGIN')
csr_pem = self._normalize_csr_to_pem(csr_raw)
if self._is_sm2_csr(csr_pem):
singed_cert = self.sign_cert_by_gmssl(csr_pem)
else:
singed_cert = self.sign_cert_by_other(csr_pem)
# 输入不含 PEM 头时,返回裸 base64去掉首尾标识行
if not has_pem_header:
lines = singed_cert.strip().splitlines()
singed_cert = ''.join(
ln for ln in lines if not ln.startswith('-----')
)
return singed_cert
def _normalize_csr_to_pem(self, csr_data):
"""
将 SDK 返回的 CSR 统一转换成标准 PEM 字符串。
支持三种输入格式:
1. 已经是标准 PEM含 -----BEGIN CERTIFICATE REQUEST----- 头)
2. 裸 base64 字符串(无 PEM 头,国密 USB Key SDK 常见)
3. 原始 DER 二进制 bytes
"""
if isinstance(csr_data, bytes):
if csr_data.lstrip().startswith(b'-----BEGIN'):
return csr_data.decode('utf-8')
b64 = base64.b64encode(csr_data).decode('ascii')
else:
csr_data = csr_data.strip()
if csr_data.startswith('-----BEGIN'):
return csr_data
# 裸 base64去除空白后校验并重新分行
b64 = ''.join(csr_data.split())
base64.b64decode(b64, validate=True)
lines = [b64[i:i + 64] for i in range(0, len(b64), 64)]
return (
'-----BEGIN CERTIFICATE REQUEST-----\n'
+ '\n'.join(lines)
+ '\n-----END CERTIFICATE REQUEST-----\n'
)
def _is_sm2_csr(self, csr_pem):
"""
通过查找 SM2 曲线 OID 字节序列判断 CSR 是否使用 SM2 算法,
无需调用外部工具。
"""
pem_lines = csr_pem.strip().splitlines()
b64 = ''.join(ln for ln in pem_lines if not ln.startswith('-----'))
der = base64.b64decode(b64)
return self._SM2_OID_DER in der
def sign_cert_by_other(self, csr_pem):
pass
def sign_cert_by_gmssl(self, csr_pem):
"""
使用 gmssl reqsign 签发 SM2 证书。
命令示例:
gmssl reqsign -in user.csr -days 365 -cacert root.crt -key root.key -pass 123456 -out user.crt
"""
gmssl_bin = cert_vd_cfg.gmssl_bin
ca_key_path = cert_vd_cfg.ca_key_file
ca_cert_path = cert_vd_cfg.ca_cert_file
ca_key_pass = str(cert_vd_cfg.ca_key_pass)
if not ca_key_path or not os.path.isfile(ca_key_path):
raise FileNotFoundError('CA_KEY_FILE not configured or not found')
if not ca_cert_path or not os.path.isfile(ca_cert_path):
raise FileNotFoundError('CA_CERT_FILE not configured or not found')
validity_days = str(cert_vd_cfg.enroll_validity_days)
csr_file = cert_file = None
try:
with tempfile.NamedTemporaryFile(
suffix='.csr', mode='w', delete=False, encoding='utf-8'
) as f:
f.write(csr_pem)
csr_file = f.name
fd, cert_file = tempfile.mkstemp(suffix='.crt')
os.close(fd)
# https://github.com/GmSSL/GmSSL-Python#sm2数字证书
# gmssl_python 只支持SM2证书的解析和验证等功能不支持SM2证书的签发和生成
# 所以还是需要使用 gmssl bin 来执行 reqsign 命令行工具进行签发。虽然增加了对外部命令的依赖,
# 但这是目前最简单可靠的方案。
cmd = [
gmssl_bin, 'reqsign',
'-in', csr_file,
'-days', validity_days,
'-cacert', ca_cert_path,
'-key', ca_key_path,
'-out', cert_file,
]
if ca_key_pass:
cmd += ['-pass', ca_key_pass]
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=30
)
if result.returncode != 0:
raise RuntimeError('gmssl reqsign failed: {}'.format(result.stderr.strip()))
with open(cert_file, 'r', encoding='utf-8') as f:
return f.read()
finally:
for path in (csr_file, cert_file):
if path and os.path.exists(path):
os.unlink(path)

View File

@@ -0,0 +1,10 @@
from django.urls import path
from . import api
urlpatterns = [
# api
path('cert/vendor-driver.js/', api.VendorDriverFileAPIView.as_view(), name='cert-vendor-driver-js-file'),
path('cert/vendor-driver-config/', api.CertVendorDriverConfigAPIView.as_view(), name='cert-vendor-driver-config'),
path('cert/enroll/', api.CertEnrollAPIView.as_view(), name='cert-enroll'),
]

View File

@@ -0,0 +1,193 @@
# -*- coding: utf-8 -*-
#
import base64
import os
import tempfile
from django.conf import settings
from users.models import User
from common.utils import get_logger
from ..base import JMSBaseAuthBackend
__all__ = ['CertBackend']
logger = get_logger(__name__)
# SM2 曲线 OID DER 字节序列,用于判断证书算法(与 api.py 保持一致)
_SM2_OID_DER = bytes([0x06, 0x08, 0x2a, 0x81, 0x1c, 0xcf, 0x55, 0x01, 0x82, 0x2d])
class CertBackend(JMSBaseAuthBackend):
backend = settings.AUTH_BACKEND_CERT
@staticmethod
def is_enabled():
return settings.AUTH_CERT
def authenticate(self, request, username, cert, signature, challenge):
try:
cert_pem = self._normalize_cert_to_pem(cert)
except Exception as e:
logger.warning('CertBackend: cert normalization failed: %s', e)
return None
if self._is_sm2_cert(cert_pem):
return self._authenticate_sm2(cert_pem, username, signature, challenge)
else:
return self._authenticate_other(cert_pem, username, signature, challenge)
# ── SM2 四步校验 ─────────────────────────────────────────────────────────
def _authenticate_sm2(self, cert_pem, username, signature, challenge):
# 加载证书(写临时文件 → Sm2Certificate
try:
sm2_cert = self._load_sm2_cert(cert_pem)
except Exception as e:
logger.warning('CertBackend: failed to load SM2 cert: %s', e)
return None
# Step 1: 校验证书链,是否由 CA 根证书签发
try:
self._verify_sm2_cert_chain(sm2_cert)
except Exception as e:
logger.warning('CertBackend: SM2 cert chain verification failed: %s', e)
return None
# Step 2: 从证书 subject 提取 CN与传入 username 比对
cert_cn = sm2_cert.get_subject().get('commonName')
if cert_cn != username:
logger.warning(
'CertBackend: cert CN %r does not match username %r', cert_cn, username
)
return None
# Step 3: 用证书公钥验证签名
public_key = sm2_cert.get_subject_public_key()
try:
sig_ok = self._verify_sm2_signature(public_key, signature, challenge)
except Exception as e:
logger.warning('CertBackend: SM2 signature verification failed: %s', e)
return None
if not sig_ok:
logger.warning('CertBackend: SM2 signature mismatch')
return None
# Step 4: 查询并返回用户
return User.objects.filter(username=username).first()
@staticmethod
def _load_sm2_cert(cert_pem):
"""将 PEM 字符串写入临时文件,加载为 Sm2Certificate 对象后立即删除临时文件。"""
from common.utils.gmssl_python import Sm2Certificate
fd, cert_file = tempfile.mkstemp(suffix='.crt')
try:
os.close(fd)
with open(cert_file, 'w', encoding='utf-8') as f:
f.write(cert_pem)
sm2_cert = Sm2Certificate()
sm2_cert.import_pem(cert_file)
finally:
if os.path.exists(cert_file):
os.unlink(cert_file)
return sm2_cert
def _verify_sm2_cert_chain(self, sm2_cert):
"""调用 Sm2Certificate.verify_by_ca_certificate 验证证书链。"""
from common.utils.gmssl_python import SM2_DEFAULT_ID
ca_cert_path = getattr(settings, 'CA_CERT_FILE', '')
if not ca_cert_path or not os.path.isfile(ca_cert_path):
raise FileNotFoundError('CA_CERT_FILE not configured or not found')
from common.utils.gmssl_python import Sm2Certificate
ca_cert = Sm2Certificate()
ca_cert.import_pem(ca_cert_path)
if not sm2_cert.verify_by_ca_certificate(ca_cert, SM2_DEFAULT_ID):
raise ValueError('SM2 cert chain verification failed')
@staticmethod
def _verify_sm2_signature(sm2_key, signature, challenge):
"""
使用 gmssl_python 的 Sm2Signature 做 SM2withSM3 验签。
sm2_key : Sm2Certificate.get_subject_public_key() 返回的 Sm2Key 对象。
signature : USB Key 返回的签名bytes / hex 字符串 / base64 字符串DER 格式)。
challenge : 服务端下发的挑战码字符串JS 端对 btoa(challenge) 做签名。
"""
from common.utils.gmssl_python import Sm2Signature, DO_VERIFY, SM2_DEFAULT_ID
sig_bytes = CertBackend._decode_signature(signature)
# JS 端直接对 challenge 字符串签名,无需 base64 编码
if isinstance(challenge, bytes):
signed_data = challenge
else:
signed_data = challenge.encode('utf-8')
verifier = Sm2Signature(sm2_key, SM2_DEFAULT_ID, DO_VERIFY)
verifier.update(signed_data)
return bool(verifier.verify(sig_bytes))
# ── 工具方法 ─────────────────────────────────────────────────────────────
@staticmethod
def _is_sm2_cert(cert_pem):
"""通过 OID 字节序列判断证书是否使用 SM2 算法。"""
pem_lines = cert_pem.strip().splitlines()
b64 = ''.join(ln for ln in pem_lines if not ln.startswith('-----'))
der = base64.b64decode(b64)
return _SM2_OID_DER in der
@staticmethod
def _normalize_cert_to_pem(cert_data):
"""
将证书统一转换为标准 PEM 格式。
支持:已含头尾的 PEM、裸 base64 字符串、DER bytes。
"""
if isinstance(cert_data, bytes):
if cert_data.lstrip().startswith(b'-----BEGIN'):
return cert_data.decode('utf-8')
b64 = base64.b64encode(cert_data).decode('ascii')
else:
cert_data = cert_data.strip()
if cert_data.startswith('-----BEGIN'):
return cert_data
b64 = ''.join(cert_data.split())
base64.b64decode(b64, validate=True) # 验证是合法 base64
lines = [b64[i:i + 64] for i in range(0, len(b64), 64)]
return (
'-----BEGIN CERTIFICATE-----\n'
+ '\n'.join(lines)
+ '\n-----END CERTIFICATE-----\n'
)
@staticmethod
def _decode_signature(signature):
"""
将签名值转为 bytes。
依次尝试:已是 bytes → 十六进制字符串 → base64 字符串。
"""
if isinstance(signature, bytes):
return signature
sig = signature.strip()
try:
return bytes.fromhex(sig)
except ValueError:
pass
try:
return base64.b64decode(sig)
except Exception:
pass
raise ValueError('Cannot decode signature: unknown format')
# ── 其他算法(预留)────────────────────────────────────────────────────────
def _authenticate_other(self, cert_pem, username, signature, challenge):
logger.warning('CertBackend: non-SM2 cert verification is not yet implemented')
return None

View File

@@ -0,0 +1,182 @@
import os
import yaml
import json
from django.conf import settings
from common.utils import get_logger
from common.decorators import Singleton
from common.const import Language
logger = get_logger(__name__)
class Setting:
VENDOR = getattr(settings, 'VENDOR', '')
@Singleton
class CertVendorDriverConfig:
"""
从 YAML 配置文件读取所有证书相关配置。
CA 相关路径/密码ca_cert_file / ca_key_file / ca_key_pass属于系统敏感配置
只能在系统设置config.yml / Django settings中配置不允许写入此 YAML。
YAML 结构约定:
cert: # 系统级配置段
gmssl_bin: gmssl # gmssl 二进制路径
challenge_ttl: 300 # Challenge 码 Redis 存活秒数
enroll:
enabled: true # 是否开启证书签发
key_algo: SM2 # 签发密钥算法SM2 或 RSA
subject_cn: username # 用户证书 Subject CN 取自用户的哪个字段
subject_o: JumpServer # 用户证书 Subject O组织名
# 其余 key 为厂商 SDK 方法映射(供前端 API 层使用)
newUKeyAPI: ...
checkInstall: ...
getCertCN: ...
...
"""
def __init__(self):
if not settings.AUTH_CERT:
logger.debug('CertVendorDriverConfig: authentication backend not enabled, skipping config load')
return
config_file = getattr(settings, 'AUTH_CERT_VENDOR_DRIVER_CONFIG_FILE', None)
self._raw = self._load_yaml(config_file)
self._cert = self._raw.get('cert') or {}
self._enroll = self._cert.get('enroll') or {}
# ── YAML 加载 ────────────────────────────────────────────────────────────
@staticmethod
def _load_yaml(config_file):
if not config_file or not os.path.isfile(config_file):
logger.warning('CertVendorDriverConfig: config file not found: %s', config_file)
return {}
with open(config_file, 'r', encoding='utf-8') as f:
return yaml.safe_load(f) or {}
# ── CA / 证书链(只读系统设置,不允许在 YAML 中配置)────────────────────────
@property
def ca_cert_file(self):
"""CA 根证书路径,只从系统设置读取。"""
return getattr(settings, 'CA_CERT_FILE', None)
@property
def ca_key_file(self):
"""CA 私钥路径,只从系统设置读取。"""
return getattr(settings, 'CA_KEY_FILE', None)
@property
def ca_key_pass(self):
"""CA 私钥密码,只从系统设置读取。"""
return getattr(settings, 'CA_KEY_PASS', '')
@property
def driver_js_file(self):
"""返回厂商 SDK 驱动文件的 FileResponse供 API 层使用。"""
return getattr(settings, 'AUTH_CERT_VENDOR_DRIVER_JS_FILE', None)
# ── 工具 ─────────────────────────────────────────────────────────────────
@property
def gmssl_bin(self):
"""gmssl 二进制路径,默认 'gmssl'(系统 PATH 中查找)。"""
return 'gmssl'
# ── 认证流程 ──────────────────────────────────────────────────────────────
@property
def challenge_ttl(self):
"""Challenge 码在 Redis 中的存活时间(秒),默认 300。"""
v = self._cert.get('challenge_ttl', 300)
return int(v)
# ── 证书签发 ──────────────────────────────────────────────────────────────
@property
def enroll_enabled(self):
"""是否开启用户证书签发功能。"""
v = self._enroll.get('enabled', True)
return bool(v)
@property
def enroll_key_algo(self):
"""签发证书时生成密钥对的算法SM2 或 RSA。"""
return self._enroll.get('key_algo', 'SM2')
@property
def enroll_subject_cn(self):
"""用户证书 Subject CN 取自用户模型的哪个字段,默认 'username'"""
return self._enroll.get('subject_cn', 'username')
@property
def enroll_subject_o(self):
"""用户证书 Subject O组织名"""
return self._enroll.get('subject_o', Setting.VENDOR)
@property
def enroll_validity_days(self):
"""签发证书的有效期(天),默认 365。"""
v = self._enroll.get('validity_days', 365)
return int(v)
# ── 厂商 SDK 映射(原始数据,供 API 层序列化给前端)───────────────────────
@staticmethod
def _render(data, trans_filter=None):
"""
渲染 YAML 数据中的 Jinja2 模板表达式。
- {{ settings.xxx }} → 系统设置值(任何时候都生效)
- {{ user.xxx }} → 原样保留,留给前端 JS 运行时解析
- {{ 'text' | trans }} → 按 trans_filter 翻译;不传则原文返回(初始化阶段)
"""
from jinja2 import Undefined, Environment
class KeepUndefined(Undefined):
"""未定义变量原样保留占位符,支持任意深度的属性链。"""
def __str__(self):
return '{{ ' + self._undefined_name + ' }}'
def __getattr__(self, name):
return KeepUndefined(name=f'{self._undefined_name}.{name}')
template_str = json.dumps(data, ensure_ascii=False)
env = Environment(undefined=KeepUndefined)
env.filters['trans'] = trans_filter or (lambda s: s)
rendered = env.from_string(template_str).render(settings=Setting)
return json.loads(rendered)
def _build_trans_filter(self, lang):
"""构建 Jinja2 | trans filter 函数,按 lang 从 YAML i18n 表查找翻译。
未找到翻译时原文返回语言键自动归一化zh_hant → zh-hant
"""
i18n_raw = self._raw.get('i18n') or {}
i18n = {
text: {
Language.to_internal_code(lk.replace('_', '-')): lv
for lk, lv in entries.items()
}
for text, entries in i18n_raw.items()
if isinstance(entries, dict)
}
def trans_filter(s):
translations = i18n.get(str(s))
if not translations:
return s
return translations.get(lang) or translations.get('en') or s
return trans_filter
def get_vendor_sdk_data(self, lang='en'):
"""返回去掉 'cert'/'i18n' 顶层 key 后的厂商 SDK 方法映射。
YAML 中任意字符串值均可用 {{ 'text' | trans }} 语法标记为可翻译。
"""
lang = Language.to_internal_code(lang)
trans_filter = self._build_trans_filter(lang)
data = self._render(self._raw, trans_filter)
return {k: v for k, v in data.items() if k not in ('i18n',)}
cert_vd_cfg = CertVendorDriverConfig()

View File

@@ -0,0 +1,343 @@
# ── 系统级证书配置 ─────────────────────────────────────────────────────────────
# 所有与证书认证相关的系统配置均放在 cert: 段,其余 key 为厂商 SDK 方法映射。
cert:
# Challenge 码在 Redis 中的存活时间(秒)
challenge_ttl: 300
# 证书签发配置
enroll:
enabled: true
key_algo: SM2 # 签发密钥算法SM2 或 RSA
validity_days: 365 # 签发证书的有效期(天)
pin:
default: "88888888"
showBasicInfo:
# 基础加密操作
- version:
label: "{{ 'Version' | trans }}"
method:
call: UKey_GetVersion
description: "{{ 'Get USB Key driver version' | trans }}"
- devSN:
label: "{{ 'Device serial number' | trans }}"
method:
call: UKey_GetDevSN
description: "{{ 'Get device serial number' | trans }}"
- enrollEnabled:
label: "{{ 'Certificate enrollment' | trans }}"
value: "{{ 'Enabled' | trans }}"
description: "{{ 'Whether certificate enrollment is enabled' | trans }}"
only: admin
- enrollKeyAlgo:
label: "{{ 'Enrollment algorithm' | trans }}"
value: "{{ 'SM2' | trans }}"
description: "{{ 'Key algorithm used for enrollment' | trans }}"
only: admin
- enrollValidityDays:
label: "{{ 'Certificate validity' | trans }}"
value: "{{ '365 days' | trans }}"
description: "{{ 'Certificate validity period used for enrollment' | trans }}"
only: admin
# ── 厂商 USB Key SDK 方法映射 ──────────────────────────────────────────────────
# USB Key 厂商 SDK 方法名映射
# Driver 初始化
newUKeyAPI:
description: "{{ 'Initialize USB Key SDK and get SDK instance' | trans }}"
method:
call: UKeyAPI
# 基本操作
checkInstall:
description: "{{ 'Check if USB Key driver is installed' | trans }}"
method:
call: UKey_CheckInstall
# 获取用户名
getCertCN:
description: "{{ 'Get built-in username from USB Key' | trans }}"
method:
call: UKey_GetCertInfo
params:
- key: CERT_TYPE_SIGN
value: 1
- key: CERT_SUBJECT_CN
value: 9
# 管理员重置 PIN
adminResetPIN:
description: "{{ 'Reset PIN' | trans }}"
method:
call: UKey_UnblockPIN
params:
- key: admin_pin
value: '{{ input.admin_pin }}'
- key: usb_key_pin
value: '{{ output.default_pin }}'
# 用户修改 PIN
userChangePIN:
description: "{{ 'User change PIN' | trans }}"
method:
call: UKey_ChangePIN
params:
- key: old_pin
value: '{{ input.old_pin }}'
- key: new_pin1
value: '{{ input.new_pin1 }}'
result:
success: 0
# 清除证书
deleteCert:
description: "{{ 'Delete existing certificate' | trans }}"
method:
call: UKey_DeleteCons
# 校验 PIN
checkPIN:
description: "{{ 'Verify PIN' | trans }}"
method:
call: UKey_VerifyPIN
params:
- key: pin
value: '{{ input.login_pin }}'
result:
success: 0
# 获取公钥
getCert:
description: "{{ 'Get certificate' | trans }}"
method:
call: UKey_GetCert
params:
- key: CERT_TYPE_SIGN
value: 1
# 签名数据
signData:
description: "{{ 'Sign data using the private key in USB Key' | trans }}"
method:
call: UKey_SignData
params:
- key: SM3
value: 1
- key: challenge_code
value: '{{ output.challenge_code }}'
getCertInfo:
description: "{{ 'Get certificate list (including certificate information)' | trans }}"
method:
call: UKey_GetCertInfoList
fields:
- ID:
key: ID
label: "{{ 'Certificate ID' | trans }}"
- CN:
key: CN
label: "{{ 'CN (ID)' | trans }}"
- DN:
key: DN
label: "{{ 'DN' | trans }}"
- SN:
key: SN
label: "{{ 'SN' | trans }}"
- bef:
key: bef
label: "{{ 'bef' | trans }}"
- aft:
key: aft
label: "{{ 'aft' | trans }}"
- isSM:
key: isSM
label: "{{ 'SM2 certificate' | trans }}"
- DEVSN:
key: DEVSN
label: "{{ 'Device serial number' | trans }}"
showFields: [ID, CN, DN, SN, bef, aft, isSM, DEVSN]
# 制证流程Enrollment
# 步骤顺序1 → 2 → 3 → 4服务端签发→ 5
enrollSteps:
# 制证前清除旧证书,避免容器满
- deleteCert:
description: "{{ 'Delete existing certificate' | trans }}"
method:
call: UKey_DeleteCons
# 在 USB Key 硬件内生成密钥对(私钥永不离开设备)
- genKeyPair:
description: "{{ 'Generate key pair' | trans }}"
method:
call: UKey_GenKeyPair
params:
- key: asymAlg
label: "{{ 'Asymmetric algorithm' | trans }}"
type: int
value: 1 # SM2
# 用 USB Key 内的私钥生成 CSR包含公钥 + Subject 信息)
- genCSR:
description: "{{ 'Generate certificate signing request (CSR)' | trans }}"
method:
call: UKey_GenCSR
params:
- key: dn
label: "{{ 'Certificate subject (Subject DN)' | trans }}"
type: string
items:
- key: CN
label: "{{ 'Name (CN)' | trans }}"
value: '{{ user.username }}' # 从配置读取 Subject CN 字段
- key: OU
label: "{{ 'Other information' | trans }}"
value: '{{ settings.VENDOR }}' # 从配置读取 Subject O 字段
- signCert:
description: "{{ 'Certificate issuance' | trans }}"
# 服务端 CA 签发证书后,将证书写入 USB Key
- writeCert:
description: "{{ 'Write the certificate' | trans }}"
method:
call: UKey_ImportCertAndKeyPair
params:
- key: SignCert
type: string
value: '{{ output.signed_cert }}' # 从服务端签发结果读取签名证书
- key: EncCert
value: ''
- key: EncKeyPair
value: ''
i18n:
Version:
en: Version
zh-hans: 版本
zh-hant: 版本
Get USB Key driver version:
en: Get USB Key driver version
zh-hans: 获取 USB Key 驱动版本
zh-hant: 取得 USB Key 驅動版本
Device serial number:
en: Device serial number
zh-hans: 设备序列号
zh-hant: 裝置序號
Get device serial number:
en: Get device serial number
zh-hans: 获取设备序列号
zh-hant: 取得裝置序號
Certificate enrollment:
en: Certificate enrollment
zh-hans: 制证功能
zh-hant: 制證功能
Enabled:
en: Enabled
zh-hans: 启用
zh-hant: 啟用
Whether certificate enrollment is enabled:
en: Whether certificate enrollment is enabled
zh-hans: 是否启用制证功能
zh-hant: 是否啟用制證功能
Enrollment algorithm:
en: Enrollment algorithm
zh-hans: 制证算法
zh-hant: 制證演算法
Key algorithm used for enrollment:
en: Key algorithm used for enrollment
zh-hans: 制证使用的密钥算法
zh-hant: 制證使用的金鑰演算法
Certificate validity:
en: Certificate validity
zh-hans: 证书期限
zh-hant: 證書有效期
365 days:
en: 365 days
zh-hans: 365
zh-hant: 365
Certificate validity period used for enrollment:
en: Certificate validity period used for enrollment
zh-hans: 制证使用的证书有效期
zh-hant: 制證使用的證書有效期
Initialize USB Key SDK and get SDK instance:
en: Initialize USB Key SDK and get SDK instance
zh-hans: 初始化 USB Key SDK获取 SDK 对象实例
zh-hant: 初始化 USB Key SDK 並取得 SDK 物件實例
Check if USB Key driver is installed:
en: Check if USB Key driver is installed
zh-hans: 检查 USB Key 驱动是否安装
zh-hant: 檢查 USB Key 驅動是否已安裝
Get built-in username from USB Key:
en: Get built-in username from USB Key
zh-hans: 获取 USB Key 内置用户名
zh-hant: 取得 USB Key 內建使用者名稱
Reset PIN:
en: Reset PIN
zh-hans: 重置 PIN
zh-hant: 重置 PIN
User change PIN:
en: User change PIN
zh-hans: 用户修改 PIN
zh-hant: 使用者修改 PIN
Delete existing certificate:
en: Delete existing certificate
zh-hans: 删除已有证书
zh-hant: 刪除已有證書
Verify PIN:
en: Verify PIN
zh-hans: 校验 PIN 是否正确
zh-hant: 驗證 PIN 是否正確
Get certificate:
en: Get certificate
zh-hans: 获取证书
zh-hant: 取得證書
Sign data using the private key in USB Key:
en: Sign data using the private key in USB Key
zh-hans: 使用 USB Key 内的私钥对数据进行签名
zh-hant: 使用 USB Key 內的私鑰對資料進行簽名
Get certificate list (including certificate information):
en: Get certificate list (including certificate information)
zh-hans: 获取证书列表(包含证书信息)
zh-hant: 取得證書清單(包含證書資訊)
Certificate ID:
en: Certificate ID
zh-hans: 证书 ID
zh-hant: 證書 ID
SM2 certificate:
en: SM2 certificate
zh-hans: 国密证书
zh-hant: 國密證書
Generate key pair:
en: Generate key pair
zh-hans: 生成密钥对
zh-hant: 產生金鑰對
Asymmetric algorithm:
en: Asymmetric algorithm
zh-hans: 非对称算法
zh-hant: 非對稱演算法
Generate certificate signing request (CSR):
en: Generate certificate signing request (CSR)
zh-hans: 生成证书签名请求 (CSR)
zh-hant: 產生憑證簽名請求 (CSR)
Certificate subject (Subject DN):
en: Certificate subject (Subject DN)
zh-hans: 证书主题 (Subject DN)
zh-hant: 憑證主題 (Subject DN)
Name (CN):
en: Name (CN)
zh-hans: 名称 (CN)
zh-hant: 名稱 (CN)
Other information:
en: Other information
zh-hans: 其他信息
zh-hant: 其他資訊
Certificate issuance:
en: Certificate issuance
zh-hans: 证书签发
zh-hant: 證書簽發
Write the certificate:
en: Write the certificate
zh-hans: 写入证书
zh-hant: 寫入證書

View File

@@ -0,0 +1,18 @@
from django import forms
from django.utils.translation import gettext_lazy as _
class CertLoginForm(forms.Form):
username = forms.CharField(
label=_('Username'), max_length=100, required=True,
widget=forms.HiddenInput(),
)
cert = forms.CharField(
required=True,
widget=forms.HiddenInput(),
)
signature = forms.CharField(
required=True,
widget=forms.HiddenInput(),
)

View File

@@ -0,0 +1,7 @@
from . import views
from django.urls import path
urlpatterns = [
path('cert/login/', views.CertLoginView.as_view(), name='cert-login')
]

View File

@@ -0,0 +1,129 @@
# -*- coding: utf-8 -*-
#
import secrets
from django.conf import settings
from django.contrib.auth import authenticate, login as auth_login
from django.core.cache import cache
from django.utils.decorators import method_decorator
from django.utils.translation import gettext as _
from django.views.decorators.cache import never_cache
from django.views.decorators.csrf import csrf_protect
from django.views.decorators.debug import sensitive_post_parameters
from django.views.generic.edit import FormView
from django.shortcuts import redirect
from django.http.response import HttpResponseRedirect
from common.utils import reverse, safe_next_url
from users.utils import redirect_user_first_login_or_index
from authentication.mixins import AuthMixin
from authentication.errors import ACLError
from authentication.errors import (
AuthFailedError, LoginConfirmBaseError, NeedRedirectError
)
from .forms import CertLoginForm
from users.utils import LoginBlockUtil, LoginIpBlockUtil
__all__ = ['CertLoginView']
_CHALLENGE_CACHE_KEY_PREFIX = 'cert_login_challenge'
NEXT_URL = 'next'
@method_decorator(sensitive_post_parameters(), name='dispatch')
@method_decorator(csrf_protect, name='dispatch')
@method_decorator(never_cache, name='dispatch')
class CertLoginView(AuthMixin, FormView):
template_name = 'authentication/cert_login.html'
form_class = CertLoginForm
redirect_field_name = 'next'
# ------------------------------------------------------------------
# Challenge helpers
# ------------------------------------------------------------------
def _ensure_session(self):
if not self.request.session.session_key:
self.request.session.create()
def _challenge_cache_key(self):
self._ensure_session()
return f'{_CHALLENGE_CACHE_KEY_PREFIX}_{self.request.session.session_key}'
def _generate_and_store_challenge(self):
challenge = secrets.token_hex(16)
ttl = getattr(settings, 'AUTH_CERT_CHALLENGE_TTL', 300)
cache.set(self._challenge_cache_key(), challenge, ttl)
return challenge
def _get_stored_challenge(self):
return cache.get(self._challenge_cache_key(), '')
def _delete_stored_challenge(self):
cache.delete(self._challenge_cache_key())
# ------------------------------------------------------------------
# Views
# ------------------------------------------------------------------
def get(self, request, *args, **kwargs):
challenge = self._generate_and_store_challenge()
context = self.get_context_data(form=self.get_form(), challenge=challenge)
return self.render_to_response(context)
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
if 'challenge' not in context:
context['challenge'] = self._get_stored_challenge()
return context
def form_valid(self, form):
username = form.cleaned_data['username']
cert = form.cleaned_data['cert']
signature = form.cleaned_data['signature']
challenge = self._get_stored_challenge()
error_msg = None
ip = self.get_request_ip()
try:
self._check_is_block(username, True)
self._check_only_allow_exists_user_auth(username)
user = authenticate(
self.request, username=username, cert=cert, signature=signature, challenge=challenge
)
if user is None:
error_msg = _('Invalid credentials')
return self.get_failed_response(form, username, error_msg)
username = user.username
self._check_login_acl(user, ip)
LoginIpBlockUtil(ip).clean_block_if_need()
LoginBlockUtil(username, ip).clean_failed_count()
except AuthFailedError as e:
error_msg = e.msg
except NeedRedirectError as e:
return redirect(e.url)
except Exception as e:
error_msg = str(e)
finally:
self._delete_stored_challenge()
if error_msg:
return self.get_failed_response(form, username, error_msg)
else:
return self.get_success_response(self.request, user)
def get_failed_response(self, form, username, error_msg):
form.add_error(None, error_msg)
# Refresh the challenge so it cannot be replayed
challenge = self._generate_and_store_challenge()
context = self.get_context_data(form=form, challenge=challenge)
self.send_auth_signal(success=False, reason=error_msg, username=username)
return self.render_to_response(context)
def get_success_response(self, request, user):
self.mark_cert_ok(user, auth_backend=settings.AUTH_BACKEND_CERT)
return self.redirect_to_guard_view()

View File

@@ -245,6 +245,12 @@ class CommonMixin:
return user
user_id = self.request.session.get('user_id')
auth_cert_ok = self.request.session.get('auth_cert')
if auth_cert_ok:
user = get_object_or_404(User, pk=user_id)
user.backend = self.request.session.get("auth_backend")
return user
auth_ok = self.request.session.get('auth_password')
auth_expired_at = self.request.session.get('auth_password_expired_at')
auth_expired = auth_expired_at < time.time() if auth_expired_at else False
@@ -669,7 +675,7 @@ class AuthMixin(CommonMixin, AuthPreCheckMixin, AuthACLMixin, AuthFaceMixin, MFA
LoginBlockUtil(user.username, ip).clean_failed_count()
LoginIpBlockUtil(ip).clean_block_if_need()
return user
def mark_password_ok(self, user, auto_login=False, auth_backend=None):
request = self.request
request.session['auth_password'] = 1
@@ -681,6 +687,12 @@ class AuthMixin(CommonMixin, AuthPreCheckMixin, AuthACLMixin, AuthFaceMixin, MFA
request.session['auth_backend'] = auth_backend
def mark_cert_ok(self, user, auth_backend):
request = self.request
request.session['auth_cert'] = 1
request.session['user_id'] = str(user.id)
request.session['auth_backend'] = auth_backend
def check_oauth2_auth(self, user: User, auth_backend):
ip = self.get_request_ip()
request = self.request
@@ -713,7 +725,8 @@ class AuthMixin(CommonMixin, AuthPreCheckMixin, AuthACLMixin, AuthFaceMixin, MFA
keys = [
'auth_password', 'user_id', 'auth_confirm_required',
'auth_notice_required', 'auth_ticket_id', 'auth_acl_id',
'user_session_id', 'user_log_id', 'can_send_notifications'
'user_session_id', 'user_log_id', 'can_send_notifications',
'auth_cert'
]
for k in keys:
self.request.session.pop(k, '')

View File

@@ -0,0 +1,492 @@
{% load i18n %}
{% load bootstrap3 %}
{% load static %}
<!DOCTYPE html>
<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
<link rel="shortcut icon" href="{{ INTERFACE.favicon }}" type="image/x-icon">
<title>{{ INTERFACE.login_title }}</title>
<meta name="viewport" content="width=device-width, initial-scale=1.0">
{% include '_head_css_js.html' %}
<link href="{% static 'css/login-style.css' %}" rel="stylesheet">
<link href="{% static 'css/jumpserver.css' %}" rel="stylesheet">
<style>
body {
box-sizing: border-box;
display: flex;
align-items: center;
justify-content: center;
width: 100vw;
height: 100vh;
background-color: #f3f3f3;
}
.cert-login-box {
background-color: #fff;
border-radius: 6px;
box-shadow: 0 2px 12px rgba(0, 0, 0, 0.1);
padding: 40px 50px 32px;
width: 420px;
}
.cert-login-box .logo-row {
text-align: center;
margin-bottom: 16px;
}
.cert-login-box .logo-row img {
height: 48px;
width: 48px;
}
.cert-login-box h3 {
text-align: center;
font-weight: 400;
font-size: 20px;
color: #151515;
margin-top: 0;
margin-bottom: 24px;
}
/* USB Key status bar */
.usb-status-bar {
display: flex;
align-items: center;
gap: 8px;
padding: 8px 12px;
border-radius: 4px;
font-size: 13px;
margin-bottom: 20px;
border: 1px solid transparent;
}
.usb-status-bar .usb-dot {
width: 10px;
height: 10px;
border-radius: 50%;
flex-shrink: 0;
}
.usb-status-loading {
background-color: #f5f5f5;
border-color: #ddd;
color: #888;
}
.usb-status-loading .usb-dot {
background-color: #bbb;
animation: usb-blink 1.2s infinite;
}
.usb-status-inserted {
background-color: #f0faf0;
border-color: #b7deb7;
color: #3a7d3a;
}
.usb-status-inserted .usb-dot {
background-color: #52c41a;
}
.usb-status-not-inserted {
background-color: #fffbe6;
border-color: #ffe58f;
color: #ad8b00;
}
.usb-status-not-inserted .usb-dot {
background-color: #faad14;
}
.usb-status-error {
background-color: #fff2f0;
border-color: #ffccc7;
color: #cf1322;
}
.usb-status-error .usb-dot {
background-color: #ff4d4f;
}
@keyframes usb-blink {
0%, 100% { opacity: 1; }
50% { opacity: 0.3; }
}
.help-block {
margin: 0;
text-align: left;
}
form label {
color: #737373;
font-size: 13px;
font-weight: normal;
}
.form-group {
margin-bottom: 20px;
margin-top: 0;
}
.red-fonts {
color: red;
}
.form-group.has-error {
margin-bottom: 0;
}
.challenge-box {
margin-bottom: 20px;
padding: 10px 12px;
background-color: #f7f7f7;
border: 1px solid #e3e3e3;
border-radius: 4px;
}
.challenge-label {
font-size: 11px;
color: #aaa;
margin-bottom: 4px;
}
.challenge-value {
font-family: monospace;
font-size: 13px;
color: #555;
word-break: break-all;
}
.btn-transparent {
width: 100%;
}
.btn-transparent:disabled {
opacity: 0.5;
cursor: not-allowed;
pointer-events: none;
}
.other-login {
text-align: center;
margin-top: 16px;
font-size: 12px;
}
.other-login a {
color: #999;
}
</style>
</head>
<body>
<div class="cert-login-box">
<div class="logo-row">
<img src="{{ INTERFACE.logo_logout }}" alt="Logo"/>
</div>
<h3>{% trans "Certificate Authentication" %}</h3>
<!-- USB Key status indicator -->
<div id="usb-status-bar" class="usb-status-bar usb-status-loading">
<span class="usb-dot"></span>
<span id="usb-status-text">{% trans "Loading USB Key driver..." %}</span>
</div>
<form id="cert-login-form" autocomplete="off" action="" method="post" role="form" novalidate="novalidate">
{% csrf_token %}
{% if form.non_field_errors %}
<div style="margin-bottom: 16px;">
<p class="help-block red-fonts">{{ form.non_field_errors.as_text }}</p>
</div>
{% endif %}
{# username is auto-filled from USB Key cert, display input is readonly, actual value submitted via hidden field #}
<div class="form-group{% if form.username.errors %} has-error{% endif %}">
<input type="text" id="username-display" class="form-control"
placeholder="{% trans 'Insert USB Key to auto-fetch' %}"
readonly tabindex="-1"
style="background-color:#f7f7f7; cursor:default; color:#555;">
{{ form.username }}
{% if form.username.errors %}
<p class="help-block red-fonts">{{ form.username.errors.as_text }}</p>
{% endif %}
</div>
{# PIN is only used for client-side USB Key verification, not submitted to backend #}
<div class="form-group">
<input type="password" id="id_pin" class="form-control"
placeholder="PIN" autocomplete="off">
</div>
{# cert is fetched and filled by JS after PIN verification, not shown to user #}
{{ form.cert }}
{# signature is filled by JS after signing challenge code, not shown to user #}
{{ form.signature }}
<div id="cert-challenge-box" data-challenge="{{ challenge }}" style="display:none;"></div>
<div class="form-group">
<button type="submit" id="cert-login-btn" class="btn btn-transparent" disabled>
{% trans "Login" %}
</button>
</div>
</form>
<div class="other-login">
<a href="{% url 'authentication:login' %}">{% trans "More login methods" %}</a>
</div>
</div>
</body>
{% include '_foot_js.html' %}
<script>
(function () {
'use strict';
/* i18n strings (rendered server-side) */
var MSG = {
loading: '{% trans "Loading USB Key driver..." %}',
detecting: '{% trans "Detecting USB Key..." %}',
inserted: '{% trans "USB Key connected" %}',
notInserted: '{% trans "Please insert USB Key" %}',
driverError: '{% trans "Driver unavailable" %}',
initError: '{% trans "USB Key SDK initialization failed" %}',
pinChecking: '{% trans "Verifying PIN..." %}',
pinOk: '{% trans "PIN verified" %}',
pinFail: '{% trans "PIN verification failed" %}',
signing: '{% trans "Signing challenge code..." %}',
signFail: '{% trans "Signing failed" %}',
certFail: '{% trans "Failed to retrieve certificate" %}',
noCN: '{% trans "No certificate detected, please contact administrator" %}'
};
/* API URLs */
var API_JS = '/api/v1/authentication/cert/vendor-driver.js/';
var API_CONFIG = '/api/v1/authentication/cert/vendor-driver-config/';
/* DOM refs */
var statusBar = document.getElementById('usb-status-bar');
var statusText = document.getElementById('usb-status-text');
var loginBtn = document.getElementById('cert-login-btn');
/* Status helper */
function setStatus(type, message) {
statusBar.className = 'usb-status-bar usb-status-' + type;
statusText.textContent = message;
loginBtn.disabled = (type !== 'inserted');
}
/* Inject vendor JS into the page */
function injectScript(jsText) {
var s = document.createElement('script');
s.textContent = jsText;
document.head.appendChild(s);
}
/* USB Key manager built from config */
function UsbKeyManager(config) {
this.config = config;
this.api = null;
var ctorName = (config.newUKeyAPI || {}).method && config.newUKeyAPI.method.call;
if (!ctorName || typeof window[ctorName] !== 'function') {
throw new Error(MSG.initError + ': ' + ctorName);
}
this.api = new window[ctorName]();
}
/* Resolve a single param value: replace {{ scope.key }} placeholders from context.
context shape: { output: {}, input: {}, user: {}, ... }
Unresolved placeholders are replaced with empty string. */
function resolveParamValue(value, context) {
if (typeof value !== 'string') return value;
return value.replace(/\{\{\s*([\w.]+)\s*\}\}/g, function (_, path) {
var parts = path.split('.');
var obj = context;
for (var i = 0; i < parts.length; i++) {
if (obj == null) return '';
obj = obj[parts[i]];
}
return (obj !== undefined && obj !== null) ? String(obj) : '';
});
}
/* Call a vendor method by its abstract config key, returns a Promise.
Params are read from config[key].method.params in order.
Template placeholders ({{ scope.key }}) in param values are resolved
from context: { output, input, user, ... } */
UsbKeyManager.prototype.call = function (abstractKey, context) {
var cfg = this.config[abstractKey];
var method = cfg && cfg.method && cfg.method.call;
if (!method || typeof this.api[method] !== 'function') {
return Promise.reject(new Error('Method not found: ' + method));
}
var ctx = context || {};
var args = ((cfg.method.params) || []).map(function (p) {
var raw = p['value'] !== undefined ? p['value'] : p['default'];
return resolveParamValue(raw, ctx);
});
try {
return Promise.resolve(this.api[method].apply(this.api, args));
} catch (e) {
return Promise.reject(e);
}
};
/* Get CN string from the USB Key via config's getCertCN method. */
UsbKeyManager.prototype.getCertCN = function () {
return this.call('getCertCN').then(function (result) {
return result || null;
}).catch(function () { return null; });
};
/* Verify the PIN entered by the user.
Success is determined by config.checkPIN.method.result.success if defined,
otherwise falls back to treating 0 / true as success.
Returns a Promise<boolean>. */
UsbKeyManager.prototype.checkPIN = function (pinValue) {
var self = this;
return this.call('checkPIN', { input: { login_pin: pinValue } }).then(function (result) {
var resultCfg = ((self.config.checkPIN || {}).method || {}).result;
if (resultCfg && 'success' in resultCfg) {
return result === resultCfg.success;
}
return result === 0 || result === true || result === 'true';
}).catch(function () { return false; });
};
/* Get certificate string from the USB Key via config's getCert method.
Returns a Promise<string|null>. */
UsbKeyManager.prototype.getCert = function () {
return this.call('getCert', {}).then(function (result) {
return result || null;
}).catch(function () { return null; });
};
/* Sign the challenge code with the USB Key's private key.
The challenge is base64-encoded before being passed to the SDK.
Returns a Promise<string|null>. */
UsbKeyManager.prototype.signData = function (challengeCode) {
var encoded = btoa(challengeCode);
return this.call('signData', { output: { challenge_code: encoded } }).then(function (result) {
return result || null;
}).catch(function () { return null; });
};
/* Check whether a USB Key is currently inserted */
UsbKeyManager.prototype.isInserted = function () {
var self = this;
// checkInstall returns 0
return self.call('checkInstall').then(function (result) {
return result === 0 ? true : !!result;
}).catch(function () {
return false;
});
};
/* Main init */
document.addEventListener('DOMContentLoaded', function () {
setStatus('loading', MSG.loading);
// 1. Fetch config + vendor JS in parallel
Promise.all([
fetch(API_CONFIG).then(function (r) {
if (!r.ok) throw new Error('config ' + r.status);
return r.json();
}),
fetch(API_JS).then(function (r) {
if (!r.ok) throw new Error('driver js ' + r.status);
return r.text();
})
]).then(function (results) {
var config = results[0];
var jsText = results[1];
// 2. Inject vendor JS
injectScript(jsText);
// 3. Instantiate USBAPI from config
var manager = new UsbKeyManager(config);
// 4. Poll USB Key status
setStatus('loading', MSG.detecting);
var lastCN = null;
var usernameDisplay = document.getElementById('username-display');
var usernameHidden = document.getElementById('id_username');
function setUsername(cn) {
if (cn === lastCN) return; // unchanged, skip DOM update
lastCN = cn;
usernameDisplay.value = cn || '';
usernameHidden.value = cn || '';
}
function poll() {
manager.isInserted().then(function (inserted) {
if (inserted) {
manager.getCertCN().then(function (cn) {
setUsername(cn);
if (cn) {
setStatus('inserted', MSG.inserted);
} else {
setStatus('error', MSG.noCN);
}
});
} else {
setUsername(null);
setStatus('not-inserted', MSG.notInserted);
}
});
}
poll();
setInterval(poll, 5000);
// 5. Intercept form submit: PIN signData getCert submit
document.getElementById('cert-login-form').addEventListener('submit', function (evt) {
evt.preventDefault();
var pinInput = document.getElementById('id_pin');
var pinValue = pinInput ? pinInput.value : '';
var challengeCode = document.getElementById('cert-challenge-box').dataset.challenge || '';
setStatus('loading', MSG.pinChecking);
manager.checkPIN(pinValue)
.then(function (ok) {
if (!ok) {
if (pinInput) { pinInput.value = ''; pinInput.focus(); }
throw { msg: MSG.pinFail };
}
setStatus('loading', MSG.signing);
return manager.signData(challengeCode);
})
.then(function (signature) {
if (!signature) throw { msg: MSG.signFail };
return manager.getCert().then(function (cert) {
return { signature: signature, cert: cert };
});
})
.then(function (data) {
if (!data.cert) throw { msg: MSG.certFail };
document.getElementById('id_signature').value = data.signature;
document.getElementById('id_cert').value = data.cert;
setStatus('inserted', MSG.pinOk);
document.getElementById('cert-login-form').submit();
})
.catch(function (err) {
setStatus('error', err.msg || MSG.pinFail);
loginBtn.disabled = false;
});
});
}).catch(function (err) {
console.error('[CertLogin] init error:', err);
setStatus('error', MSG.driverError + ': ' + err.message);
});
});
}());
</script>
</html>

View File

@@ -6,6 +6,7 @@ from rest_framework.routers import DefaultRouter
from .. import api
from ..backends.passkey.urls import urlpatterns as passkey_urlpatterns
from ..backends.cert.api_urls import urlpatterns as cert_api_urlpatterns
app_name = 'authentication'
router = DefaultRouter()
@@ -53,6 +54,7 @@ if settings.AUTH_CUSTOM_SSO:
path('custom-sso/login/', api.CustomSSOLoginAPIView.as_view(), name='custom-sso-login'),
]
if settings.AUTH_CERT:
urlpatterns += cert_api_urlpatterns
urlpatterns += router.urls + passkey_urlpatterns

View File

@@ -4,6 +4,7 @@
from django.db.transaction import non_atomic_requests
from django.urls import path, include
from users import views as users_view
from .. import views
@@ -81,10 +82,12 @@ urlpatterns = [
path('openid/', include(('authentication.backends.oidc.urls', 'authentication'), namespace='openid')),
path('saml2/', include(('authentication.backends.saml2.urls', 'authentication'), namespace='saml2')),
path('oauth2/', include(('authentication.backends.oauth2.urls', 'authentication'), namespace='oauth2')),
path('cert/', include(('authentication.backends.cert.view_urls', 'authentication'), namespace='cert')),
path('captcha/', include('captcha.urls')),
path('oauth2-provider/', include(('authentication.backends.oauth2_provider.urls', 'authentication'), namespace='oauth2-provider')),
path('user-agreement/', views.UserAgreementView.as_view(), name='user-agreement'),
path('privacy-policy/', views.PrivacyPolicyView.as_view(), name='privacy-policy'),
]

View File

@@ -145,5 +145,11 @@ def get_auth_methods():
'enabled': settings.AUTH_PASSKEY,
'url': reverse('api-auth:passkey-login'),
'logo': static('img/login_passkey.png')
},
{
'name': _('CERT'),
'enabled': settings.AUTH_CERT,
'url': reverse('authentication:cert:cert-login'),
'logo': static('img/login_cert.png')
}
]

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -1693,5 +1693,44 @@
"ImageFileCorruptedOrUnreadable": "The image file is corrupted or unreadable, please check the file and try again",
"DeviceManager": "Device manager",
"RefreshSecret": "Refresh secret",
"RefreshSuccessMsg": "Refresh successful"
"RefreshSuccessMsg": "Refresh successful",
"CertDeviceDriverStatus": "Device & Driver Status",
"CertActions": "Actions",
"CertOperationLogs": "Operation Logs",
"CertInfo": "Certificate Info",
"CertNoCertIssued": "No Certificate Issued",
"CertChangePIN": "Change PIN",
"CertOldPIN": "Old PIN",
"CertEnterOldPIN": "Please enter old PIN",
"CertNewPIN": "New PIN",
"CertEnterNewPIN": "Please enter new PIN",
"CertConfirmPIN": "Confirm PIN",
"CertReEnterNewPIN": "Please re-enter new PIN",
"CertIssueCert": "Issue Certificate",
"CertIssueCertHint": "Detect device, initialize USB Key, sign and write certificate",
"CertDeleteCert": "Delete Certificate",
"CertDeleteCertHint": "Delete certificate stored on USB Key",
"CertDeleteCertConfirmMsg": "Are you sure to delete the certificate on the USB Key? This operation cannot be undone.",
"CertResetPIN": "Reset PIN",
"CertResetPINHint": "Use admin PIN to reset the USB Key user PIN to its default value",
"CertEnterAdminPIN": "Please enter admin PIN",
"CertAdminPIN": "Admin PIN",
"CertChangePINHint": "Change the USB Key user PIN",
"CertFillAllFields": "Please fill in all fields",
"CertPINMismatch": "The two new PINs do not match",
"CertChangePINFailed": "Change PIN failed, please try again",
"CertDriverStatus": "Driver Status",
"CertLoadFailed": "Load Failed",
"CertLoaded": "Loaded",
"CertLoading": "Loading",
"CertDriverConfig": "Driver Config",
"CertNotLoaded": "Not Loaded",
"CertDeviceInserted": "Inserted",
"CertDeviceNotInserted": "Not Inserted",
"CertOwner": "Certificate Owner",
"CertOwnerSelf": "Current User",
"CertOwnerOther": "Other User",
"CertOwnerUnknown": "Unknown",
"UserCertificate": "User certificate",
"LoginCertificate": "Login certificate"
}

View File

@@ -1704,5 +1704,44 @@
"FooterContentTooLong200": "页脚内容不能超过 200 个字符",
"ImageFileCorruptedOrUnreadable": "图像文件已损坏或无法读取,请检查文件并重试。",
"RefreshSecret": "刷新密钥",
"RefreshSuccessMsg": "刷新成功"
"RefreshSuccessMsg": "刷新成功",
"CertDeviceDriverStatus": "设备与驱动状态",
"CertActions": "操作",
"CertOperationLogs": "操作日志",
"CertInfo": "证书信息",
"CertNoCertIssued": "暂未制证",
"CertChangePIN": "修改 PIN",
"CertOldPIN": "旧 PIN",
"CertEnterOldPIN": "请输入旧 PIN",
"CertNewPIN": "新 PIN",
"CertEnterNewPIN": "请输入新 PIN",
"CertConfirmPIN": "确认 PIN",
"CertReEnterNewPIN": "再次输入新 PIN",
"CertIssueCert": "一键制证",
"CertIssueCertHint": "检测设备、初始化 USB Key、签发并写入证书",
"CertDeleteCert": "清除证书",
"CertDeleteCertHint": "清除 USB Key 中已存储的证书",
"CertDeleteCertConfirmMsg": "确认要清除 USB Key 中的证书吗?此操作不可恢复。",
"CertResetPIN": "重置 PIN",
"CertResetPINHint": "使用管理员 PIN 将 USB Key 的用户 PIN 重置为默认值",
"CertEnterAdminPIN": "请输入管理员 PIN",
"CertAdminPIN": "管理员 PIN",
"CertChangePINHint": "修改 USB Key 的用户 PIN 码",
"CertFillAllFields": "请填写所有字段",
"CertPINMismatch": "两次输入的新 PIN 不一致",
"CertChangePINFailed": "修改失败,请重试",
"CertDriverStatus": "驱动状态",
"CertLoadFailed": "加载失败",
"CertLoaded": "已加载",
"CertLoading": "加载中",
"CertDriverConfig": "驱动配置",
"CertNotLoaded": "未加载",
"CertDeviceInserted": "已插入",
"CertDeviceNotInserted": "未插入",
"CertOwner": "证书归属",
"CertOwnerSelf": "当前用户",
"CertOwnerOther": "非当前用户",
"CertOwnerUnknown": "未知",
"UserCertificate": "用户证书",
"LoginCertificate": "登录证书"
}

View File

@@ -1702,5 +1702,44 @@
"ImageFileCorruptedOrUnreadable": "影像檔案已損壞或無法讀取,請檢查檔案並重試。",
"DeviceManager": "設備管理",
"RefreshSecret": "刷新金鑰",
"RefreshSuccessMsg": "刷新成功"
"RefreshSuccessMsg": "刷新成功",
"CertDeviceDriverStatus": "設備與驅動狀態",
"CertActions": "操作",
"CertOperationLogs": "操作日誌",
"CertInfo": "憑證資訊",
"CertNoCertIssued": "暫未製證",
"CertChangePIN": "修改 PIN",
"CertOldPIN": "舊 PIN",
"CertEnterOldPIN": "請輸入舊 PIN",
"CertNewPIN": "新 PIN",
"CertEnterNewPIN": "請輸入新 PIN",
"CertConfirmPIN": "確認 PIN",
"CertReEnterNewPIN": "再次輸入新 PIN",
"CertIssueCert": "一鍵製證",
"CertIssueCertHint": "偵測設備、初始化 USB Key、簽發並寫入憑證",
"CertDeleteCert": "清除憑證",
"CertDeleteCertHint": "清除 USB Key 中已存儲的憑證",
"CertDeleteCertConfirmMsg": "確認要清除 USB Key 中的憑證嗎?此操作不可恢復。",
"CertResetPIN": "重置 PIN",
"CertResetPINHint": "使用管理員 PIN 將 USB Key 的使用者 PIN 重置為預設值",
"CertEnterAdminPIN": "請輸入管理員 PIN",
"CertAdminPIN": "管理員 PIN",
"CertChangePINHint": "修改 USB Key 的使用者 PIN 碼",
"CertFillAllFields": "請填寫所有欄位",
"CertPINMismatch": "兩次輸入的新 PIN 不一致",
"CertChangePINFailed": "修改失敗,請重試",
"CertDriverStatus": "驅動狀態",
"CertLoadFailed": "載入失敗",
"CertLoaded": "已載入",
"CertLoading": "載入中",
"CertDriverConfig": "驅動配置",
"CertNotLoaded": "未載入",
"CertDeviceInserted": "已插入",
"CertDeviceNotInserted": "未插入",
"CertOwner": "憑證歸屬",
"CertOwnerSelf": "當前使用者",
"CertOwnerOther": "非當前使用者",
"CertOwnerUnknown": "未知",
"UserCertificate": "用戶憑證",
"LoginCertificate": "登入憑證"
}

View File

@@ -269,6 +269,9 @@ class Config(dict):
'AUTH_CUSTOM_SSO_FILE_MD5': '',
'AUTH_CUSTOM_SSO_QUERY_PARAMS': 'token',
'AUTH_CERT': False,
'CA_KEY_PASS': '',
# 临时密码
'AUTH_TEMP_TOKEN': False,

View File

@@ -5,6 +5,7 @@ import os
import ldap
from ..const import CONFIG, PROJECT_DIR, BASE_DIR
from . import exist_or_default
# OTP settings
OTP_ISSUER_NAME = CONFIG.OTP_ISSUER_NAME
@@ -290,6 +291,7 @@ AUTH_BACKEND_OAUTH2 = 'authentication.backends.oauth2.OAuth2Backend'
AUTH_BACKEND_TEMP_TOKEN = 'authentication.backends.token.TempTokenAuthBackend'
AUTH_BACKEND_CUSTOM = 'authentication.backends.custom.CustomAuthBackend'
AUTH_BACKEND_PASSKEY = 'authentication.backends.passkey.PasskeyAuthBackend'
AUTH_BACKEND_CERT = 'authentication.backends.cert.CertBackend'
AUTHENTICATION_BACKENDS = [
# 只做权限校验
RBAC_BACKEND,
@@ -302,7 +304,9 @@ AUTHENTICATION_BACKENDS = [
AUTH_BACKEND_WECOM, AUTH_BACKEND_DINGTALK, AUTH_BACKEND_FEISHU, AUTH_BACKEND_LARK, AUTH_BACKEND_SLACK,
# Token模式
AUTH_BACKEND_AUTH_TOKEN, AUTH_BACKEND_SSO, AUTH_BACKEND_CUSTOM_SSO, AUTH_BACKEND_TEMP_TOKEN,
AUTH_BACKEND_PASSKEY
AUTH_BACKEND_PASSKEY,
# Cert模式
AUTH_BACKEND_CERT
]
@@ -378,4 +382,26 @@ if AUTH_CUSTOM_SSO and AUTH_CUSTOM_SSO_FILE_MD5:
if md5 != get_file_md5(AUTH_CUSTOM_SSO_FILE_PATH):
# 如果启用了自定义 SSO 认证,但文件 MD5 不匹配,则不启用自定义 SSO 认证
AUTH_CUSTOM_SSO = False
AUTH_CUSTOM_SSO_QUERY_PARAMS = [q.strip() for q in CONFIG.AUTH_CUSTOM_SSO_QUERY_PARAMS.split(',')]
AUTH_CUSTOM_SSO_QUERY_PARAMS = [q.strip() for q in CONFIG.AUTH_CUSTOM_SSO_QUERY_PARAMS.split(',')]
# 开启证书认证
AUTH_CERT = CONFIG.AUTH_CERT
# 证书认证前端 SDK 文件路径
AUTH_CERT_VENDOR_DRIVER_JS_FILE = exist_or_default(
os.path.join(PROJECT_DIR, 'data', 'auth', 'cert_driver.js'), None
)
# 抽象方法名 → 厂商 SDK 实际方法名
AUTH_CERT_VENDOR_DRIVER_CONFIG_FILE = exist_or_default(
os.path.join(PROJECT_DIR, 'data', 'auth', 'cert_driver_config.yaml'), None
)
# CA 根证书私钥文件路径
CA_KEY_FILE = exist_or_default(
os.path.join(PROJECT_DIR, 'data', 'certs', 'ca_root.key'), None
)
# CA 根证书文件路径,可用于认证时验证客户端证书,签发用户证书
CA_CERT_FILE = exist_or_default(
os.path.join(PROJECT_DIR, 'data', 'certs', 'ca_root.crt'), None
)
# CA 私鑰密码(若私鑰带密码保护,不含密码则留空)
CA_KEY_PASS = CONFIG.CA_KEY_PASS

View File

@@ -53,6 +53,7 @@ class SettingsApi(generics.RetrieveUpdateAPIView):
'saml2': serializers.SAML2SettingSerializer,
'oauth2': serializers.OAuth2SettingSerializer,
'passkey': serializers.PasskeySettingSerializer,
'cert': serializers.CertSettingSerializer,
'clean': serializers.CleaningSerializer,
'other': serializers.OtherSettingSerializer,
'sms': serializers.SMSSettingSerializer,

View File

@@ -14,3 +14,4 @@ from .slack import *
from .sms import *
from .sso import *
from .wecom import *
from .cert import *

View File

@@ -24,6 +24,7 @@ class AuthSettingSerializer(serializers.Serializer):
AUTH_SLACK = serializers.BooleanField(default=False, label=_('WeCom Auth'))
AUTH_SSO = serializers.BooleanField(default=False, label=_("SSO Auth"))
AUTH_PASSKEY = serializers.BooleanField(default=False, label=_("Passkey Auth"))
AUTH_CERT = serializers.BooleanField(default=False, label=_("Certificate Auth"))
EMAIL_SUFFIX = serializers.CharField(
required=False, max_length=1024, label=_("Email suffix"),
help_text=_(

View File

@@ -0,0 +1,12 @@
from django.utils.translation import gettext_lazy as _
from rest_framework import serializers
__all__ = ['CertSettingSerializer']
class CertSettingSerializer(serializers.Serializer):
PREFIX_TITLE = _('Certificate')
AUTH_CERT = serializers.BooleanField(
default=False, label=_('Certificate')
)

View File

@@ -92,6 +92,8 @@ class PrivateSettingSerializer(PublicSettingSerializer):
REMOTE_APP_STORE_URL = serializers.CharField()
VENDOR = serializers.CharField()
AUTH_CERT = serializers.BooleanField()
class ServerInfoSerializer(serializers.Serializer):
CURRENT_TIME = serializers.DateTimeField()

Binary file not shown.

After

Width:  |  Height:  |  Size: 6.4 KiB

View File

@@ -84,7 +84,7 @@ class Migration(migrations.Migration):
choices=[('local', 'Local'), ('ldap', 'LDAP/AD'), ('ldap_ha', 'LDAP/AD (HA)'), ('openid', 'OpenID'),
('radius', 'Radius'), ('cas', 'CAS'), ('saml2', 'SAML2'), ('oauth2', 'OAuth2'),
('wecom', 'WeCom'), ('dingtalk', 'DingTalk'), ('feishu', 'FeiShu'), ('lark', 'Lark'),
('slack', 'Slack'), ('custom', 'Custom')], default='local', max_length=30,
('slack', 'Slack'), ('custom', 'Custom'), ('cert', 'Certificate')], default='local', max_length=30,
verbose_name='Source')),
('wecom_id', models.CharField(default=None, max_length=128, null=True, verbose_name='WeCom')),
('dingtalk_id', models.CharField(default=None, max_length=128, null=True, verbose_name='DingTalk')),

View File

@@ -161,6 +161,10 @@ class AuthMixin:
@staticmethod
def can_use_ssh_key_login():
return settings.TERMINAL_PUBLIC_KEY_AUTH
@staticmethod
def can_use_cert_login():
return settings.AUTH_CERT
def is_history_password(self, password):
allow_history_password_count = settings.OLD_PASSWORD_HISTORY_LIMIT_COUNT

View File

@@ -24,6 +24,7 @@ class Source(models.TextChoices):
lark = "lark", _("Lark")
slack = "slack", _("Slack")
custom = "custom", "Custom"
cert = "cert", _("Certificate")
@classmethod
def as_dict(cls):
@@ -56,6 +57,7 @@ class SourceMixin:
Source.slack: [settings.AUTH_BACKEND_SLACK],
Source.dingtalk: [settings.AUTH_BACKEND_DINGTALK],
Source.custom: [settings.AUTH_BACKEND_CUSTOM],
Source.cert: [settings.AUTH_BACKEND_CERT],
}
@classmethod
@@ -74,6 +76,7 @@ class SourceMixin:
cls.Source.slack: settings.AUTH_SLACK,
cls.Source.dingtalk: settings.AUTH_DINGTALK,
cls.Source.custom: settings.AUTH_CUSTOM,
cls.Source.cert: settings.AUTH_CERT,
}
return [str(k) for k, v in mapper.items() if v]

View File

@@ -140,6 +140,11 @@ class UserSerializer(
label=_("Can public key authentication"),
read_only=True,
)
can_cert_auth = serializers.BooleanField(
source="can_use_cert_login",
label=_("Can certificate authentication"),
read_only=True,
)
is_face_code_set = serializers.BooleanField(
label=_("Is face code set"),
read_only=True,
@@ -191,7 +196,7 @@ class UserSerializer(
fields_bool = [
"is_superuser", "is_org_admin", "is_service_account",
"is_valid", "is_expired", "is_active", # 布尔字段
"is_otp_secret_key_bound", "can_public_key_auth",
"is_otp_secret_key_bound", "can_public_key_auth", "can_cert_auth",
"mfa_enabled", "need_update_password", "is_face_code_set",
]
# 包含不太常用的字段,可以没有