feat: add ECC signature verification and logging; refactor key generation

This commit is contained in:
halo
2026-02-03 17:49:59 +08:00
parent e66406a548
commit 881d5df108
5 changed files with 80 additions and 21 deletions

View File

@@ -7,6 +7,9 @@ from .cipher import *
from .digest import *
from django.core.cache import cache
from redis_lock import Lock as RedisLock
from common.utils import get_logger
logger = get_logger(__file__)
class Device:
@@ -43,6 +46,13 @@ class Device:
session = self.new_session()
return session.generate_random(length)
def verify_sign(self, public_key, raw_data, sign_data):
logger.debug("verify_sign public_key: %s", public_key)
logger.debug("verify_sign raw_data: %s", raw_data)
logger.debug("verify_sign sign_data: %s", sign_data)
session = self.new_session()
return session.verify_sign_ecc(0x00020200, public_key, raw_data, sign_data)
def new_sm2_ecc_cipher(self, public_key, private_key):
session = self.new_session()
return ECCCipher(session, public_key, private_key)
@@ -98,4 +108,4 @@ class Device:
# ---- 真正执行 reset ----
ret = self._driver.SPII_ResetModule(self.__device)
if ret != 0:
raise PiicoError("reset device failed", ret)
raise PiicoError("reset device failed", ret)

View File

@@ -11,9 +11,9 @@ class EncodeMixin:
class ECCrefPublicKey(Structure, EncodeMixin):
_fields_ = [
('bits', c_uint),
('x', c_ubyte * ECCref_MAX_LEN),
('y', c_ubyte * ECCref_MAX_LEN),
("bits", c_uint),
("x", c_ubyte * ECCref_MAX_LEN),
("y", c_ubyte * ECCref_MAX_LEN),
]
def encode(self):
@@ -22,8 +22,11 @@ class ECCrefPublicKey(Structure, EncodeMixin):
class ECCrefPrivateKey(Structure, EncodeMixin):
_fields_ = [
('bits', c_uint,),
('K', c_ubyte * ECCref_MAX_LEN),
(
"bits",
c_uint,
),
("K", c_ubyte * ECCref_MAX_LEN),
]
def encode(self):
@@ -41,7 +44,7 @@ class ECCCipherEncode(EncodeMixin):
def encode(self):
c1 = bytes(self.x[32:]) + bytes(self.y[32:])
c2 = bytes(self.C[:self.L])
c2 = bytes(self.C[: self.L])
c3 = bytes(self.M)
return bytes([0x04]) + c1 + c2 + c3
@@ -52,15 +55,19 @@ def new_ecc_cipher_cla(length):
if _cache.__contains__(cla_name):
return _cache[cla_name]
else:
cla = type(cla_name, (Structure, ECCCipherEncode), {
"_fields_": [
('x', c_ubyte * ECCref_MAX_LEN),
('y', c_ubyte * ECCref_MAX_LEN),
('M', c_ubyte * 32),
('L', c_uint),
('C', c_ubyte * length)
]
})
cla = type(
cla_name,
(Structure, ECCCipherEncode),
{
"_fields_": [
("x", c_ubyte * ECCref_MAX_LEN),
("y", c_ubyte * ECCref_MAX_LEN),
("M", c_ubyte * 32),
("L", c_uint),
("C", c_ubyte * length),
]
},
)
_cache[cla_name] = cla
return cla
@@ -69,3 +76,10 @@ class ECCKeyPair:
def __init__(self, public_key, private_key):
self.public_key = public_key
self.private_key = private_key
class ECCSignature(Structure, EncodeMixin):
_fields_ = [
("r", c_ubyte * ECCref_MAX_LEN),
("s", c_ubyte * ECCref_MAX_LEN),
]

View File

@@ -1,6 +1,7 @@
from ctypes import *
from .ecc import ECCrefPublicKey, ECCrefPrivateKey, ECCKeyPair
from Cryptodome.Util.asn1 import DerSequence
from .ecc import ECCrefPublicKey, ECCrefPrivateKey, ECCKeyPair, ECCSignature
from .exception import PiicoError
from .session_mixin import SM3Mixin, SM4Mixin, SM2Mixin
@@ -24,12 +25,47 @@ class Session(SM2Mixin, SM3Mixin, SM4Mixin):
def generate_ecc_key_pair(self, alg_id):
public_key = ECCrefPublicKey()
private_key = ECCrefPrivateKey()
ret = self._driver.SDF_GenerateKeyPair_ECC(self._session, c_int(alg_id), c_int(256), pointer(public_key),
pointer(private_key))
ret = self._driver.SDF_GenerateKeyPair_ECC(
self._session,
c_int(alg_id),
c_int(256),
pointer(public_key),
pointer(private_key),
)
if ret != 0:
raise PiicoError("generate ecc key pair failed", ret)
return ECCKeyPair(public_key.encode(), private_key.encode())
def verify_sign_ecc(self, alg_id, public_key, raw_data, sign_data):
pos = 0
k1 = bytes([0] * 32) + bytes(public_key[pos : pos + 32])
pos += 32
k2 = bytes([0] * 32) + bytes(public_key[pos : pos + 32])
pk = ECCrefPublicKey(
c_uint(0x100), (c_ubyte * len(k1))(*k1), (c_ubyte * len(k2))(*k2)
)
seq_der = DerSequence()
decoded_sign = seq_der.decode(sign_data)
if decoded_sign and len(decoded_sign) != 2:
raise PiicoError("verify_sign decoded_sign", -1)
r = bytes([0] * 32) + int(decoded_sign[0]).to_bytes(32, byteorder="big")
s = bytes([0] * 32) + int(decoded_sign[1]).to_bytes(32, byteorder="big")
signature = ECCSignature((c_ubyte * len(r))(*r), (c_ubyte * len(s))(*s))
plain_text = (c_ubyte * len(raw_data))(*raw_data)
ret = self._driver.SDF_ExternalVerify_ECC(
self._session,
c_int(alg_id),
pointer(pk),
plain_text,
c_int(len(plain_text)),
pointer(signature),
)
if ret != 0:
raise PiicoError("verify_sign", ret)
return True
def close(self):
ret = self._driver.SDF_CloseSession(self._session)
if ret != 0:

View File

@@ -13,9 +13,8 @@ class BaseMixin:
class SM2Mixin(BaseMixin):
def ecc_encrypt(self, public_key, plain_text, alg_id):
pos = 1
pos = 0
k1 = bytes([0] * 32) + bytes(public_key[pos:pos + 32])
k1 = (c_ubyte * len(k1))(*k1)
pos += 32
k2 = bytes([0] * 32) + bytes(public_key[pos:pos + 32])