From 996bee3afd6dbef3168b3361d872594996ed8fcb Mon Sep 17 00:00:00 2001 From: ibuler Date: Thu, 13 Mar 2025 17:54:05 +0800 Subject: [PATCH] perf: update gathered account sync --- .../accounts/api/automations/check_account.py | 6 +- .../api/automations/gather_account.py | 5 +- .../automations/check_account/manager.py | 6 +- apps/accounts/models/account.py | 12 ++- .../models/automations/gather_account.py | 93 ++++++++++++------- apps/audits/signal_handlers/activity_log.py | 12 ++- apps/i18n/lina/en.json | 2 +- 7 files changed, 88 insertions(+), 48 deletions(-) diff --git a/apps/accounts/api/automations/check_account.py b/apps/accounts/api/automations/check_account.py index c47c456d4..123e8bf6e 100644 --- a/apps/accounts/api/automations/check_account.py +++ b/apps/accounts/api/automations/check_account.py @@ -3,7 +3,6 @@ from django.db.models import Q, Count from django.http import HttpResponse from django.shortcuts import get_object_or_404 -from django.utils import timezone from rest_framework.decorators import action from rest_framework.exceptions import MethodNotAllowed from rest_framework.response import Response @@ -65,13 +64,14 @@ class CheckAccountExecutionViewSet(AutomationExecutionViewSet): return Response(status=400, data={"asset_id": "This field is required."}) get_object_or_404(Asset, pk=asset_id) + name = "Check asset risk: {}".format(asset_id) execution = AutomationExecution() execution.snapshot = { "assets": [asset_id], "nodes": [], "type": AutomationTypes.check_account, - "engines": ["check_account_secret"], - "name": "Check asset risk: {} {}".format(asset_id, timezone.now()), + "engines": "__all__", + "name": name, } execution.save() execution.start() diff --git a/apps/accounts/api/automations/gather_account.py b/apps/accounts/api/automations/gather_account.py index 732aa01f8..540f378e0 100644 --- a/apps/accounts/api/automations/gather_account.py +++ b/apps/accounts/api/automations/gather_account.py @@ -92,12 +92,13 @@ class GatheredAccountViewSet(OrgBulkModelViewSet): def status(self, request, *args, **kwargs): ids = request.data.get('ids', []) new_status = request.data.get("status") - updated_instances = GatheredAccount.objects.filter(id__in=ids) - updated_instances.update(status=new_status) + updated_instances = GatheredAccount.objects.filter(id__in=ids).select_related('asset') + if new_status == "confirmed": GatheredAccount.sync_accounts(updated_instances) updated_instances.update(present=True) + updated_instances.update(status=new_status) return Response(status=status.HTTP_200_OK) def perform_destroy(self, instance): diff --git a/apps/accounts/automations/check_account/manager.py b/apps/accounts/automations/check_account/manager.py index a86a07290..f4e646dd6 100644 --- a/apps/accounts/automations/check_account/manager.py +++ b/apps/accounts/automations/check_account/manager.py @@ -241,7 +241,11 @@ class CheckAccountManager(BaseManager): self.commit_risks(_assets) def do_run(self, *args, **kwargs): - for engine in self.execution.snapshot.get("engines", []): + engines = self.execution.snapshot.get("engines", []) + if engines == '__all__': + engines = ['check_account_secret', 'check_account_repeat', 'check_account_leak'] + + for engine in engines: if engine == "check_account_secret": handler = CheckSecretHandler(self.assets) elif engine == "check_account_repeat": diff --git a/apps/accounts/models/account.py b/apps/accounts/models/account.py index ab007d8cd..64b4e01a3 100644 --- a/apps/accounts/models/account.py +++ b/apps/accounts/models/account.py @@ -26,13 +26,15 @@ class AccountHistoricalRecords(HistoricalRecords): if not self.included_fields: return super().post_save(instance, created, using=using, **kwargs) - check_fields = set(self.included_fields) - {'version'} + # self.updated_version = 0 + if created: + return super().post_save(instance, created, using=using, **kwargs) history_account = instance.history.first() if history_account is None: - self.updated_version = 0 return super().post_save(instance, created, using=using, **kwargs) + check_fields = set(self.included_fields) - {'version'} history_attrs = {field: getattr(history_account, field) for field in check_fields} attrs = {field: getattr(instance, field) for field in check_fields} @@ -87,8 +89,10 @@ class Account(AbsConnectivity, LabeledMixin, BaseAccount, JSONFilterMixin): on_delete=models.SET_NULL, verbose_name=_("Su from") ) version = models.IntegerField(default=0, verbose_name=_('Version')) - history = AccountHistoricalRecords(included_fields=['id', '_secret', 'secret_type', 'version'], - verbose_name=_("historical Account")) + history = AccountHistoricalRecords( + included_fields=['id', '_secret', 'secret_type', 'version'], + verbose_name=_("historical Account") + ) secret_reset = models.BooleanField(default=True, verbose_name=_('Secret reset')) source = models.CharField(max_length=30, default=Source.LOCAL, verbose_name=_('Source')) source_id = models.CharField(max_length=128, null=True, blank=True, verbose_name=_('Source ID')) diff --git a/apps/accounts/models/automations/gather_account.py b/apps/accounts/models/automations/gather_account.py index eb90a0f5a..951cef091 100644 --- a/apps/accounts/models/automations/gather_account.py +++ b/apps/accounts/models/automations/gather_account.py @@ -1,5 +1,6 @@ +from collections import defaultdict + from django.db import models -from django.db.models import Q from django.utils.translation import gettext_lazy as _ from accounts.const import AutomationTypes, Source @@ -30,49 +31,73 @@ class GatheredAccount(JMSOrgBaseModel): return self.asset.address @classmethod - def update_exists_accounts(cls, gathered_account, accounts): - if not gathered_account.date_last_login: - return + def update_exists_accounts(cls, ga_accounts_set): # gathered_account, accounts): + to_updates = [] - for account in accounts: - # 这里是否可以考虑,标记成未从堡垒机登录风险 ? - if is_date_more_than(gathered_account.date_last_login, account.date_last_login, '5m'): + for gathered_account, accounts in ga_accounts_set: + if not gathered_account.date_last_login: + return + + for account in accounts: + # 这里是否可以考虑,标记成未从堡垒机登录风险 ? + if not is_date_more_than(gathered_account.date_last_login, account.date_last_login, '5m'): + continue account.date_last_login = gathered_account.date_last_login account.login_by = '{}({})'.format('unknown', gathered_account.address_last_login) - account.save(update_fields=['date_last_login', 'login_by']) + to_updates.append(account) + + Account.objects.bulk_update(to_updates, fields=['date_last_login', 'login_by']) @classmethod - def create_accounts(cls, gathered_account): + def bulk_create_accounts(cls, gathered_accounts): account_objs = [] - asset_id = gathered_account.asset_id - username = gathered_account.username - account = Account( - asset_id=asset_id, username=username, - name=username, source=Source.DISCOVERY, - date_last_login=gathered_account.date_last_login, - ) - account_objs.append(account) - Account.objects.bulk_create(account_objs) - gathered_account.status = ConfirmOrIgnore.confirmed - gathered_account.save(update_fields=['status']) - - @classmethod - def sync_accounts(cls, gathered_accounts, auto_create=True): - """ - 更新为已存在的账号,或者创建新的账号, 原来的 sync 重构了,如果存在则自动更新一些信息 - """ for gathered_account in gathered_accounts: asset_id = gathered_account.asset_id username = gathered_account.username - accounts = Account.objects.filter( - Q(asset_id=asset_id, username=username) | - Q(asset_id=asset_id, name=username) + account = Account( + asset_id=asset_id, username=username, + name=username, source=Source.DISCOVERY, + date_last_login=gathered_account.date_last_login, ) + account_objs.append(account) + Account.objects.bulk_create(account_objs, ignore_conflicts=True) - if accounts.exists(): - cls.update_exists_accounts(gathered_account, accounts) - elif auto_create: - cls.create_accounts(gathered_account) + ga_ids = [ga.id for ga in gathered_accounts] + GatheredAccount.objects.filter(id__in=ga_ids).update(status=ConfirmOrIgnore.confirmed) + + @classmethod + def sync_accounts(cls, gathered_accounts): + """ + 更新为已存在的账号,或者创建新的账号, 原来的 sync 重构了,如果存在则自动更新一些信息 + """ + assets = [gathered_account.asset_id for gathered_account in gathered_accounts] + usernames = [gathered_account.username for gathered_account in gathered_accounts] + + origin_accounts = Account.objects.filter( + asset__in=assets, username__in=usernames + ).select_related('asset') + + origin_mapper = defaultdict(list) + for origin_account in origin_accounts: + asset_id = origin_account.asset_id + username = origin_account.username + origin_mapper[(asset_id, username)].append(origin_account) + + to_update = [] + to_create = [] + + for gathered_account in gathered_accounts: + asset_id = gathered_account.asset_id + username = gathered_account.username + accounts = origin_mapper.get((asset_id, username)) + + if accounts: + to_update.append((gathered_account, accounts)) + else: + to_create.append(gathered_account) + + cls.bulk_create_accounts(to_create) + cls.update_exists_accounts(to_update) class Meta: verbose_name = _("Gather asset accounts") @@ -82,7 +107,7 @@ class GatheredAccount(JMSOrgBaseModel): ordering = ['asset'] def __str__(self): - return '{}: {}'.format(self.asset, self.username) + return '{}: {}'.format(self.asset_id, self.username) class GatherAccountsAutomation(AccountBaseAutomation): diff --git a/apps/audits/signal_handlers/activity_log.py b/apps/audits/signal_handlers/activity_log.py index 4f564a3c4..90583472b 100644 --- a/apps/audits/signal_handlers/activity_log.py +++ b/apps/audits/signal_handlers/activity_log.py @@ -6,6 +6,7 @@ from django.db.models.signals import post_save from django.utils.translation import gettext_lazy as _, gettext_noop from audits.models import ActivityLog +from common.decorators import bulk_create_decorator from common.utils import i18n_fmt, get_logger from jumpserver.utils import current_request from ops.celery import app @@ -67,21 +68,26 @@ class ActivityLogHandler: return resource_ids, detail, org_id +@bulk_create_decorator(ActivityLog) +def create_activity(data): + return ActivityLog(**data) + + def create_activities(resource_ids, detail, detail_id, action, org_id): if not resource_ids: return if not org_id: org_id = Organization.ROOT_ID activities = [ - ActivityLog( + dict( resource_id=getattr(resource_id, 'pk', resource_id), type=action, detail=detail, detail_id=detail_id, org_id=org_id ) for resource_id in resource_ids ] with tmp_to_org(org_id): - ActivityLog.objects.bulk_create(activities) - return activities + for activity in activities: + create_activity(activity) @signals.after_task_publish.connect diff --git a/apps/i18n/lina/en.json b/apps/i18n/lina/en.json index f5c528f2e..41bc2f68b 100644 --- a/apps/i18n/lina/en.json +++ b/apps/i18n/lina/en.json @@ -1264,7 +1264,7 @@ "SupportedProtocolHelpText": "Set supported protocols for the asset, you can modify the custom configurations, such as sftp directory, rdp ad domain, etc., by clicking on the set button", "Sync": "Sync", "SyncAction": "Synchronized action", - "SyncDelete": "Sync deletion", + "SyncDelete": "Sync delete", "SyncDeleteSelected": "Sync deletion selected", "SyncErrorMsg": "Sync failed", "SyncInstanceTaskCreate": "Create sync task",