diff --git a/apps/common/fields/model.py b/apps/common/fields/model.py index b4ac41b2f..1161944a6 100644 --- a/apps/common/fields/model.py +++ b/apps/common/fields/model.py @@ -5,7 +5,7 @@ from django.db import models from django.utils.translation import ugettext_lazy as _ from django.utils.encoding import force_text -from ..utils import signer, aes_crypto, aes_ecb_crypto +from ..utils import signer, crypto __all__ = [ @@ -116,27 +116,12 @@ class EncryptMixin: def decrypt_from_signer(self, value): return signer.unsign(value) or '' - def decrypt_from_aes(self, value): - """ - 先尝试使用GCM模式解密,如果解不开,再尝试使用原来的ECB模式解密 - """ - try: - return aes_crypto.decrypt(value) - except ValueError: - pass - - try: - return aes_ecb_crypto.decrypt(value) - except (TypeError, ValueError, UnicodeDecodeError): - pass - def from_db_value(self, value, expression, connection, context): if value is None: return value value = force_text(value) - # 优先采用 aes 解密 - plain_value = self.decrypt_from_aes(value) + plain_value = crypto.decrypt(value) # 如果没有解开,使用原来的signer解密 if not plain_value: @@ -158,7 +143,7 @@ class EncryptMixin: value = sp.get_prep_value(value) value = force_text(value) # 替换新的加密方式 - return aes_crypto.encrypt(value) + return crypto.encrypt(value) class EncryptTextField(EncryptMixin, models.TextField): diff --git a/apps/common/utils/crypto.py b/apps/common/utils/crypto.py index 31991c93c..e70bd0395 100644 --- a/apps/common/utils/crypto.py +++ b/apps/common/utils/crypto.py @@ -2,8 +2,58 @@ import base64 from Crypto.Cipher import AES from Crypto.Util.Padding import pad from Crypto.Random import get_random_bytes +from gmssl.sm4 import CryptSM4, SM4_ENCRYPT, SM4_DECRYPT from django.conf import settings +from django.core.exceptions import ImproperlyConfigured + + +def process_key(key): + """ + 返回32 bytes 的key + """ + if not isinstance(key, bytes): + key = bytes(key, encoding='utf-8') + + if len(key) >= 32: + return key[:32] + + return pad(key, 32) + + +class BaseCrypto: + + def encrypt(self, text): + return base64.urlsafe_b64encode( + self._encrypt(bytes(text, encoding='utf8')) + ).decode('utf8') + + def _encrypt(self, data: bytes) -> bytes: + raise NotImplementedError + + def decrypt(self, text): + return self._decrypt( + base64.urlsafe_b64decode(bytes(text, encoding='utf8')) + ).decode('utf8') + + def _decrypt(self, data: bytes) -> bytes: + raise NotImplementedError + + +class GMSM4EcbCrypto(BaseCrypto): + def __init__(self, key): + self.key = process_key(key) + self.sm4_encryptor = CryptSM4() + self.sm4_encryptor.set_key(self.key, SM4_ENCRYPT) + + self.sm4_decryptor = CryptSM4() + self.sm4_decryptor.set_key(self.key, SM4_DECRYPT) + + def _encrypt(self, data: bytes) -> bytes: + return self.sm4_encryptor.crypt_ecb(data) + + def _decrypt(self, data: bytes) -> bytes: + return self.sm4_decryptor.crypt_ecb(data) class AESCrypto: @@ -52,20 +102,7 @@ class AESCryptoGCM: """ def __init__(self, key): - self.key = self.process_key(key) - - @staticmethod - def process_key(key): - """ - 返回32 bytes 的key - """ - if not isinstance(key, bytes): - key = bytes(key, encoding='utf-8') - - if len(key) >= 32: - return key[:32] - - return pad(key, 32) + self.key = process_key(key) def encrypt(self, text): """ @@ -110,5 +147,50 @@ def get_aes_crypto(key=None, mode='GCM'): return a +def get_gm_sm4_ecb_crypto(key=None): + key = key or settings.SECRET_KEY + return GMSM4EcbCrypto(key) + + aes_ecb_crypto = get_aes_crypto(mode='ECB') aes_crypto = get_aes_crypto(mode='GCM') +gm_sm4_ecb_crypto = get_gm_sm4_ecb_crypto() + + +class Crypto: + cryptoes = { + 'aes_ecb': aes_ecb_crypto, + 'aes_gcm': aes_crypto, + 'aes': aes_crypto, + 'gm_sm4_ecb': gm_sm4_ecb_crypto, + 'gm': gm_sm4_ecb_crypto, + } + + def __init__(self): + cryptoes = self.__class__.cryptoes.copy() + crypto = cryptoes.pop(settings.SECURITY_DATA_CRYPTO_ALGO, None) + if crypto is None: + raise ImproperlyConfigured( + f'Crypto method not supported {settings.SECURITY_DATA_CRYPTO_ALGO}' + ) + self.cryptoes = [crypto, *cryptoes.values()] + + @property + def encryptor(self): + return self.cryptoes[0] + + def encrypt(self, text): + return self.encryptor.encrypt(text) + + def decrypt(self, text): + for decryptor in self.cryptoes: + try: + origin_text = decryptor.decrypt(text) + if origin_text: + # 有时不同算法解密不报错,但是返回空字符串 + return origin_text + except (TypeError, ValueError, UnicodeDecodeError): + continue + + +crypto = Crypto() diff --git a/apps/jumpserver/conf.py b/apps/jumpserver/conf.py index 381dc5beb..bd5f45d3d 100644 --- a/apps/jumpserver/conf.py +++ b/apps/jumpserver/conf.py @@ -244,6 +244,7 @@ class Config(dict): 'SECURITY_PASSWORD_SPECIAL_CHAR': False, 'SECURITY_LOGIN_CHALLENGE_ENABLED': False, 'SECURITY_LOGIN_CAPTCHA_ENABLED': True, + 'SECURITY_DATA_CRYPTO_ALGO': 'aes', 'HTTP_BIND_HOST': '0.0.0.0', 'HTTP_LISTEN_PORT': 8080, diff --git a/apps/jumpserver/settings/custom.py b/apps/jumpserver/settings/custom.py index 47bf4bebe..379c6ebef 100644 --- a/apps/jumpserver/settings/custom.py +++ b/apps/jumpserver/settings/custom.py @@ -54,6 +54,7 @@ SECURITY_VIEW_AUTH_NEED_MFA = CONFIG.SECURITY_VIEW_AUTH_NEED_MFA SECURITY_SERVICE_ACCOUNT_REGISTRATION = DYNAMIC.SECURITY_SERVICE_ACCOUNT_REGISTRATION SECURITY_LOGIN_CAPTCHA_ENABLED = CONFIG.SECURITY_LOGIN_CAPTCHA_ENABLED SECURITY_LOGIN_CHALLENGE_ENABLED = CONFIG.SECURITY_LOGIN_CHALLENGE_ENABLED +SECURITY_DATA_CRYPTO_ALGO = CONFIG.SECURITY_DATA_CRYPTO_ALGO # Terminal other setting TERMINAL_PASSWORD_AUTH = DYNAMIC.TERMINAL_PASSWORD_AUTH diff --git a/requirements/requirements.txt b/requirements/requirements.txt index cf814708e..6da1663dd 100644 --- a/requirements/requirements.txt +++ b/requirements/requirements.txt @@ -98,3 +98,4 @@ django-redis==4.11.0 python-redis-lock==3.5.0 jumpserver-django-oidc-rp==0.3.7.5 django-mysql==3.9.0 +gmssl==3.2.1