From 1863c496a2a7200d96d0d0f275a9c31db1f30f7e Mon Sep 17 00:00:00 2001 From: Bai Date: Tue, 26 May 2026 14:44:08 +0800 Subject: [PATCH] feat: cert auth support rsa --- apps/authentication/backends/cert/api.py | 42 ++++++++++- apps/authentication/backends/cert/backends.py | 70 ++++++++++++++++++- apps/authentication/backends/cert/driver.py | 2 +- 3 files changed, 109 insertions(+), 5 deletions(-) diff --git a/apps/authentication/backends/cert/api.py b/apps/authentication/backends/cert/api.py index 7e46a9ec5..0b16b1218 100644 --- a/apps/authentication/backends/cert/api.py +++ b/apps/authentication/backends/cert/api.py @@ -131,7 +131,47 @@ class CertEnrollAPIView(APIView): return self._SM2_OID_DER in der def sign_cert_by_other(self, csr_pem): - pass + import datetime + from cryptography import x509 + from cryptography.hazmat.primitives import hashes, serialization + from cryptography.hazmat.primitives.asymmetric import ec, rsa + + csr = x509.load_pem_x509_csr(csr_pem.encode()) + pub_key = csr.public_key() + + if isinstance(pub_key, ec.EllipticCurvePublicKey): + raise NotImplementedError('ECDSA certificate signing is not supported') + if not isinstance(pub_key, rsa.RSAPublicKey): + raise ValueError('Unsupported key type: {}'.format(type(pub_key).__name__)) + + ca_key_path = cert_vd_cfg.ca_key_file + ca_cert_path = cert_vd_cfg.ca_cert_file + ca_key_pass = cert_vd_cfg.ca_key_pass + if not ca_key_path or not os.path.isfile(ca_key_path): + raise FileNotFoundError('CA_KEY_FILE not configured or not found') + if not ca_cert_path or not os.path.isfile(ca_cert_path): + raise FileNotFoundError('CA_CERT_FILE not configured or not found') + + with open(ca_cert_path, 'rb') as f: + ca_cert = x509.load_pem_x509_certificate(f.read()) + with open(ca_key_path, 'rb') as f: + password = ca_key_pass.encode() if ca_key_pass else None + ca_key = serialization.load_pem_private_key(f.read(), password=password) + + validity_days = cert_vd_cfg.enroll_validity_days + now = datetime.datetime.now(datetime.timezone.utc) + cert = ( + x509.CertificateBuilder() + .subject_name(csr.subject) + .issuer_name(ca_cert.subject) + .public_key(pub_key) + .serial_number(x509.random_serial_number()) + .not_valid_before(now) + .not_valid_after(now + datetime.timedelta(days=validity_days)) + .add_extension(x509.BasicConstraints(ca=False, path_length=None), critical=True) + .sign(ca_key, hashes.SHA256()) + ) + return cert.public_bytes(serialization.Encoding.PEM).decode('utf-8') def sign_cert_by_gmssl(self, csr_pem): """ diff --git a/apps/authentication/backends/cert/backends.py b/apps/authentication/backends/cert/backends.py index d203120c1..e3a681bd3 100644 --- a/apps/authentication/backends/cert/backends.py +++ b/apps/authentication/backends/cert/backends.py @@ -186,8 +186,72 @@ class CertBackend(JMSBaseAuthBackend): pass raise ValueError('Cannot decode signature: unknown format') - # ── 其他算法(预留)──────────────────────────────────────────────────────── + # ── RSA 四步校验 ───────────────────────────────────────────────────────── def _authenticate_other(self, cert_pem, username, signature, challenge): - logger.warning('CertBackend: non-SM2 cert verification is not yet implemented') - return None \ No newline at end of file + from cryptography import x509 + from cryptography.exceptions import InvalidSignature + from cryptography.hazmat.primitives import hashes + from cryptography.hazmat.primitives.asymmetric import ec, padding, rsa + + # Step 1: 加载证书,判断算法类型 + try: + cert = x509.load_pem_x509_certificate(cert_pem.encode()) + except Exception as e: + logger.warning('CertBackend: failed to load certificate: %s', e) + return None + + pub_key = cert.public_key() + if isinstance(pub_key, ec.EllipticCurvePublicKey): + logger.warning('CertBackend: ECDSA certificate verification is not supported') + return None + if not isinstance(pub_key, rsa.RSAPublicKey): + logger.warning('CertBackend: unsupported key type: %s', type(pub_key).__name__) + return None + + # Step 2: 校验证书链,是否由 CA 根证书签发 + ca_cert_path = getattr(settings, 'CA_CERT_FILE', '') + if not ca_cert_path or not os.path.isfile(ca_cert_path): + logger.warning('CertBackend: CA_CERT_FILE not configured or not found') + return None + try: + with open(ca_cert_path, 'rb') as f: + ca_cert = x509.load_pem_x509_certificate(f.read()) + ca_cert.public_key().verify( + cert.signature, + cert.tbs_certificate_bytes, + padding.PKCS1v15(), + cert.signature_hash_algorithm, + ) + except InvalidSignature: + logger.warning('CertBackend: RSA cert chain verification failed') + return None + except Exception as e: + logger.warning('CertBackend: RSA cert chain verification error: %s', e) + return None + + # Step 3: 从证书 subject 提取 CN,与传入 username 比对 + try: + cert_cn = cert.subject.get_attributes_for_oid(x509.NameOID.COMMON_NAME)[0].value + except (IndexError, Exception): + cert_cn = None + if cert_cn != username: + logger.warning( + 'CertBackend: cert CN %r does not match username %r', cert_cn, username + ) + return None + + # Step 4: 用证书公钥验证签名(RSA PKCS1v15 + SHA256) + sig_bytes = self._decode_signature(signature) + signed_data = challenge if isinstance(challenge, bytes) else challenge.encode('utf-8') + try: + pub_key.verify(sig_bytes, signed_data, padding.PKCS1v15(), hashes.SHA256()) + except InvalidSignature: + logger.warning('CertBackend: RSA signature mismatch') + return None + except Exception as e: + logger.warning('CertBackend: RSA signature verification error: %s', e) + return None + + # Step 5: 查询并返回用户 + return User.objects.filter(username=username).first() \ No newline at end of file diff --git a/apps/authentication/backends/cert/driver.py b/apps/authentication/backends/cert/driver.py index a3f39b103..018be821b 100644 --- a/apps/authentication/backends/cert/driver.py +++ b/apps/authentication/backends/cert/driver.py @@ -71,7 +71,7 @@ class CertVendorDriverConfig: @property def ca_key_pass(self): """CA 私钥密码,只从系统设置读取。""" - return getattr(settings, 'CA_KEY_PASS', '') + return str(getattr(settings, 'CA_KEY_PASS', '')) @property def driver_js_file(self):