mirror of
https://github.com/jumpserver/jumpserver.git
synced 2026-03-18 11:02:09 +00:00
feat: enhance key handling in cipher classes
This commit is contained in:
@@ -1,7 +1,13 @@
|
||||
from django.conf import settings
|
||||
|
||||
from .device import Device
|
||||
|
||||
DEFAULT_DRIVER_PATH = "./lib/libpiico_ccmu.so"
|
||||
|
||||
def open_piico_device(driver_path) -> Device:
|
||||
|
||||
def open_piico_device(driver_path=None) -> Device:
|
||||
if driver_path is None:
|
||||
driver_path = settings.PIICO_DRIVER_PATH or DEFAULT_DRIVER_PATH
|
||||
d = Device()
|
||||
d.open(driver_path)
|
||||
return d
|
||||
|
||||
@@ -22,21 +22,17 @@ class EBCCipher:
|
||||
|
||||
def __init__(self, session, key_val):
|
||||
self._session = session
|
||||
self._key = self.__get_key(key_val)
|
||||
self._key = key_val if isinstance(key_val, bytes) else bytes(key_val, encoding='utf-8')
|
||||
self._alg = "sm4_ebc"
|
||||
self._iv = None
|
||||
|
||||
def __get_key(self, key_val):
|
||||
key_val = self.__padding(key_val)
|
||||
return self._session.import_key(key_val)
|
||||
|
||||
@staticmethod
|
||||
def __padding(val):
|
||||
# padding
|
||||
val = bytes(val)
|
||||
while len(val) == 0 or len(val) % 16 != 0:
|
||||
val += b'\0'
|
||||
return val
|
||||
def __padding(data):
|
||||
if not isinstance(data, bytes):
|
||||
data = bytes(data, encoding='utf-8')
|
||||
while len(data) == 0 or len(data) % 16 != 0:
|
||||
data += b'\0'
|
||||
return data
|
||||
|
||||
def encrypt(self, plain_text):
|
||||
plain_text = self.__padding(plain_text)
|
||||
@@ -48,7 +44,6 @@ class EBCCipher:
|
||||
return bytes(plain_text)
|
||||
|
||||
def destroy(self):
|
||||
self._session.destroy_cipher_key(self._key)
|
||||
self._session.close()
|
||||
|
||||
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
|
||||
from .ecc import *
|
||||
from .exception import PiicoError
|
||||
|
||||
@@ -13,9 +12,8 @@ class BaseMixin:
|
||||
class SM2Mixin(BaseMixin):
|
||||
def ecc_encrypt(self, public_key, plain_text, alg_id):
|
||||
|
||||
pos = 1
|
||||
pos = 0
|
||||
k1 = bytes([0] * 32) + bytes(public_key[pos:pos + 32])
|
||||
k1 = (c_ubyte * len(k1))(*k1)
|
||||
pos += 32
|
||||
k2 = bytes([0] * 32) + bytes(public_key[pos:pos + 32])
|
||||
|
||||
@@ -88,21 +86,6 @@ class SM3Mixin(BaseMixin):
|
||||
|
||||
class SM4Mixin(BaseMixin):
|
||||
|
||||
def import_key(self, key_val):
|
||||
# to c lang
|
||||
key_val = (c_ubyte * len(key_val))(*key_val)
|
||||
|
||||
key = c_void_p()
|
||||
ret = self._driver.SDF_ImportKey(self._session, key_val, c_int(len(key_val)), pointer(key))
|
||||
if ret != 0:
|
||||
raise PiicoError("import key failed", ret)
|
||||
return key
|
||||
|
||||
def destroy_cipher_key(self, key):
|
||||
ret = self._driver.SDF_DestroyKey(self._session, key)
|
||||
if ret != 0:
|
||||
raise Exception("destroy key failed")
|
||||
|
||||
def encrypt(self, plain_text, key, alg, iv=None):
|
||||
return self.__do_cipher_action(plain_text, key, alg, iv, True)
|
||||
|
||||
@@ -110,20 +93,35 @@ class SM4Mixin(BaseMixin):
|
||||
return self.__do_cipher_action(cipher_text, key, alg, iv, False)
|
||||
|
||||
def __do_cipher_action(self, text, key, alg, iv=None, encrypt=True):
|
||||
text = (c_ubyte * len(text))(*text)
|
||||
text_buf = (c_ubyte * len(text))(*text)
|
||||
key_buf = (c_ubyte * len(key))(*key)
|
||||
|
||||
iv_buf = None
|
||||
iv_len = 0
|
||||
if iv is not None:
|
||||
iv = (c_ubyte * len(iv))(*iv)
|
||||
iv_buf = (c_ubyte * len(iv))(*iv)
|
||||
iv_len = len(iv)
|
||||
|
||||
temp_data = (c_ubyte * len(text))()
|
||||
temp_data_length = c_int()
|
||||
if encrypt:
|
||||
ret = self._driver.SDF_Encrypt(self._session, key, c_int(alg), iv, text, c_int(len(text)), temp_data,
|
||||
pointer(temp_data_length))
|
||||
ret = self._driver.SPII_EncryptEx(
|
||||
self._session, text_buf, c_int(len(text)),
|
||||
key_buf, c_int(len(key)),
|
||||
iv_buf, c_int(iv_len),
|
||||
c_int(alg),
|
||||
temp_data, pointer(temp_data_length),
|
||||
)
|
||||
if ret != 0:
|
||||
raise PiicoError("encrypt failed", ret)
|
||||
else:
|
||||
ret = self._driver.SDF_Decrypt(self._session, key, c_int(alg), iv, text, c_int(len(text)), temp_data,
|
||||
pointer(temp_data_length))
|
||||
ret = self._driver.SPII_DecryptEx(
|
||||
self._session, text_buf, c_int(len(text)),
|
||||
key_buf, c_int(len(key)),
|
||||
iv_buf, c_int(iv_len),
|
||||
c_int(alg),
|
||||
temp_data, pointer(temp_data_length),
|
||||
)
|
||||
if ret != 0:
|
||||
raise PiicoError("decrypt failed", ret)
|
||||
return temp_data[:temp_data_length.value]
|
||||
|
||||
@@ -210,9 +210,7 @@ class Crypto:
|
||||
if not crypt_algo:
|
||||
if settings.GMSSL_ENABLED:
|
||||
if settings.PIICO_DEVICE_ENABLE:
|
||||
piico_driver_path = settings.PIICO_DRIVER_PATH if settings.PIICO_DRIVER_PATH \
|
||||
else "./lib/libpiico_ccmu.so"
|
||||
device = piico.open_piico_device(piico_driver_path)
|
||||
device = piico.open_piico_device()
|
||||
self.cryptor_map["piico_gm"] = get_piico_gm_sm4_ecb_crypto(device)
|
||||
crypt_algo = 'piico_gm'
|
||||
else:
|
||||
|
||||
Reference in New Issue
Block a user