mirror of
https://github.com/jumpserver/jumpserver.git
synced 2025-09-05 17:30:30 +00:00
perf: merge with dev
This commit is contained in:
191
apps/common/db/fields.py
Normal file
191
apps/common/db/fields.py
Normal file
@@ -0,0 +1,191 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
#
|
||||
import json
|
||||
from django.db import models
|
||||
from django.utils.translation import ugettext_lazy as _
|
||||
from django.utils.encoding import force_text
|
||||
from django.core.validators import MinValueValidator, MaxValueValidator
|
||||
from common.utils import signer, crypto
|
||||
|
||||
|
||||
__all__ = [
|
||||
'JsonMixin', 'JsonDictMixin', 'JsonListMixin', 'JsonTypeMixin',
|
||||
'JsonCharField', 'JsonTextField', 'JsonListCharField', 'JsonListTextField',
|
||||
'JsonDictCharField', 'JsonDictTextField', 'EncryptCharField',
|
||||
'EncryptTextField', 'EncryptMixin', 'EncryptJsonDictTextField',
|
||||
'EncryptJsonDictCharField', 'PortField'
|
||||
]
|
||||
|
||||
|
||||
class JsonMixin:
|
||||
tp = None
|
||||
|
||||
@staticmethod
|
||||
def json_decode(data):
|
||||
try:
|
||||
return json.loads(data)
|
||||
except (TypeError, json.JSONDecodeError):
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def json_encode(data):
|
||||
return json.dumps(data)
|
||||
|
||||
def from_db_value(self, value, expression, connection, context=None):
|
||||
if value is None:
|
||||
return value
|
||||
return self.json_decode(value)
|
||||
|
||||
def to_python(self, value):
|
||||
if value is None:
|
||||
return value
|
||||
|
||||
if not isinstance(value, str) or not value.startswith('"'):
|
||||
return value
|
||||
else:
|
||||
return self.json_decode(value)
|
||||
|
||||
def get_prep_value(self, value):
|
||||
if value is None:
|
||||
return value
|
||||
return self.json_encode(value)
|
||||
|
||||
|
||||
class JsonTypeMixin(JsonMixin):
|
||||
tp = dict
|
||||
|
||||
def from_db_value(self, value, expression, connection, context=None):
|
||||
value = super().from_db_value(value, expression, connection, context)
|
||||
if not isinstance(value, self.tp):
|
||||
value = self.tp()
|
||||
return value
|
||||
|
||||
def to_python(self, value):
|
||||
data = super().to_python(value)
|
||||
if not isinstance(data, self.tp):
|
||||
data = self.tp()
|
||||
return data
|
||||
|
||||
def get_prep_value(self, value):
|
||||
if not isinstance(value, self.tp):
|
||||
value = self.tp()
|
||||
return self.json_encode(value)
|
||||
|
||||
|
||||
class JsonDictMixin(JsonTypeMixin):
|
||||
tp = dict
|
||||
|
||||
|
||||
class JsonDictCharField(JsonDictMixin, models.CharField):
|
||||
description = _("Marshal dict data to char field")
|
||||
|
||||
|
||||
class JsonDictTextField(JsonDictMixin, models.TextField):
|
||||
description = _("Marshal dict data to text field")
|
||||
|
||||
|
||||
class JsonListMixin(JsonTypeMixin):
|
||||
tp = list
|
||||
|
||||
|
||||
class JsonStrListMixin(JsonListMixin):
|
||||
pass
|
||||
|
||||
|
||||
class JsonListCharField(JsonListMixin, models.CharField):
|
||||
description = _("Marshal list data to char field")
|
||||
|
||||
|
||||
class JsonListTextField(JsonListMixin, models.TextField):
|
||||
description = _("Marshal list data to text field")
|
||||
|
||||
|
||||
class JsonCharField(JsonMixin, models.CharField):
|
||||
description = _("Marshal data to char field")
|
||||
|
||||
|
||||
class JsonTextField(JsonMixin, models.TextField):
|
||||
description = _("Marshal data to text field")
|
||||
|
||||
|
||||
class EncryptMixin:
|
||||
"""
|
||||
EncryptMixin要放在最前面
|
||||
"""
|
||||
|
||||
def decrypt_from_signer(self, value):
|
||||
return signer.unsign(value) or ''
|
||||
|
||||
def from_db_value(self, value, expression, connection, context=None):
|
||||
if value is None:
|
||||
return value
|
||||
value = force_text(value)
|
||||
|
||||
plain_value = crypto.decrypt(value)
|
||||
|
||||
# 如果没有解开,使用原来的signer解密
|
||||
if not plain_value:
|
||||
plain_value = self.decrypt_from_signer(value)
|
||||
|
||||
# 可能和Json mix,所以要先解密,再json
|
||||
sp = super()
|
||||
if hasattr(sp, 'from_db_value'):
|
||||
plain_value = sp.from_db_value(plain_value, expression, connection, context)
|
||||
return plain_value
|
||||
|
||||
def get_prep_value(self, value):
|
||||
if value is None:
|
||||
return value
|
||||
|
||||
# 先 json 再解密
|
||||
sp = super()
|
||||
if hasattr(sp, 'get_prep_value'):
|
||||
value = sp.get_prep_value(value)
|
||||
value = force_text(value)
|
||||
# 替换新的加密方式
|
||||
|
||||
|
||||
class EncryptTextField(EncryptMixin, models.TextField):
|
||||
description = _("Encrypt field using Secret Key")
|
||||
|
||||
|
||||
class EncryptCharField(EncryptMixin, models.CharField):
|
||||
@staticmethod
|
||||
def change_max_length(kwargs):
|
||||
kwargs.setdefault('max_length', 1024)
|
||||
max_length = kwargs.get('max_length')
|
||||
if max_length < 129:
|
||||
max_length = 128
|
||||
max_length = max_length * 2
|
||||
kwargs['max_length'] = max_length
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
self.change_max_length(kwargs)
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
def deconstruct(self):
|
||||
name, path, args, kwargs = super().deconstruct()
|
||||
max_length = kwargs.pop('max_length')
|
||||
if max_length > 255:
|
||||
max_length = max_length // 2
|
||||
kwargs['max_length'] = max_length
|
||||
return name, path, args, kwargs
|
||||
|
||||
|
||||
class EncryptJsonDictTextField(EncryptMixin, JsonDictTextField):
|
||||
pass
|
||||
|
||||
|
||||
class EncryptJsonDictCharField(EncryptMixin, JsonDictCharField):
|
||||
pass
|
||||
|
||||
|
||||
class PortField(models.IntegerField):
|
||||
def __init__(self, *args, **kwargs):
|
||||
kwargs.update({
|
||||
'blank': False,
|
||||
'null': False,
|
||||
'validators': [MinValueValidator(0), MaxValueValidator(65535)]
|
||||
})
|
||||
super().__init__(*args, **kwargs)
|
||||
|
Reference in New Issue
Block a user