import csv
import datetime
import io
from collections import OrderedDict, defaultdict

from django.db.models import Count, F, Q, Value
from django.db.models.functions import Concat
from django.http import HttpResponse
from django.utils import timezone
from django.utils.translation import gettext_lazy as _
from openpyxl import Workbook
from rest_framework.request import Request

from accounts.const import AutomationTypes, Source as AccountSource
from accounts.models import (
    Account,
    AccountTemplate,
    AutomationExecution,
    BackupAccountAutomation,
    ChangeSecretAutomation,
    CheckAccountAutomation,
    GatherAccountsAutomation,
    PushAccountAutomation,
)
from assets.const import AllTypes, Category, Connectivity
from assets.models import Asset, Platform
from audits.const import LoginStatusChoices
from audits.models import PasswordChangeLog, UserLoginLog
from common.const.choices import Status
from common.utils import lazyproperty
from common.utils.timezone import local_zero_hour, local_now
from terminal.const import LoginFrom
from terminal.models import Session
from users.models import Source, User


class DateRangeMixin:
    """Derive a "last N days" window from a request query parameter.

    The host class must provide ``request``. The number of days is read from
    the query parameter named by ``days_param`` and falls back to
    ``default_days`` when the parameter is missing or not an integer.
    """

    request: Request
    days_param = 'days'
    default_days = 1

    @lazyproperty
    def days(self) -> int:
        """Return the requested number of days, or ``default_days`` if unparsable."""
        raw = self.request.query_params.get(self.days_param, self.default_days)
        try:
            return int(raw)
        except (ValueError, TypeError):
            return self.default_days

    @property
    def start_datetime(self):
        """Return the start of the window.

        For a single day this is whatever ``local_zero_hour()`` returns
        (presumably today's local midnight -- confirm against the helper);
        otherwise it is "now minus ``days`` days".
        """
        if self.days == 1:
            return local_zero_hour()
        return local_now() - datetime.timedelta(days=self.days)

    @property
    def date_range_bounds(self) -> tuple:
        """Return ``(start_date, end_date)`` where the end is tomorrow's date."""
        start = self.start_datetime.date()
        end = (local_now() + datetime.timedelta(days=1)).date()
        return start, end

    @lazyproperty
    def date_range_list(self) -> list:
        """Return the dates of the window, oldest first, ending today."""
        # Sample the clock once so the list cannot straddle midnight.
        now = local_now()
        return [
            (now - datetime.timedelta(days=offset)).date()
            for offset in range(self.days - 1, -1, -1)
        ]

    def filter_by_date_range(self, queryset, field_name: str):
        """Filter ``queryset`` with ``<field_name>__range`` over ``date_range_bounds``."""
        start, end = self.date_range_bounds
        return queryset.filter(**{f'{field_name}__range': (start, end)})

    @lazyproperty
    def dates_metrics_date(self):
        """Return ``MM-DD`` labels for ``date_range_list`` (``['0']`` when empty)."""
        return [day.strftime('%m-%d') for day in self.date_range_list] or ['0']


# Named presets mapped to a number of days (consumed by ``resolve_range``).
DATE_PRESET_DAYS = {
    'last_day': 1,
    'last_week': 7,
    'last_month': 30,
    'last_three_months': 90,
    'last_half_year': 180,
    'last_year': 365,
}

# Report type identifiers accepted by ``build_report_content``.
CREATABLE_REPORT_TYPES = (
    'UserLoginReport',
    'UserChangePasswordReport',
    'AssetStatistics',
    'AssetReport',
    'AccountStatistics',
    'AccountAutomationReport',
)

# Largest allowed distance, in days, between the first and last day of a range.
_MAX_RANGE_SPAN_DAYS = 366


def _local_date_key(value):
    """Return the ISO date string of an aware datetime in the current time zone."""
    return str(timezone.localtime(value).date())


def _count_per_day(grouped, date_list):
    """Return, for each date in ``date_list``, the size of its bucket in ``grouped``."""
    return [len(grouped.get(str(day), ())) for day in date_list]


def parse_date(raw):
    """Coerce ``raw`` to a ``datetime.date``.

    Accepts a ``datetime`` (aware values are converted to the current time
    zone first), a ``date``, or anything whose ``str()`` is an ISO date.
    Returns ``None`` for falsy or unparsable input.
    """
    if not raw:
        return None
    # ``datetime`` must be tested first: it is a subclass of ``date``.
    if isinstance(raw, datetime.datetime):
        # ``timezone.localtime`` rejects naive datetimes, so take them as-is.
        if timezone.is_naive(raw):
            return raw.date()
        return timezone.localtime(raw).date()
    if isinstance(raw, datetime.date):
        return raw
    try:
        return datetime.date.fromisoformat(str(raw))
    except (TypeError, ValueError):
        return None


def resolve_range(*, start=None, end=None, days=7, preset=''):
    """Resolve a reporting window into dates, aware datetimes and labels.

    Precedence: explicit ``start``/``end`` win; otherwise the window is the
    last ``days`` days ending today, where a known ``preset`` overrides
    ``days``. A reversed start/end pair is swapped, a single bound yields a
    one-day window, and the span is capped at 366 days.

    Returns a dict with ``start``, ``end`` (inclusive dates),
    ``start_datetime``, ``end_datetime_exclusive`` (aware datetimes at local
    midnight), ``date_list`` and ``date_labels`` (``MM-DD`` strings).
    """
    if preset:
        days = DATE_PRESET_DAYS.get(preset, days)

    start_date = parse_date(start)
    end_date = parse_date(end)
    if start_date and end_date and start_date > end_date:
        start_date, end_date = end_date, start_date

    if not start_date and not end_date:
        end_date = timezone.localtime(timezone.now()).date()
        try:
            days = max(int(days or 1), 1)
        except (TypeError, ValueError, OverflowError):
            days = 1
        # Cap the day count here so an oversized request still ends today.
        # Capping only after computing the start would shift the window
        # into the past.
        days = min(days, _MAX_RANGE_SPAN_DAYS + 1)
        start_date = end_date - datetime.timedelta(days=days - 1)
    elif start_date and not end_date:
        end_date = start_date
    elif end_date and not start_date:
        start_date = end_date

    if (end_date - start_date).days > _MAX_RANGE_SPAN_DAYS:
        end_date = start_date + datetime.timedelta(days=_MAX_RANGE_SPAN_DAYS)

    current_tz = timezone.get_current_timezone()
    start_dt = timezone.make_aware(
        datetime.datetime.combine(start_date, datetime.time.min), current_tz
    )
    # Exclusive upper bound: midnight of the day after ``end_date``.
    end_dt = timezone.make_aware(
        datetime.datetime.combine(end_date + datetime.timedelta(days=1), datetime.time.min),
        current_tz,
    )
    date_list = [
        start_date + datetime.timedelta(days=offset)
        for offset in range((end_date - start_date).days + 1)
    ]
    return {
        'start': start_date,
        'end': end_date,
        'start_datetime': start_dt,
        'end_datetime_exclusive': end_dt,
        'date_list': date_list,
        'date_labels': [item.strftime('%m-%d') for item in date_list] or ['0'],
    }


def filter_by_range(queryset, field_name, range_info):
    """Keep rows whose ``field_name`` lies in ``[start_datetime, end_datetime_exclusive)``."""
    return queryset.filter(**{
        f'{field_name}__gte': range_info['start_datetime'],
        f'{field_name}__lt': range_info['end_datetime_exclusive'],
    })


def export_table_response(table, export):
    """Render a report table in the requested export format.

    ``table`` is a dict with ``columns`` (dicts with ``key`` and ``label``),
    ``rows`` (dicts keyed by column key) and ``filename``.

    * ``export == 'table'`` returns a plain dict with ``columns`` and ``rows``.
    * ``export == 'csv'`` returns an ``HttpResponse`` with CSV content encoded
      as UTF-8 with a BOM.
    * any other value returns an ``HttpResponse`` with an xlsx workbook.

    For both file responses the download name is
    ``<table['filename']>.<export>``.
    """
    if export == 'table':
        return {'columns': table['columns'], 'rows': table['rows']}

    headers = [column['label'] for column in table['columns']]
    keys = [column['key'] for column in table['columns']]

    if export == 'csv':
        text_buffer = io.StringIO()
        writer = csv.writer(text_buffer)
        writer.writerow(headers)
        writer.writerows([row.get(key, '') for key in keys] for row in table['rows'])
        content = text_buffer.getvalue().encode('utf-8-sig')
        response = HttpResponse(content, content_type='text/csv')
    else:
        workbook = Workbook()
        worksheet = workbook.active
        worksheet.append(headers)
        for row in table['rows']:
            worksheet.append([row.get(key, '') for key in keys])
        # ``save_virtual_workbook`` was removed from openpyxl; saving into an
        # in-memory buffer is the supported replacement.
        binary_buffer = io.BytesIO()
        workbook.save(binary_buffer)
        response = HttpResponse(
            binary_buffer.getvalue(),
            content_type='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
        )

    response['Content-Disposition'] = f'attachment; filename="{table["filename"]}.{export}"'
    return response


def build_user_login_report(filters=None, days=7):
    """Build the user login report for the last ``days`` days.

    ``filters`` is accepted for interface compatibility and is not used.
    Returns ``(payload, table, range_info)``. Daily figures count distinct
    usernames per local calendar day; the time-of-day histogram counts
    successful login rows.
    """
    range_info = resolve_range(days=days)
    date_list = range_info['date_list']
    date_labels = range_info['date_labels']

    users = User.get_org_users()
    success_qs = UserLoginLog.filter_queryset_by_org(
        UserLoginLog.objects.filter(status=LoginStatusChoices.success)
    )
    failed_qs = UserLoginLog.filter_queryset_by_org(
        UserLoginLog.objects.filter(status=LoginStatusChoices.failed)
    )
    success_qs = filter_by_range(success_qs, 'datetime', range_info)
    failed_qs = filter_by_range(failed_qs, 'datetime', range_info)

    source_map = Source.as_dict()
    source_map.update({'password': _('Password')})

    source_data = defaultdict(int)
    for source in users.values_list('source', flat=True):
        source_data[str(source_map.get(source, source))] += 1

    time_buckets = ['00:00-06:00', '06:00-12:00', '12:00-18:00', '18:00-24:00']
    time_metrics = {bucket: 0 for bucket in time_buckets}

    success_data = defaultdict(set)
    method_data = defaultdict(lambda: defaultdict(set))
    methods = set()
    # One pass over the successful logins feeds the per-day, per-method and
    # time-of-day metrics. Day and hour both come from the local time so
    # they agree with each other and with ``range_info``.
    for login_time, username, backend in success_qs.values_list('datetime', 'username', 'backend'):
        local_time = timezone.localtime(login_time)
        day_key = str(local_time.date())
        success_data[day_key].add(username)

        backend = backend or ''  # a missing backend must not break the report
        backend_name = str(source_map.get(backend.lower(), backend))
        method_data[day_key][backend_name].add(username)
        methods.add(backend_name)

        time_metrics[time_buckets[local_time.hour // 6]] += 1

    failed_data = defaultdict(set)
    for login_time, username in failed_qs.values_list('datetime', 'username'):
        failed_data[_local_date_key(login_time)].add(username)

    success_metrics = _count_per_day(success_data, date_list)
    failed_metrics = _count_per_day(failed_data, date_list)
    # Sorted so the series order is stable between runs.
    method_metrics = {
        method: [len(method_data.get(str(day), {}).get(method, ())) for day in date_list]
        for method in sorted(methods)
    }

    rows = [
        {'date': label, 'success': success, 'failed': failed}
        for label, success, failed in zip(date_labels, success_metrics, failed_metrics)
    ]

    payload = {
        'user_stats': users.aggregate(
            total=Count(1),
            first_login=Count(1, filter=Q(is_first_login=True)),
            need_update_password=Count(1, filter=Q(need_update_password=True)),
            face_vector=Count(1, filter=Q(face_vector__isnull=False)),
            not_enabled_mfa=Count(1, filter=Q(mfa_level=0)),
        ),
        'user_by_source': [{'name': key, 'value': value} for key, value in source_data.items()],
        'user_login_log_metrics': {
            'dates_metrics_date': date_labels,
            'dates_metrics_success_total': success_metrics,
            'dates_metrics_failure_total': failed_metrics,
        },
        'user_login_method_metrics': {
            'dates_metrics_date': date_labels,
            'dates_metrics_total': method_metrics,
        },
        'user_login_time_metrics': time_metrics,
    }
    # ``is_valid`` is evaluated per instance, so this count is done in Python.
    payload['user_stats']['valid'] = sum(1 for user in users if user.is_valid)

    table = {
        'filename': 'user_login_report',
        'columns': [
            {'key': 'date', 'label': 'date'},
            {'key': 'success', 'label': 'success'},
            {'key': 'failed', 'label': 'failed'},
        ],
        'rows': rows,
    }
    return payload, table, range_info


def build_user_change_password_report(filters=None, days=7):
    """Build the password-change report for the last ``days`` days.

    ``filters`` is accepted for interface compatibility and is not used.
    Returns ``(payload, table, range_info)``. The daily metric counts
    distinct users who changed a password on each local calendar day.
    """
    range_info = resolve_range(days=days)
    date_labels = range_info['date_labels']

    queryset = PasswordChangeLog.filter_queryset_by_org(PasswordChangeLog.objects.all())
    queryset = filter_by_range(queryset, 'datetime', range_info)

    users_by_day = defaultdict(set)
    for changed_at, user in queryset.values_list('datetime', 'user'):
        users_by_day[_local_date_key(changed_at)].add(user)

    metrics = _count_per_day(users_by_day, range_info['date_list'])
    rows = [{'date': label, 'count': count} for label, count in zip(date_labels, metrics)]

    payload = {
        'total_count_change_password': {
            'total': queryset.count(),
            'user_total': queryset.values('user').distinct().count(),
            'change_by_total': queryset.values('change_by').distinct().count(),
        },
        'change_password_top10_users': list(
            queryset.values('user').annotate(count=Count('id')).order_by('-count')[:10]
        ),
        'change_password_top10_change_bys': list(
            queryset.values('change_by').annotate(count=Count('id')).order_by('-count')[:10]
        ),
        'user_change_password_metrics': {
            'dates_metrics_date': date_labels,
            'dates_metrics_total': metrics,
        },
    }
    table = {
        'filename': 'user_change_password_report',
        'columns': [
            {'key': 'date', 'label': 'date'},
            {'key': 'count', 'label': 'count'},
        ],
        'rows': rows,
    }
    return payload, table, range_info


def build_asset_activity_report(filters=None, days=7):
    """Build the asset session activity report for the last ``days`` days.

    ``filters`` is accepted for interface compatibility and is not used.
    Returns ``(payload, table, range_info)``. Sessions are bucketed by the
    local calendar day of ``date_start``.
    """
    range_info = resolve_range(days=days)
    date_list = range_info['date_list']
    date_labels = range_info['date_labels']

    # Imported here rather than at module level, as in the original code
    # (presumably to avoid a circular import -- confirm).
    from reports.api.assets.base import group_stats

    queryset = filter_by_range(Session.objects.all(), 'date_start', range_info)

    sessions_by_day = defaultdict(set)
    active_users_metric = defaultdict(set)
    active_assets_metric = defaultdict(set)
    # A single pass feeds all three per-day metrics.
    session_values = queryset.values_list('date_start', 'id', 'user_id', 'asset_id')
    for started_at, session_id, user_id, asset_id in session_values:
        day_key = _local_date_key(started_at)
        sessions_by_day[day_key].add(session_id)
        if user_id:
            active_users_metric[day_key].add(user_id)
        if asset_id:
            active_assets_metric[day_key].add(asset_id)

    metrics = _count_per_day(sessions_by_day, date_list)
    rows = [{'date': label, 'count': count} for label, count in zip(date_labels, metrics)]

    # Reuse the ids gathered above. Empty or null ids were already skipped,
    # so they cannot reach the ``id__in`` lookup as invalid values.
    asset_ids = {
        str(asset_id)
        for day_assets in active_assets_metric.values()
        for asset_id in day_assets
    }
    assets = Asset.objects.filter(id__in=asset_ids)

    payload = {
        'session_stats': queryset.aggregate(
            total=Count(1),
            asset_count=Count('asset_id', distinct=True),
            user_count=Count('user_id', distinct=True),
        ),
        'asset_login_by_type': group_stats(assets, 'label', 'platform__type', dict(AllTypes.choices())),
        'asset_login_by_from': group_stats(queryset, 'label', 'login_from', LoginFrom.as_dict()),
        'asset_login_by_protocol': group_stats(queryset, 'label', 'protocol'),
        'asset_login_log_metrics': {
            'dates_metrics_date': date_labels,
            'dates_metrics_total': metrics,
        },
        'user_asset_activity_metrics': {
            'dates_metrics_date': date_labels,
            'dates_metrics_total_count_active_users': _count_per_day(active_users_metric, date_list),
            'dates_metrics_total_count_active_assets': _count_per_day(active_assets_metric, date_list),
        },
    }
    table = {
        'filename': 'asset_activity_report',
        'columns': [
            {'key': 'date', 'label': 'date'},
            {'key': 'count', 'label': 'count'},
        ],
        'rows': rows,
    }
    return payload, table, range_info


def build_account_automation_report(filters=None, days=7):
    """Build the account automation report for the last ``days`` days.

    ``filters`` is accepted for interface compatibility and is not used.
    Returns ``(payload, table, range_info)``. Executions are counted per
    local calendar day and automation type; success and failure totals are
    reported for change-secret executions only.
    """
    range_info = resolve_range(days=days)
    date_list = range_info['date_list']
    date_labels = range_info['date_labels']

    querysets = {
        'push': PushAccountAutomation.objects.all(),
        'check': CheckAccountAutomation.objects.all(),
        'backup': BackupAccountAutomation.objects.all(),
        'collect': GatherAccountsAutomation.objects.all(),
        'change_secret': ChangeSecretAutomation.objects.all(),
    }
    automation_ids = []
    for queryset in querysets.values():
        automation_ids.extend(queryset.values_list('id', flat=True))

    # The key order here fixes the order of the metric series and of the
    # exported table columns.
    label_map = {
        AutomationTypes.push_account: str(AutomationTypes.push_account.label),
        AutomationTypes.check_account: str(AutomationTypes.check_account.label),
        AutomationTypes.backup_account: str(AutomationTypes.backup_account.label),
        AutomationTypes.gather_accounts: str(AutomationTypes.gather_accounts.label),
        AutomationTypes.change_secret: str(_('Account change secret')),
    }

    execution_queryset = AutomationExecution.objects.filter(type__in=tuple(label_map))
    execution_queryset = execution_queryset.filter(automation_id__in=automation_ids)
    execution_queryset = filter_by_range(execution_queryset, 'date_start', range_info)

    grouped = defaultdict(lambda: defaultdict(int))
    success_grouped = defaultdict(int)
    failed_grouped = defaultdict(int)
    # One pass collects both the per-type counts and the change-secret
    # success and failure counts.
    for execution in execution_queryset:
        day_key = _local_date_key(execution.date_start)
        grouped[day_key][execution.type] += 1
        if execution.type != AutomationTypes.change_secret:
            continue
        if execution.status == Status.success:
            success_grouped[day_key] += 1
        elif execution.status == Status.failed:
            failed_grouped[day_key] += 1

    metrics = {}
    rows = []
    for report_date, label in zip(date_list, date_labels):
        day_counts = grouped.get(str(report_date), {})
        row = {'date': label}
        for automation_type, type_label in label_map.items():
            value = day_counts.get(automation_type, 0)
            metrics.setdefault(type_label, []).append(value)
            row[automation_type] = value
        rows.append(row)

    payload = {
        'automation_stats': {key: queryset.count() for key, queryset in querysets.items()},
        'execution_metrics': {
            'dates_metrics_date': date_labels,
            'data': metrics,
        },
        'account_result_metrics': {
            'dates_metrics_date': date_labels,
            'dates_metrics_total_count_success': [
                success_grouped.get(str(day), 0) for day in date_list
            ],
            'dates_metrics_total_count_failed': [
                failed_grouped.get(str(day), 0) for day in date_list
            ],
        },
    }

    columns = [{'key': 'date', 'label': 'date'}]
    columns.extend(
        {'key': automation_type, 'label': type_label}
        for automation_type, type_label in label_map.items()
    )
    table = {
        'filename': 'account_automation_report',
        'columns': columns,
        'rows': rows,
    }
    return payload, table, range_info


def build_asset_statistics_report(filters=None, days=7):
    """Build the asset statistics report.

    ``filters`` is accepted for interface compatibility and is not used.
    Totals cover all assets; only the "added assets" metric is limited to
    the last ``days`` days. Returns ``(payload, table, range_info)``.
    """
    range_info = resolve_range(days=days)
    qs = Asset.objects.all()
    all_type_dict = dict(AllTypes.choices())

    stats = qs.aggregate(
        total=Count(1),
        active=Count(1, filter=Q(is_active=True)),
        connected=Count(1, filter=Q(connectivity=Connectivity.OK)),
        zone=Count(1, filter=Q(zone__isnull=False)),
        directory_services=Count(1, filter=Q(directory_services__isnull=False)),
    )

    category_map = Category.as_dict()
    # category label -> platform type -> set of asset ids
    category_type_ids = defaultdict(lambda: defaultdict(set))
    asset_values = qs.values_list('id', 'platform__type', 'platform__category')
    for obj_id, platform_type, category in asset_values:
        category_label = category_map.get(category, category)
        category_type_ids[category_label][platform_type].add(obj_id)

    by_type_category = {
        category_label: [
            {
                'label': all_type_dict.get(platform_type, platform_type),
                'type': platform_type,
                'total': len(ids),
            }
            for platform_type, ids in type_map.items()
        ]
        for category_label, type_map in category_type_ids.items()
    }

    # Emit categories in the order given by ``AllTypes.categories()``;
    # categories without assets get an empty list.
    sorted_category_assets = OrderedDict(
        (str(item['label']), by_type_category.get(str(item['label']), []))
        for item in AllTypes.categories()
    )

    filtered_created = filter_by_range(qs, 'date_created', range_info)
    grouped_created = defaultdict(set)
    for created_at, obj_id in filtered_created.values_list('date_created', 'id'):
        grouped_created[_local_date_key(created_at)].add(obj_id)

    stats['platform_count'] = Platform.objects.count()

    payload = {
        'asset_stats': stats,
        'assets_by_type_category': sorted_category_assets,
        'added_asset_metrics': {
            'dates_metrics_date': range_info['date_labels'],
            'dates_metrics_total': _count_per_day(grouped_created, range_info['date_list']),
        }
    }
    table = {
        'filename': 'asset_statistics_report',
        'columns': [
            {'key': 'metric', 'label': 'metric'},
            {'key': 'value', 'label': 'value'},
        ],
        'rows': [{'metric': key, 'value': value} for key, value in stats.items()],
    }
    return payload, table, range_info


def _account_source_label(source):
    """Return the display label for an account source, or the raw value if unknown."""
    try:
        return str(AccountSource(source).label)
    except ValueError:
        # A value outside the enum (for example null) must not break the report.
        return str(source)


def build_account_statistics_report(filters=None, days=30):
    """Build the account statistics report.

    ``filters`` is accepted for interface compatibility and is not used.
    Totals cover all accounts; only the "secret changed" metric is limited
    to the last ``days`` days. Returns ``(payload, table, range_info)``.
    """
    range_info = resolve_range(days=days)
    qs = Account.objects.all()

    stats = qs.aggregate(
        total=Count(1),
        active=Count(1, filter=Q(is_active=True)),
        connected=Count(1, filter=Q(connectivity=Connectivity.OK)),
        su_from=Count(1, filter=Q(su_from__isnull=False)),
        date_change_secret=Count(1, filter=Q(secret_reset=True)),
    )
    stats['template_total'] = AccountTemplate.objects.count()

    source_totals = qs.values('source').annotate(total=Count(1)).values_list('source', 'total')
    source_pie_data = [
        {'name': _account_source_label(source), 'value': total}
        for source, total in source_totals
    ]

    by_connectivity = (
        qs.exclude(connectivity__isnull=True)
        .values('connectivity')
        .annotate(total=Count(1))
        .order_by('connectivity')
    )
    connectivity_labels = Connectivity.as_dict()
    connectivity_data = [
        {
            'label': connectivity_labels.get(item['connectivity'], item['connectivity']),
            'total': item['total'],
            'connectivity': item['connectivity'],
        }
        for item in by_connectivity
    ]

    top_assets = list(
        qs.values('asset__name')
        .annotate(account_count=Count('id'))
        .order_by('-account_count')[:10]
    )
    top_version_accounts = list(
        qs.annotate(
            display_key=Concat(
                F('asset__name'), Value('('), F('username'), Value(')')
            )
        ).values('display_key', 'version').order_by('-version')[:10]
    )

    filtered_changed = filter_by_range(
        qs.exclude(date_change_secret__isnull=True), 'date_change_secret', range_info
    )
    grouped_changed = defaultdict(set)
    for changed_at, obj_id in filtered_changed.values_list('date_change_secret', 'id'):
        grouped_changed[_local_date_key(changed_at)].add(obj_id)

    payload = {
        'account_stats': stats,
        'top_assets': top_assets,
        'top_version_accounts': top_version_accounts,
        'source_pie': source_pie_data,
        'by_connectivity': connectivity_data,
        'change_secret_account_metrics': {
            'dates_metrics_date': range_info['date_labels'],
            'dates_metrics_total': _count_per_day(grouped_changed, range_info['date_list']),
        }
    }
    table = {
        'filename': 'account_statistics_report',
        'columns': [
            {'key': 'metric', 'label': 'metric'},
            {'key': 'value', 'label': 'value'},
        ],
        'rows': [{'metric': key, 'value': value} for key, value in stats.items()],
    }
    return payload, table, range_info


def build_report_content(report_type, filters=None, days=7):
    """Dispatch to the builder for ``report_type``.

    Returns the builder's ``(payload, table, range_info)`` tuple.
    Raises ``ValueError`` when ``report_type`` is not supported.
    """
    # Built per call so the builder names are looked up at call time.
    builders = {
        'UserLoginReport': build_user_login_report,
        'UserChangePasswordReport': build_user_change_password_report,
        'AssetStatistics': build_asset_statistics_report,
        'AssetReport': build_asset_activity_report,
        'AccountStatistics': build_account_statistics_report,
        'AccountAutomationReport': build_account_automation_report,
    }
    builder = builders.get(report_type)
    if builder is None:
        raise ValueError(f'Unsupported report type: {report_type}')
    return builder(filters=filters, days=days)