jumpserver/apps/common/db/utils.py

115 lines
3.2 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from contextlib import contextmanager
from django.db import connections, transaction, connection
from django.utils.encoding import force_str
from common.utils import get_logger, signer, crypto
logger = get_logger(__file__)
def default_ip_group():
return ["*"]
def get_object_if_need(model, pk):
if not isinstance(pk, model):
try:
return model.objects.get(id=pk)
except model.DoesNotExist as e:
logger.error(f"DoesNotExist: <{model.__name__}:{pk}> not exist")
raise e
return pk
def get_objects_if_need(model, pks):
if not pks:
return pks
if not isinstance(pks[0], model):
objs = list(model.objects.filter(id__in=pks))
if len(objs) != len(pks):
pks = set(pks)
exists_pks = {o.id for o in objs}
not_found_pks = ",".join(pks - exists_pks)
logger.error(f"DoesNotExist: <{model.__name__}: {not_found_pks}>")
return objs
return pks
def get_objects(model, pks):
if not pks:
return pks
objs = list(model.objects.filter(id__in=pks))
if len(objs) != len(pks):
pks = set(pks)
exists_pks = {o.id for o in objs}
not_found_pks = pks - exists_pks
logger.error(f"DoesNotExist: <{model.__name__}: {not_found_pks}>")
return objs
# 复制 django.db.close_old_connections, 因为它没有导出ide 提示有问题
def close_old_connections(**kwargs):
for conn in connections.all(initialized_only=True):
conn.close_if_unusable_or_obsolete()
# 这个要是在 Django 请求周期外使用的,不能影响 Django 的事务管理, 在 api 中使用会影响 api 事务
@contextmanager
def safe_db_connection():
close_old_connections()
yield
close_old_connections()
@contextmanager
def safe_atomic_db_connection(auto_close=False):
"""
通用数据库连接管理器(线程安全、事务感知):
- 在连接不可用时主动重建连接
- 在非事务环境下自动关闭连接(可选)
- 不影响 Django 请求/事务周期
"""
in_atomic = connection.in_atomic_block # 当前是否在事务中
autocommit = transaction.get_autocommit()
recreated = False
try:
if not connection.is_usable():
connection.close()
connection.connect()
recreated = True
yield
finally:
# 只在非事务、autocommit 模式下,才考虑主动清理连接
if auto_close or (recreated and not in_atomic and autocommit):
close_old_connections()
@contextmanager
def open_db_connection(alias="default"):
connection = transaction.get_connection(alias)
try:
connection.connect()
with transaction.atomic():
yield connection
finally:
connection.close()
class Encryptor:
def __init__(self, value):
self.value = force_str(value)
def decrypt(self):
plain_value = crypto.decrypt(self.value)
# 如果没有解开使用原来的signer解密
if not plain_value:
plain_value = signer.unsign(self.value) or ""
return plain_value
def encrypt(self):
return crypto.encrypt(self.value)