mirror of
https://github.com/jumpserver/jumpserver.git
synced 2026-07-03 07:39:39 +00:00
183 lines
7.3 KiB
Python
183 lines
7.3 KiB
Python
import os
|
||
import yaml
|
||
import json
|
||
from django.conf import settings
|
||
from common.utils import get_logger
|
||
from common.decorators import Singleton
|
||
from common.const import Language
|
||
|
||
|
||
logger = get_logger(__name__)
|
||
|
||
class Setting:
|
||
VENDOR = getattr(settings, 'VENDOR', '')
|
||
|
||
|
||
@Singleton
|
||
class CertVendorDriverConfig:
|
||
"""
|
||
从 YAML 配置文件读取所有证书相关配置。
|
||
|
||
CA 相关路径/密码(ca_cert_file / ca_key_file / ca_key_pass)属于系统敏感配置,
|
||
只能在系统设置(config.yml / Django settings)中配置,不允许写入此 YAML。
|
||
|
||
YAML 结构约定:
|
||
cert: # 系统级配置段
|
||
gmssl_bin: gmssl # gmssl 二进制路径
|
||
challenge_ttl: 300 # Challenge 码 Redis 存活秒数
|
||
enroll:
|
||
enabled: true # 是否开启证书签发
|
||
key_algo: SM2 # 签发密钥算法:SM2 或 RSA
|
||
subject_cn: username # 用户证书 Subject CN 取自用户的哪个字段
|
||
subject_o: JumpServer # 用户证书 Subject O(组织名)
|
||
# 其余 key 为厂商 SDK 方法映射(供前端 API 层使用)
|
||
newUKeyAPI: ...
|
||
checkInstall: ...
|
||
getCertCN: ...
|
||
...
|
||
"""
|
||
|
||
def __init__(self):
|
||
if not settings.AUTH_CERT:
|
||
logger.debug('CertVendorDriverConfig: authentication backend not enabled, skipping config load')
|
||
return
|
||
config_file = getattr(settings, 'AUTH_CERT_VENDOR_DRIVER_CONFIG_FILE', None)
|
||
self._raw = self._load_yaml(config_file)
|
||
self._cert = self._raw.get('cert') or {}
|
||
self._enroll = self._cert.get('enroll') or {}
|
||
|
||
# ── YAML 加载 ────────────────────────────────────────────────────────────
|
||
|
||
@staticmethod
|
||
def _load_yaml(config_file):
|
||
if not config_file or not os.path.isfile(config_file):
|
||
logger.warning('CertVendorDriverConfig: config file not found: %s', config_file)
|
||
return {}
|
||
with open(config_file, 'r', encoding='utf-8') as f:
|
||
return yaml.safe_load(f) or {}
|
||
|
||
# ── CA / 证书链(只读系统设置,不允许在 YAML 中配置)────────────────────────
|
||
|
||
@property
|
||
def ca_cert_file(self):
|
||
"""CA 根证书路径,只从系统设置读取。"""
|
||
return getattr(settings, 'CA_CERT_FILE', None)
|
||
|
||
@property
|
||
def ca_key_file(self):
|
||
"""CA 私钥路径,只从系统设置读取。"""
|
||
return getattr(settings, 'CA_KEY_FILE', None)
|
||
|
||
@property
|
||
def ca_key_pass(self):
|
||
"""CA 私钥密码,只从系统设置读取。"""
|
||
return str(getattr(settings, 'CA_KEY_PASS', ''))
|
||
|
||
@property
|
||
def driver_js_file(self):
|
||
"""返回厂商 SDK 驱动文件的 FileResponse,供 API 层使用。"""
|
||
return getattr(settings, 'AUTH_CERT_VENDOR_DRIVER_JS_FILE', None)
|
||
|
||
# ── 工具 ─────────────────────────────────────────────────────────────────
|
||
|
||
@property
|
||
def gmssl_bin(self):
|
||
"""gmssl 二进制路径,默认 'gmssl'(系统 PATH 中查找)。"""
|
||
return 'gmssl'
|
||
|
||
# ── 认证流程 ──────────────────────────────────────────────────────────────
|
||
|
||
@property
|
||
def challenge_ttl(self):
|
||
"""Challenge 码在 Redis 中的存活时间(秒),默认 300。"""
|
||
v = self._cert.get('challenge_ttl', 300)
|
||
return int(v)
|
||
|
||
# ── 证书签发 ──────────────────────────────────────────────────────────────
|
||
|
||
@property
|
||
def enroll_enabled(self):
|
||
"""是否开启用户证书签发功能。"""
|
||
v = self._enroll.get('enabled', True)
|
||
return bool(v)
|
||
|
||
@property
|
||
def enroll_key_algo(self):
|
||
"""签发证书时生成密钥对的算法,SM2 或 RSA。"""
|
||
return self._enroll.get('key_algo', 'SM2')
|
||
|
||
@property
|
||
def enroll_subject_cn(self):
|
||
"""用户证书 Subject CN 取自用户模型的哪个字段,默认 'username'。"""
|
||
return self._enroll.get('subject_cn', 'username')
|
||
|
||
@property
|
||
def enroll_subject_o(self):
|
||
"""用户证书 Subject O(组织名)。"""
|
||
return self._enroll.get('subject_o', Setting.VENDOR)
|
||
|
||
@property
|
||
def enroll_validity_days(self):
|
||
"""签发证书的有效期(天),默认 365。"""
|
||
v = self._enroll.get('validity_days', 365)
|
||
return int(v)
|
||
|
||
# ── 厂商 SDK 映射(原始数据,供 API 层序列化给前端)───────────────────────
|
||
|
||
@staticmethod
|
||
def _render(data, trans_filter=None):
|
||
"""
|
||
渲染 YAML 数据中的 Jinja2 模板表达式。
|
||
- {{ settings.xxx }} → 系统设置值(任何时候都生效)
|
||
- {{ user.xxx }} → 原样保留,留给前端 JS 运行时解析
|
||
- {{ 'text' | trans }} → 按 trans_filter 翻译;不传则原文返回(初始化阶段)
|
||
"""
|
||
from jinja2 import Undefined, Environment
|
||
|
||
class KeepUndefined(Undefined):
|
||
"""未定义变量原样保留占位符,支持任意深度的属性链。"""
|
||
def __str__(self):
|
||
return '{{ ' + self._undefined_name + ' }}'
|
||
def __getattr__(self, name):
|
||
return KeepUndefined(name=f'{self._undefined_name}.{name}')
|
||
|
||
template_str = json.dumps(data, ensure_ascii=False)
|
||
env = Environment(undefined=KeepUndefined)
|
||
env.filters['trans'] = trans_filter or (lambda s: s)
|
||
rendered = env.from_string(template_str).render(settings=Setting)
|
||
return json.loads(rendered)
|
||
|
||
def _build_trans_filter(self, lang):
|
||
"""构建 Jinja2 | trans filter 函数,按 lang 从 YAML i18n 表查找翻译。
|
||
未找到翻译时原文返回;语言键自动归一化(zh_hant → zh-hant)。
|
||
"""
|
||
i18n_raw = self._raw.get('i18n') or {}
|
||
i18n = {
|
||
text: {
|
||
Language.to_internal_code(lk.replace('_', '-')): lv
|
||
for lk, lv in entries.items()
|
||
}
|
||
for text, entries in i18n_raw.items()
|
||
if isinstance(entries, dict)
|
||
}
|
||
|
||
def trans_filter(s):
|
||
translations = i18n.get(str(s))
|
||
if not translations:
|
||
return s
|
||
return translations.get(lang) or translations.get('en') or s
|
||
|
||
return trans_filter
|
||
|
||
def get_vendor_sdk_data(self, lang='en'):
|
||
"""返回去掉 'cert'/'i18n' 顶层 key 后的厂商 SDK 方法映射。
|
||
YAML 中任意字符串值均可用 {{ 'text' | trans }} 语法标记为可翻译。
|
||
"""
|
||
lang = Language.to_internal_code(lang)
|
||
trans_filter = self._build_trans_filter(lang)
|
||
data = self._render(self._raw, trans_filter)
|
||
return {k: v for k, v in data.items() if k not in ('i18n',)}
|
||
|
||
|
||
cert_vd_cfg = CertVendorDriverConfig()
|