feat: hsm sctu

This commit is contained in:
Aaron3S
2026-06-01 15:39:45 +08:00
parent 02137ec789
commit 6630cd54fd
6 changed files with 194 additions and 1 deletions

View File

@@ -1,12 +1,13 @@
from .base.device import Device
from enum import Enum
from common.sdk.gm import piico, ccupm
from common.sdk.gm import piico, ccupm, sctu
class CryptoVendor(Enum):
PIICO = "piico"
CCUPM = "ccupm"
SCTU = "sctu"
@classmethod
def from_str(cls, name: str):
@@ -24,5 +25,7 @@ def open_gm_device(vendor: CryptoVendor) -> Device:
return piico.PiicoDevice()
elif vendor is CryptoVendor.CCUPM:
return ccupm.CCUPMDevice()
elif vendor is CryptoVendor.SCTU:
return sctu.SCTUDevice()
else:
raise Exception("UnSupported HSM")

View File

@@ -0,0 +1,3 @@
from .device import SCTUDevice

View File

@@ -0,0 +1,59 @@
cipher_alg_id = {
"sm4_ebc": 0x00000401,
"sm4_cbc": 0x00000402,
"sm4_mac": 0x00000405,
}
class ECCCipher:
def __init__(self, session, public_key, private_key):
self._session = session
self.public_key = public_key
self.private_key = private_key
def encrypt(self, plain_text):
return self._session.ecc_encrypt(self.public_key, plain_text, 0x00020800)
def decrypt(self, cipher_text):
return self._session.ecc_decrypt(self.private_key, cipher_text, 0x00020800)
class EBCCipher:
def __init__(self, session, key_val):
self._session = session
self._key = self.__get_key(key_val)
self._alg = "sm4_ebc"
self._iv = None
def __get_key(self, key_val):
key_val = self.__padding(key_val)
return key_val
@staticmethod
def __padding(val):
# padding
val = bytes(val)
while len(val) == 0 or len(val) % 16 != 0:
val += b'\0'
return val
def encrypt(self, plain_text):
plain_text = self.__padding(plain_text)
cipher_text = self._session.encrypt(plain_text, self._key, cipher_alg_id[self._alg], self._iv)
return bytes(cipher_text)
def decrypt(self, cipher_text):
plain_text = self._session.decrypt(cipher_text, self._key, cipher_alg_id[self._alg], self._iv)
return bytes(plain_text)
def destroy(self):
self._session.close()
class CBCCipher(EBCCipher):
def __init__(self, session, key, iv):
super().__init__(session, key)
self._iv = iv
self._alg = "sm4_cbc"

View File

@@ -0,0 +1,38 @@
from ctypes import *
from ..base.device import Device
from .session import Session
from .cipher import EBCCipher
class SCTUDevice(Device):
name = "sctu"
def __init__(self):
self.open("libhsctu_guomi_vpn.so")
def __open_device(self):
device = c_void_p()
ret = self._driver.HS_SDF_OpenDevice(pointer(device))
if ret != 0:
raise Exception("open {} device failed".format(self.name), ret)
self.__device = device
def new_session(self):
session = c_void_p()
ret = self._driver.HS_SDF_OpenSession(self.__device, pointer(session))
if ret != 0:
raise Exception("create session failed")
return Session(self._driver, session)
def new_sm4_ebc_cipher(self, key_val):
session = self.new_session()
return EBCCipher(session, key_val)
def close(self):
if self.__device is None:
raise Exception("device not turned on")
ret = self._driver.HS_SDF_CloseDevice(self.__device)
if ret != 0:
raise Exception("turn off device failed")
self.__device = None

View File

@@ -0,0 +1,26 @@
from ctypes import *
from common.sdk.gm.base.exception import GMDeviceError
from .session_mixin import SM4Mixin
class Session(SM4Mixin):
def __init__(self, driver, session):
super().__init__()
self._session = session
self._driver = driver
def get_device_info(self):
pass
def generate_random(self, length=64):
random_data = (c_ubyte * length)()
ret = self._driver.HS_SDF_GenerateRandom(self._session, c_int(length), random_data)
if ret != 0:
raise GMDeviceError("generate random error", ret)
return bytes(random_data)
def close(self):
ret = self._driver.HS_SDF_CloseSession(self._session)
if ret != 0:
raise GMDeviceError("close session failed", ret)

View File

@@ -0,0 +1,64 @@
from ctypes import *
from common.sdk.gm.base.exception import GMDeviceError
from common.sdk.gm.base.session_mixin import BaseMixin
def as_uchar_array(data: bytes):
return (c_ubyte * len(data))(*data)
class SM4Mixin(BaseMixin):
## 此处不导入 key
def import_key(self, key_val):
pass
def destroy_cipher_key(self, key):
pass
def encrypt(self, plain_text, key, alg, iv=None):
return self.__do_cipher_action(plain_text, key, alg, iv, True)
def decrypt(self, cipher_text, key, alg, iv=None):
return self.__do_cipher_action(cipher_text, key, alg, iv, False)
def __do_cipher_action(self, text, key, alg, iv=None, encrypt=True):
text = (c_ubyte * len(text))(*text)
if iv is not None:
iv = (c_ubyte * len(iv))(*iv)
temp_data = (c_ubyte * len(text))()
temp_data_length = c_int()
## 这里的 key 不是指针而是明文
key_arr = as_uchar_array(key)
if encrypt:
ret = self._driver.HS_SDF_Encrypt(self._session,
key_arr,
c_int(len(key_arr)),
0,
c_int(alg),
iv,
text,
c_int(len(text)),
temp_data,
pointer(temp_data_length))
if ret != 0:
raise GMDeviceError("encrypt failed", ret)
else:
ret = self._driver.HS_SDF_Decrypt(self._session,
key_arr,
c_int(len(key_arr)),
0,
c_int(alg),
iv,
text,
c_int(len(text)),
temp_data,
pointer(temp_data_length))
if ret != 0:
raise GMDeviceError("decrypt failed", ret)
return temp_data[:temp_data_length.value]