mirror of
https://github.com/jumpserver/jumpserver.git
synced 2025-09-16 07:18:22 +00:00
feat(crypto): 支持国密算法
This commit is contained in:
@@ -2,8 +2,58 @@ import base64
|
||||
from Crypto.Cipher import AES
|
||||
from Crypto.Util.Padding import pad
|
||||
from Crypto.Random import get_random_bytes
|
||||
from gmssl.sm4 import CryptSM4, SM4_ENCRYPT, SM4_DECRYPT
|
||||
|
||||
from django.conf import settings
|
||||
from django.core.exceptions import ImproperlyConfigured
|
||||
|
||||
|
||||
def process_key(key):
|
||||
"""
|
||||
返回32 bytes 的key
|
||||
"""
|
||||
if not isinstance(key, bytes):
|
||||
key = bytes(key, encoding='utf-8')
|
||||
|
||||
if len(key) >= 32:
|
||||
return key[:32]
|
||||
|
||||
return pad(key, 32)
|
||||
|
||||
|
||||
class BaseCrypto:
|
||||
|
||||
def encrypt(self, text):
|
||||
return base64.urlsafe_b64encode(
|
||||
self._encrypt(bytes(text, encoding='utf8'))
|
||||
).decode('utf8')
|
||||
|
||||
def _encrypt(self, data: bytes) -> bytes:
|
||||
raise NotImplementedError
|
||||
|
||||
def decrypt(self, text):
|
||||
return self._decrypt(
|
||||
base64.urlsafe_b64decode(bytes(text, encoding='utf8'))
|
||||
).decode('utf8')
|
||||
|
||||
def _decrypt(self, data: bytes) -> bytes:
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
class GMSM4EcbCrypto(BaseCrypto):
|
||||
def __init__(self, key):
|
||||
self.key = process_key(key)
|
||||
self.sm4_encryptor = CryptSM4()
|
||||
self.sm4_encryptor.set_key(self.key, SM4_ENCRYPT)
|
||||
|
||||
self.sm4_decryptor = CryptSM4()
|
||||
self.sm4_decryptor.set_key(self.key, SM4_DECRYPT)
|
||||
|
||||
def _encrypt(self, data: bytes) -> bytes:
|
||||
return self.sm4_encryptor.crypt_ecb(data)
|
||||
|
||||
def _decrypt(self, data: bytes) -> bytes:
|
||||
return self.sm4_decryptor.crypt_ecb(data)
|
||||
|
||||
|
||||
class AESCrypto:
|
||||
@@ -52,20 +102,7 @@ class AESCryptoGCM:
|
||||
"""
|
||||
|
||||
def __init__(self, key):
|
||||
self.key = self.process_key(key)
|
||||
|
||||
@staticmethod
|
||||
def process_key(key):
|
||||
"""
|
||||
返回32 bytes 的key
|
||||
"""
|
||||
if not isinstance(key, bytes):
|
||||
key = bytes(key, encoding='utf-8')
|
||||
|
||||
if len(key) >= 32:
|
||||
return key[:32]
|
||||
|
||||
return pad(key, 32)
|
||||
self.key = process_key(key)
|
||||
|
||||
def encrypt(self, text):
|
||||
"""
|
||||
@@ -110,5 +147,50 @@ def get_aes_crypto(key=None, mode='GCM'):
|
||||
return a
|
||||
|
||||
|
||||
def get_gm_sm4_ecb_crypto(key=None):
|
||||
key = key or settings.SECRET_KEY
|
||||
return GMSM4EcbCrypto(key)
|
||||
|
||||
|
||||
aes_ecb_crypto = get_aes_crypto(mode='ECB')
|
||||
aes_crypto = get_aes_crypto(mode='GCM')
|
||||
gm_sm4_ecb_crypto = get_gm_sm4_ecb_crypto()
|
||||
|
||||
|
||||
class Crypto:
|
||||
cryptoes = {
|
||||
'aes_ecb': aes_ecb_crypto,
|
||||
'aes_gcm': aes_crypto,
|
||||
'aes': aes_crypto,
|
||||
'gm_sm4_ecb': gm_sm4_ecb_crypto,
|
||||
'gm': gm_sm4_ecb_crypto,
|
||||
}
|
||||
|
||||
def __init__(self):
|
||||
cryptoes = self.__class__.cryptoes.copy()
|
||||
crypto = cryptoes.pop(settings.SECURITY_DATA_CRYPTO_ALGO, None)
|
||||
if crypto is None:
|
||||
raise ImproperlyConfigured(
|
||||
f'Crypto method not supported {settings.SECURITY_DATA_CRYPTO_ALGO}'
|
||||
)
|
||||
self.cryptoes = [crypto, *cryptoes.values()]
|
||||
|
||||
@property
|
||||
def encryptor(self):
|
||||
return self.cryptoes[0]
|
||||
|
||||
def encrypt(self, text):
|
||||
return self.encryptor.encrypt(text)
|
||||
|
||||
def decrypt(self, text):
|
||||
for decryptor in self.cryptoes:
|
||||
try:
|
||||
origin_text = decryptor.decrypt(text)
|
||||
if origin_text:
|
||||
# 有时不同算法解密不报错,但是返回空字符串
|
||||
return origin_text
|
||||
except (TypeError, ValueError, UnicodeDecodeError):
|
||||
continue
|
||||
|
||||
|
||||
crypto = Crypto()
|
||||
|
Reference in New Issue
Block a user