jumpserver/apps/reports/api/accouts/automation.py
2025-08-12 19:19:00 +08:00

92 lines
2.8 KiB
Python

# -*- coding: utf-8 -*-
#
from collections import defaultdict
from django.http import JsonResponse
from django.utils.translation import gettext_lazy as _
from rest_framework.views import APIView
from accounts.const import AutomationTypes
from accounts.models import ChangeSecretAutomation, PushAccountAutomation, BackupAccountAutomation, \
CheckAccountAutomation, GatherAccountsAutomation, AutomationExecution
from common.permissions import IsValidLicense
from rbac.permissions import RBACPermission
from reports.mixins import DateRangeMixin
__all__ = ['AccountAutomationApi']
class AccountAutomationApi(DateRangeMixin, APIView):
http_method_names = ['get']
rbac_perms = {
'GET': 'accounts.view_account',
}
permission_classes = [RBACPermission, IsValidLicense]
@property
def change_secret_qs(self):
return ChangeSecretAutomation.objects.all()
@property
def push_qs(self):
return PushAccountAutomation.objects.all()
@property
def backup_qs(self):
return BackupAccountAutomation.objects.all()
@property
def check_qs(self):
return CheckAccountAutomation.objects.all()
@property
def collect_qs(self):
return GatherAccountsAutomation.objects.all()
def get_execution_metrics(self):
executions = AutomationExecution.objects.filter(type__in=AutomationTypes.values)
filtered_queryset = self.filter_by_date_range(executions, 'date_start')
types = set()
data = defaultdict(lambda: defaultdict(int))
for t, tp in filtered_queryset.values_list('date_start', 'type'):
if not tp:
continue
types.add(tp)
date_str = str(t.date())
data[date_str][tp] += 1
tp_map = defaultdict(list)
for d in self.date_range_list:
tp_data = data.get(str(d), {})
for tp in types:
tp_map[tp].append(tp_data.get(tp, 0))
metrics = {}
for tp, values in tp_map.items():
if tp == AutomationTypes.change_secret:
_tp = _('Account change secret')
else:
_tp = AutomationTypes(tp).label
metrics[str(_tp)] = values
return metrics
def get(self, request, *args, **kwargs):
stats = {
'push': self.push_qs.count(),
'check': self.check_qs.count(),
'backup': self.backup_qs.count(),
'collect': self.collect_qs.count(),
'change_secret': self.change_secret_qs.count(),
}
payload = {
'automation_stats': stats,
'execution_metrics': {
'dates_metrics_date': self.dates_metrics_date,
'data': self.get_execution_metrics()
},
}
return JsonResponse(payload, status=200)