From c7dcf1ba59551b3d5f03417f30b98c4f975e697b Mon Sep 17 00:00:00 2001 From: ibuler Date: Thu, 10 Jul 2025 18:32:22 +0800 Subject: [PATCH] perf: playbook task db save if conn timeout --- .../automations/check_account/manager.py | 5 +++ .../automations/gather_account/manager.py | 33 +++++++++++-------- apps/assets/automations/base/manager.py | 14 ++++---- apps/audits/signal_handlers/activity_log.py | 2 ++ apps/common/db/utils.py | 25 +++++++++----- apps/common/decorators.py | 14 +------- 6 files changed, 50 insertions(+), 43 deletions(-) diff --git a/apps/accounts/automations/check_account/manager.py b/apps/accounts/automations/check_account/manager.py index c8d1101c4..8ea9f4582 100644 --- a/apps/accounts/automations/check_account/manager.py +++ b/apps/accounts/automations/check_account/manager.py @@ -15,11 +15,13 @@ from common.decorators import bulk_create_decorator, bulk_update_decorator from settings.models import LeakPasswords +# 已设置手动 finish @bulk_create_decorator(AccountRisk) def create_risk(data): return AccountRisk(**data) +# 已设置手动 finish @bulk_update_decorator(AccountRisk, update_fields=["details", "status"]) def update_risk(risk): return risk @@ -217,6 +219,9 @@ class CheckAccountManager(BaseManager): "details": [{"datetime": now, 'type': 'init'}], }) + create_risk.finish() + update_risk.finish() + def pre_run(self): super().pre_run() self.assets = self.execution.get_all_assets() diff --git a/apps/accounts/automations/gather_account/manager.py b/apps/accounts/automations/gather_account/manager.py index de677fa6d..cfdae0fc5 100644 --- a/apps/accounts/automations/gather_account/manager.py +++ b/apps/accounts/automations/gather_account/manager.py @@ -30,6 +30,16 @@ common_risk_items = [ diff_items = risk_items + common_risk_items +@bulk_create_decorator(AccountRisk) +def _create_risk(self, data): + return AccountRisk(**data) + + +@bulk_update_decorator(AccountRisk, update_fields=["details"]) +def _update_risk(self, account): + return account + + def format_datetime(value): if isinstance(value, timezone.datetime): return value.strftime("%Y-%m-%d %H:%M:%S") @@ -141,25 +151,17 @@ class AnalyseAccountRisk: found = assets_risks.get(key) if not found: - self._create_risk(dict(**d, details=[detail])) + _create_risk(dict(**d, details=[detail])) continue found.details.append(detail) - self._update_risk(found) - - @bulk_create_decorator(AccountRisk) - def _create_risk(self, data): - return AccountRisk(**data) - - @bulk_update_decorator(AccountRisk, update_fields=["details"]) - def _update_risk(self, account): - return account + _update_risk(found) def lost_accounts(self, asset, lost_users): if not self.check_risk: return for user in lost_users: - self._create_risk( + _create_risk( dict( asset_id=str(asset.id), username=user, @@ -176,7 +178,7 @@ class AnalyseAccountRisk: self._analyse_item_changed(ga, d) if not sys_found: basic = {"asset": asset, "username": d["username"], 'gathered_account': ga} - self._create_risk( + _create_risk( dict( **basic, risk=RiskChoice.new_found, @@ -388,8 +390,6 @@ class GatherAccountsManager(AccountBasePlaybookManager): self.update_gathered_account(ori_account, d) ori_found = username in ori_users need_analyser_gather_account.append((asset, ga, d, ori_found)) - self.create_gathered_account.finish() - self.update_gathered_account.finish() for analysis_data in need_analyser_gather_account: risk_analyser.analyse_risk(*analysis_data) self.update_gather_accounts_status(asset) @@ -403,6 +403,11 @@ class GatherAccountsManager(AccountBasePlaybookManager): present=True ) # 因为有 bulk create, bulk update, 所以这里需要 sleep 一下,等待数据同步 + self.create_gathered_account.finish() + self.update_gathered_account.finish() + _update_risk.finish() + _create_risk.finish() + time.sleep(0.5) def get_report_template(self): diff --git a/apps/assets/automations/base/manager.py b/apps/assets/automations/base/manager.py index 9ff38ba63..edb51d385 100644 --- a/apps/assets/automations/base/manager.py +++ b/apps/assets/automations/base/manager.py @@ -123,9 +123,7 @@ class BaseManager: self.execution.summary = self.summary self.execution.result = self.result self.execution.status = self.status - - with safe_atomic_db_connection(): - self.execution.save() + self.execution.save() def print_summary(self): content = "\nSummery: \n" @@ -167,9 +165,10 @@ class BaseManager: return data def post_run(self): - self.update_execution() - self.print_summary() - self.send_report_if_need() + with safe_atomic_db_connection(): + self.update_execution() + self.print_summary() + self.send_report_if_need() def run(self, *args, **kwargs): self.pre_run() @@ -548,7 +547,8 @@ class BasePlaybookManager(PlaybookPrepareMixin, BaseManager): try: kwargs.update({"clean_workspace": False}) cb = runner.run(**kwargs) - self.on_runner_success(runner, cb) + with safe_atomic_db_connection(): + self.on_runner_success(runner, cb) except Exception as e: self.on_runner_failed(runner, e, **info) finally: diff --git a/apps/audits/signal_handlers/activity_log.py b/apps/audits/signal_handlers/activity_log.py index 90583472b..1a5677e6f 100644 --- a/apps/audits/signal_handlers/activity_log.py +++ b/apps/audits/signal_handlers/activity_log.py @@ -89,6 +89,8 @@ def create_activities(resource_ids, detail, detail_id, action, org_id): for activity in activities: create_activity(activity) + create_activity.finish() + @signals.after_task_publish.connect def after_task_publish_for_activity_log(headers=None, body=None, **kwargs): diff --git a/apps/common/db/utils.py b/apps/common/db/utils.py index a3099c2cf..976478d09 100644 --- a/apps/common/db/utils.py +++ b/apps/common/db/utils.py @@ -50,13 +50,14 @@ def get_objects(model, pks): # 复制 django.db.close_old_connections, 因为它没有导出,ide 提示有问题 -def close_old_connections(): - for conn in connections.all(): +def close_old_connections(**kwargs): + for conn in connections.all(initialized_only=True): conn.close_if_unusable_or_obsolete() +# 这个要是在 Django 请求周期外使用的,不能影响 Django 的事务管理, 在 api 中使用会影响 api 事务 @contextmanager -def safe_db_connection(auto_close=True): +def safe_db_connection(): close_old_connections() yield close_old_connections() @@ -64,19 +65,25 @@ def safe_db_connection(auto_close=True): @contextmanager def safe_atomic_db_connection(auto_close=False): - in_atomic_block = connection.in_atomic_block # 当前是否处于事务中 - autocommit = transaction.get_autocommit() # 是否启用了自动提交 - created = False + """ + 通用数据库连接管理器(线程安全、事务感知): + - 在连接不可用时主动重建连接 + - 在非事务环境下自动关闭连接(可选) + - 不影响 Django 请求/事务周期 + """ + in_atomic = connection.in_atomic_block # 当前是否在事务中 + autocommit = transaction.get_autocommit() + recreated = False try: if not connection.is_usable(): connection.close() connection.connect() - created = True + recreated = True yield finally: - # 如果不是事务中(API 请求中可能需要提交事务),则关闭连接 - if auto_close or (created and not in_atomic_block and autocommit): + # 只在非事务、autocommit 模式下,才考虑主动清理连接 + if auto_close or (recreated and not in_atomic and autocommit): close_old_connections() diff --git a/apps/common/decorators.py b/apps/common/decorators.py index 963c5c481..3ddee64ab 100644 --- a/apps/common/decorators.py +++ b/apps/common/decorators.py @@ -302,16 +302,8 @@ def bulk_handle(handler, batch_size=50, timeout=0.5): cache = [] # 缓存实例的列表 lock = threading.Lock() # 用于线程安全 - timer = [None] # 定时器对象,列表存储以便重置 org_id = None - def reset_timer(): - """重置定时器""" - if timer[0] is not None: - timer[0].cancel() - timer[0] = threading.Timer(timeout, handle_remaining) - timer[0].start() - def handle_it(): from orgs.utils import tmp_to_org with lock: @@ -351,17 +343,13 @@ def bulk_handle(handler, batch_size=50, timeout=0.5): if len(cache) >= batch_size: handle_it() - reset_timer() return instance # 提交剩余实例的方法 def handle_remaining(): if not cache: return - print("Timer expired. Saving remaining instances.") - from orgs.utils import tmp_to_org - with tmp_to_org(org_id): - handle_it() + handle_it() wrapper.finish = handle_remaining return wrapper