From 6630cd54fd0254e0bfa5df3a24880b32153e5806 Mon Sep 17 00:00:00 2001 From: Aaron3S Date: Mon, 1 Jun 2026 15:39:45 +0800 Subject: [PATCH] feat: hsm sctu --- apps/common/sdk/gm/__init__.py | 5 +- apps/common/sdk/gm/sctu/__init__.py | 3 ++ apps/common/sdk/gm/sctu/cipher.py | 59 ++++++++++++++++++++++ apps/common/sdk/gm/sctu/device.py | 38 ++++++++++++++ apps/common/sdk/gm/sctu/session.py | 26 ++++++++++ apps/common/sdk/gm/sctu/session_mixin.py | 64 ++++++++++++++++++++++++ 6 files changed, 194 insertions(+), 1 deletion(-) create mode 100644 apps/common/sdk/gm/sctu/__init__.py create mode 100644 apps/common/sdk/gm/sctu/cipher.py create mode 100644 apps/common/sdk/gm/sctu/device.py create mode 100644 apps/common/sdk/gm/sctu/session.py create mode 100644 apps/common/sdk/gm/sctu/session_mixin.py diff --git a/apps/common/sdk/gm/__init__.py b/apps/common/sdk/gm/__init__.py index 5dfd2f6f9..a5ae1701d 100644 --- a/apps/common/sdk/gm/__init__.py +++ b/apps/common/sdk/gm/__init__.py @@ -1,12 +1,13 @@ from .base.device import Device from enum import Enum -from common.sdk.gm import piico, ccupm +from common.sdk.gm import piico, ccupm, sctu class CryptoVendor(Enum): PIICO = "piico" CCUPM = "ccupm" + SCTU = "sctu" @classmethod def from_str(cls, name: str): @@ -24,5 +25,7 @@ def open_gm_device(vendor: CryptoVendor) -> Device: return piico.PiicoDevice() elif vendor is CryptoVendor.CCUPM: return ccupm.CCUPMDevice() + elif vendor is CryptoVendor.SCTU: + return sctu.SCTUDevice() else: raise Exception("UnSupported HSM") diff --git a/apps/common/sdk/gm/sctu/__init__.py b/apps/common/sdk/gm/sctu/__init__.py new file mode 100644 index 000000000..29ee050ed --- /dev/null +++ b/apps/common/sdk/gm/sctu/__init__.py @@ -0,0 +1,3 @@ +from .device import SCTUDevice + + diff --git a/apps/common/sdk/gm/sctu/cipher.py b/apps/common/sdk/gm/sctu/cipher.py new file mode 100644 index 000000000..a0a7c29cf --- /dev/null +++ b/apps/common/sdk/gm/sctu/cipher.py @@ -0,0 +1,59 @@ +cipher_alg_id = { + "sm4_ebc": 0x00000401, + "sm4_cbc": 0x00000402, + "sm4_mac": 0x00000405, +} + + +class ECCCipher: + def __init__(self, session, public_key, private_key): + self._session = session + self.public_key = public_key + self.private_key = private_key + + def encrypt(self, plain_text): + return self._session.ecc_encrypt(self.public_key, plain_text, 0x00020800) + + def decrypt(self, cipher_text): + return self._session.ecc_decrypt(self.private_key, cipher_text, 0x00020800) + + +class EBCCipher: + + def __init__(self, session, key_val): + self._session = session + self._key = self.__get_key(key_val) + self._alg = "sm4_ebc" + self._iv = None + + def __get_key(self, key_val): + key_val = self.__padding(key_val) + return key_val + + @staticmethod + def __padding(val): + # padding + val = bytes(val) + while len(val) == 0 or len(val) % 16 != 0: + val += b'\0' + return val + + def encrypt(self, plain_text): + plain_text = self.__padding(plain_text) + cipher_text = self._session.encrypt(plain_text, self._key, cipher_alg_id[self._alg], self._iv) + return bytes(cipher_text) + + def decrypt(self, cipher_text): + plain_text = self._session.decrypt(cipher_text, self._key, cipher_alg_id[self._alg], self._iv) + return bytes(plain_text) + + def destroy(self): + self._session.close() + + +class CBCCipher(EBCCipher): + + def __init__(self, session, key, iv): + super().__init__(session, key) + self._iv = iv + self._alg = "sm4_cbc" diff --git a/apps/common/sdk/gm/sctu/device.py b/apps/common/sdk/gm/sctu/device.py new file mode 100644 index 000000000..dddd24583 --- /dev/null +++ b/apps/common/sdk/gm/sctu/device.py @@ -0,0 +1,38 @@ +from ctypes import * + +from ..base.device import Device +from .session import Session +from .cipher import EBCCipher + + +class SCTUDevice(Device): + name = "sctu" + + def __init__(self): + self.open("libhsctu_guomi_vpn.so") + + def __open_device(self): + device = c_void_p() + ret = self._driver.HS_SDF_OpenDevice(pointer(device)) + if ret != 0: + raise Exception("open {} device failed".format(self.name), ret) + self.__device = device + + def new_session(self): + session = c_void_p() + ret = self._driver.HS_SDF_OpenSession(self.__device, pointer(session)) + if ret != 0: + raise Exception("create session failed") + return Session(self._driver, session) + + def new_sm4_ebc_cipher(self, key_val): + session = self.new_session() + return EBCCipher(session, key_val) + + def close(self): + if self.__device is None: + raise Exception("device not turned on") + ret = self._driver.HS_SDF_CloseDevice(self.__device) + if ret != 0: + raise Exception("turn off device failed") + self.__device = None diff --git a/apps/common/sdk/gm/sctu/session.py b/apps/common/sdk/gm/sctu/session.py new file mode 100644 index 000000000..5cb5de9b3 --- /dev/null +++ b/apps/common/sdk/gm/sctu/session.py @@ -0,0 +1,26 @@ +from ctypes import * + +from common.sdk.gm.base.exception import GMDeviceError +from .session_mixin import SM4Mixin + + +class Session(SM4Mixin): + def __init__(self, driver, session): + super().__init__() + self._session = session + self._driver = driver + + def get_device_info(self): + pass + + def generate_random(self, length=64): + random_data = (c_ubyte * length)() + ret = self._driver.HS_SDF_GenerateRandom(self._session, c_int(length), random_data) + if ret != 0: + raise GMDeviceError("generate random error", ret) + return bytes(random_data) + + def close(self): + ret = self._driver.HS_SDF_CloseSession(self._session) + if ret != 0: + raise GMDeviceError("close session failed", ret) diff --git a/apps/common/sdk/gm/sctu/session_mixin.py b/apps/common/sdk/gm/sctu/session_mixin.py new file mode 100644 index 000000000..a9a1ad519 --- /dev/null +++ b/apps/common/sdk/gm/sctu/session_mixin.py @@ -0,0 +1,64 @@ +from ctypes import * + +from common.sdk.gm.base.exception import GMDeviceError +from common.sdk.gm.base.session_mixin import BaseMixin + + +def as_uchar_array(data: bytes): + return (c_ubyte * len(data))(*data) + + +class SM4Mixin(BaseMixin): + + ## 此处不导入 key + def import_key(self, key_val): + pass + + def destroy_cipher_key(self, key): + pass + + def encrypt(self, plain_text, key, alg, iv=None): + return self.__do_cipher_action(plain_text, key, alg, iv, True) + + def decrypt(self, cipher_text, key, alg, iv=None): + return self.__do_cipher_action(cipher_text, key, alg, iv, False) + + def __do_cipher_action(self, text, key, alg, iv=None, encrypt=True): + text = (c_ubyte * len(text))(*text) + if iv is not None: + iv = (c_ubyte * len(iv))(*iv) + + temp_data = (c_ubyte * len(text))() + temp_data_length = c_int() + + ## 这里的 key 不是指针而是明文 + + key_arr = as_uchar_array(key) + + if encrypt: + ret = self._driver.HS_SDF_Encrypt(self._session, + key_arr, + c_int(len(key_arr)), + 0, + c_int(alg), + iv, + text, + c_int(len(text)), + temp_data, + pointer(temp_data_length)) + if ret != 0: + raise GMDeviceError("encrypt failed", ret) + else: + ret = self._driver.HS_SDF_Decrypt(self._session, + key_arr, + c_int(len(key_arr)), + 0, + c_int(alg), + iv, + text, + c_int(len(text)), + temp_data, + pointer(temp_data_length)) + if ret != 0: + raise GMDeviceError("decrypt failed", ret) + return temp_data[:temp_data_length.value]