Files
jumpserver/apps/reports/api/users/user.py
fit2bot b38d83c578 feat: report charts (#15630)
* perf: initial

* perf: basic finished

* perf: depend

* perf: Update Dockerfile with new base image tag

* perf: Add user report api

* perf: Update Dockerfile with new base image tag

* perf: Use user report api

* perf: Update Dockerfile with new base image tag

* perf: user login report

* perf: Update Dockerfile with new base image tag

* perf: user change password

* perf: change password dashboard

* perf: Update Dockerfile with new base image tag

* perf: Translate

* perf: asset api

* perf: asset activity

* perf: Asset report

* perf: add charts_map

* perf: account report

* perf: Translate

* perf: account automation

* perf: Account automation

* perf: title

* perf: Update Dockerfile with new base image tag

---------

Co-authored-by: ibuler <ibuler@qq.com>
Co-authored-by: feng <1304903146@qq.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: wangruidong <940853815@qq.com>
Co-authored-by: feng626 <57284900+feng626@users.noreply.github.com>
2025-08-06 14:05:38 +08:00

130 lines
4.9 KiB
Python

# -*- coding: utf-8 -*-
#
from collections import defaultdict
from django.db.models import Count, Q
from django.http.response import JsonResponse
from rest_framework.views import APIView
from audits.const import LoginStatusChoices
from audits.models import UserLoginLog
from common.permissions import IsValidLicense
from common.utils import lazyproperty
from rbac.permissions import RBACPermission
from reports.mixins import DateRangeMixin
# Names exported by ``from <this module> import *``: only the report view.
__all__ = ['UserReportApi']
from users.models import User
from users.models.user import Source
class UserReportApi(DateRangeMixin, APIView):
    """Read-only API view that returns user and login statistics as JSON.

    Only ``GET`` is accepted. The response combines aggregate counters over
    the users returned by ``User.get_org_users()`` with metrics derived from
    ``UserLoginLog`` rows that fall inside the requested date range.

    NOTE(review): ``filter_by_date_range``, ``date_range_list`` and
    ``dates_metrics_date`` are not defined in this class; they are presumably
    supplied by ``DateRangeMixin`` -- confirm there.
    """

    http_method_names = ['get']
    rbac_perms = {
        'GET': 'users.view_users',
    }
    permission_classes = [RBACPermission, IsValidLicense]

    # Labels of the four six-hour windows of a day, ordered so that
    # ``hour // 6`` (0..3 for hour 0..23) indexes the matching label.
    _TIME_BUCKET_LABELS = (
        '00:00-06:00',
        '06:00-12:00',
        '12:00-18:00',
        '18:00-24:00',
    )

    def get_user_login_metrics(self, queryset):
        """Count distinct usernames per day for the given login-log queryset.

        :param queryset: login-log queryset exposing ``datetime`` and
            ``username`` fields.
        :return: list with one integer per entry of ``self.date_range_list``
            (same order); days without rows yield ``0``.
        """
        rows = self.filter_by_date_range(queryset, 'datetime')
        usernames_by_date = defaultdict(set)
        for logged_at, username in rows.values_list('datetime', 'username'):
            usernames_by_date[str(logged_at.date())].add(username)
        return [
            len(usernames_by_date.get(str(day), ()))
            for day in self.date_range_list
        ]

    def get_user_login_method_metrics(self):
        """Count distinct successfully logged-in usernames per day and backend.

        :return: dict mapping each backend seen in the filtered logs to a
            list with one integer per entry of ``self.date_range_list``.
            The dict is empty when no log row matched.
        """
        rows = self.filter_by_date_range(self.user_login_log_queryset, 'datetime')
        backends = set()
        usernames_by_date = defaultdict(lambda: defaultdict(set))
        for logged_at, username, backend in rows.values_list('datetime', 'username', 'backend'):
            backends.add(backend)
            usernames_by_date[str(logged_at.date())][backend].add(username)

        metrics = defaultdict(list)
        for day in self.date_range_list:
            # ``.get`` (not indexing) so that missing days/backends do not
            # create empty entries in the defaultdicts.
            per_backend = usernames_by_date.get(str(day), {})
            for backend in backends:
                metrics[backend].append(len(per_backend.get(backend, ())))
        # Hand back a plain dict so callers do not get auto-creating lookups.
        return dict(metrics)

    def get_user_login_region_distribution(self):
        """Count distinct successfully logged-in usernames per ``city``.

        :return: list of ``{'name': city, 'value': count}`` dicts ordered by
            descending count.
        """
        rows = self.filter_by_date_range(self.user_login_log_queryset, 'datetime')
        per_city = (
            rows.values('city')
            .annotate(user_count=Count('username', distinct=True))
            .order_by('-user_count')
        )
        return [{'name': row['city'], 'value': row['user_count']} for row in per_city]

    def get_user_login_time_metrics(self):
        """Count successful logins per six-hour window of the day.

        Every log row counts once (no de-duplication by username).

        :return: dict mapping each label in ``_TIME_BUCKET_LABELS`` to the
            number of rows whose ``datetime.hour`` falls in that window.
        """
        rows = self.filter_by_date_range(self.user_login_log_queryset, 'datetime')
        labels = self._TIME_BUCKET_LABELS
        metrics = dict.fromkeys(labels, 0)
        # Only the timestamp is needed, so fetch that single column instead of
        # building a full model instance per row.
        # NOTE(review): the hour is read from the datetime exactly as the ORM
        # returns it; if those values are UTC-aware the windows are UTC, not
        # local time -- confirm that this is intended.
        for logged_at in rows.values_list('datetime', flat=True):
            metrics[labels[logged_at.hour // 6]] += 1
        return metrics

    @staticmethod
    def _login_logs_with_status(status):
        """Return login logs with ``status``, passed through ``UserLoginLog.filter_queryset_by_org``."""
        queryset = UserLoginLog.objects.filter(status=status)
        return UserLoginLog.filter_queryset_by_org(queryset)

    @lazyproperty
    def user_login_log_queryset(self):
        """Login logs whose status is ``LoginStatusChoices.success``."""
        return self._login_logs_with_status(LoginStatusChoices.success)

    @lazyproperty
    def user_login_failed_queryset(self):
        """Login logs whose status is ``LoginStatusChoices.failed``."""
        return self._login_logs_with_status(LoginStatusChoices.failed)

    @lazyproperty
    def user_qs(self):
        """Queryset returned by ``User.get_org_users()``."""
        return User.get_org_users()

    def _get_user_stats(self):
        """Return the headline user counters shown in the report."""
        stats = self.user_qs.aggregate(
            total=Count(1),
            first_login=Count(1, filter=Q(is_first_login=True)),
            need_update_password=Count(1, filter=Q(need_update_password=True)),
            face_vector=Count(1, filter=Q(face_vector__isnull=False)),
            not_enabled_mfa=Count(1, filter=Q(mfa_level=0)),
        )
        # ``is_valid`` is read from each instance rather than aggregated in
        # the database, so the users are iterated here.
        stats['valid'] = sum(1 for user in self.user_qs if user.is_valid)
        return stats

    def _get_user_by_source(self):
        """Return ``{'name': source_label, 'value': user_count}`` dicts, one per source."""
        source_labels = Source.as_dict()
        counts = defaultdict(int)
        for source in self.user_qs.values_list('source', flat=True):
            # Sources missing from the map fall back to their raw value;
            # str() normalises the key (labels may not be plain strings,
            # presumably lazy translations -- confirm).
            counts[str(source_labels.get(source, source))] += 1
        return [{'name': name, 'value': count} for name, count in counts.items()]

    def get(self, request, *args, **kwargs):
        """Build the full user report and return it as a JSON response.

        :return: ``JsonResponse`` (HTTP 200) with the keys ``user_stats``,
            ``user_by_source``, ``user_login_log_metrics``,
            ``user_login_method_metrics``,
            ``user_login_region_distribution`` and
            ``user_login_time_metrics``.
        """
        data = {
            'user_stats': self._get_user_stats(),
            'user_by_source': self._get_user_by_source(),
            'user_login_log_metrics': {
                'dates_metrics_date': self.dates_metrics_date,
                'dates_metrics_success_total': self.get_user_login_metrics(
                    self.user_login_log_queryset
                ),
                'dates_metrics_failure_total': self.get_user_login_metrics(
                    self.user_login_failed_queryset
                ),
            },
            'user_login_method_metrics': {
                'dates_metrics_date': self.dates_metrics_date,
                'dates_metrics_total': self.get_user_login_method_metrics(),
            },
            'user_login_region_distribution': self.get_user_login_region_distribution(),
            'user_login_time_metrics': self.get_user_login_time_metrics(),
        }
        return JsonResponse(data, status=200)