feat: cert auth support rsa

This commit is contained in:
Bai
2026-05-26 14:44:08 +08:00
parent 6024d8d14f
commit 1863c496a2
3 changed files with 109 additions and 5 deletions

View File

@@ -131,7 +131,47 @@ class CertEnrollAPIView(APIView):
return self._SM2_OID_DER in der
def sign_cert_by_other(self, csr_pem):
pass
import datetime
from cryptography import x509
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import ec, rsa
csr = x509.load_pem_x509_csr(csr_pem.encode())
pub_key = csr.public_key()
if isinstance(pub_key, ec.EllipticCurvePublicKey):
raise NotImplementedError('ECDSA certificate signing is not supported')
if not isinstance(pub_key, rsa.RSAPublicKey):
raise ValueError('Unsupported key type: {}'.format(type(pub_key).__name__))
ca_key_path = cert_vd_cfg.ca_key_file
ca_cert_path = cert_vd_cfg.ca_cert_file
ca_key_pass = cert_vd_cfg.ca_key_pass
if not ca_key_path or not os.path.isfile(ca_key_path):
raise FileNotFoundError('CA_KEY_FILE not configured or not found')
if not ca_cert_path or not os.path.isfile(ca_cert_path):
raise FileNotFoundError('CA_CERT_FILE not configured or not found')
with open(ca_cert_path, 'rb') as f:
ca_cert = x509.load_pem_x509_certificate(f.read())
with open(ca_key_path, 'rb') as f:
password = ca_key_pass.encode() if ca_key_pass else None
ca_key = serialization.load_pem_private_key(f.read(), password=password)
validity_days = cert_vd_cfg.enroll_validity_days
now = datetime.datetime.now(datetime.timezone.utc)
cert = (
x509.CertificateBuilder()
.subject_name(csr.subject)
.issuer_name(ca_cert.subject)
.public_key(pub_key)
.serial_number(x509.random_serial_number())
.not_valid_before(now)
.not_valid_after(now + datetime.timedelta(days=validity_days))
.add_extension(x509.BasicConstraints(ca=False, path_length=None), critical=True)
.sign(ca_key, hashes.SHA256())
)
return cert.public_bytes(serialization.Encoding.PEM).decode('utf-8')
def sign_cert_by_gmssl(self, csr_pem):
"""

View File

@@ -186,8 +186,72 @@ class CertBackend(JMSBaseAuthBackend):
pass
raise ValueError('Cannot decode signature: unknown format')
# ── 其他算法(预留)────────────────────────────────────────────────────────
# ── RSA 四步校验 ─────────────────────────────────────────────────────────
def _authenticate_other(self, cert_pem, username, signature, challenge):
logger.warning('CertBackend: non-SM2 cert verification is not yet implemented')
return None
from cryptography import x509
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import ec, padding, rsa
# Step 1: 加载证书,判断算法类型
try:
cert = x509.load_pem_x509_certificate(cert_pem.encode())
except Exception as e:
logger.warning('CertBackend: failed to load certificate: %s', e)
return None
pub_key = cert.public_key()
if isinstance(pub_key, ec.EllipticCurvePublicKey):
logger.warning('CertBackend: ECDSA certificate verification is not supported')
return None
if not isinstance(pub_key, rsa.RSAPublicKey):
logger.warning('CertBackend: unsupported key type: %s', type(pub_key).__name__)
return None
# Step 2: 校验证书链,是否由 CA 根证书签发
ca_cert_path = getattr(settings, 'CA_CERT_FILE', '')
if not ca_cert_path or not os.path.isfile(ca_cert_path):
logger.warning('CertBackend: CA_CERT_FILE not configured or not found')
return None
try:
with open(ca_cert_path, 'rb') as f:
ca_cert = x509.load_pem_x509_certificate(f.read())
ca_cert.public_key().verify(
cert.signature,
cert.tbs_certificate_bytes,
padding.PKCS1v15(),
cert.signature_hash_algorithm,
)
except InvalidSignature:
logger.warning('CertBackend: RSA cert chain verification failed')
return None
except Exception as e:
logger.warning('CertBackend: RSA cert chain verification error: %s', e)
return None
# Step 3: 从证书 subject 提取 CN与传入 username 比对
try:
cert_cn = cert.subject.get_attributes_for_oid(x509.NameOID.COMMON_NAME)[0].value
except (IndexError, Exception):
cert_cn = None
if cert_cn != username:
logger.warning(
'CertBackend: cert CN %r does not match username %r', cert_cn, username
)
return None
# Step 4: 用证书公钥验证签名RSA PKCS1v15 + SHA256
sig_bytes = self._decode_signature(signature)
signed_data = challenge if isinstance(challenge, bytes) else challenge.encode('utf-8')
try:
pub_key.verify(sig_bytes, signed_data, padding.PKCS1v15(), hashes.SHA256())
except InvalidSignature:
logger.warning('CertBackend: RSA signature mismatch')
return None
except Exception as e:
logger.warning('CertBackend: RSA signature verification error: %s', e)
return None
# Step 5: 查询并返回用户
return User.objects.filter(username=username).first()

View File

@@ -71,7 +71,7 @@ class CertVendorDriverConfig:
@property
def ca_key_pass(self):
"""CA 私钥密码,只从系统设置读取。"""
return getattr(settings, 'CA_KEY_PASS', '')
return str(getattr(settings, 'CA_KEY_PASS', ''))
@property
def driver_js_file(self):