# -*- coding: utf-8 -*- # from importlib import import_module from django.conf import settings from django.db.models import F, Value, CharField, Q from django.db.models.functions import Cast from django.http import HttpResponse, FileResponse from django.utils.encoding import escape_uri_path from django_celery_beat.models import PeriodicTask from rest_framework import generics from rest_framework import status from rest_framework import viewsets from rest_framework.decorators import action from rest_framework.permissions import IsAuthenticated from rest_framework.response import Response from common.api import CommonApiMixin from common.const.http import GET, POST from common.drf.filters import DatetimeRangeFilterBackend from common.drf.throttling import FileTransferThrottle from common.permissions import IsServiceAccount from common.plugins.es import QuerySet as ESQuerySet from common.sessions.cache import user_session_manager from common.storage.ftp_file import FTPFileStorageHandler from common.utils import is_uuid, get_logger, lazyproperty from ops.const import Types from ops.models import Job from orgs.mixins.api import OrgReadonlyModelViewSet, OrgModelViewSet from orgs.models import Organization from orgs.utils import current_org, tmp_to_root_org from rbac.permissions import RBACPermission from terminal.models import default_storage from users.models import User from .backends import TYPE_ENGINE_MAPPING from .const import ActivityChoices, ActionChoices from .filters import UserSessionFilterSet, OperateLogFilterSet from .models import ( FTPLog, UserLoginLog, OperateLog, PasswordChangeLog, ActivityLog, JobLog, UserSession, IntegrationApplicationLog ) from .serializers import ( FTPLogSerializer, UserLoginLogSerializer, JobLogSerializer, OperateLogSerializer, OperateLogActionDetailSerializer, PasswordChangeLogSerializer, ActivityUnionLogSerializer, FileSerializer, UserSessionSerializer, JobsAuditSerializer, ServiceAccessLogSerializer, OperateLogFullSerializer ) from .utils import construct_userlogin_usernames, record_operate_log_and_activity_log logger = get_logger(__name__) class JobLogAuditViewSet(OrgReadonlyModelViewSet): model = JobLog extra_filter_backends = [DatetimeRangeFilterBackend] date_range_filter_fields = [ ('date_start', ('date_from', 'date_to')) ] search_fields = ['creator__name', 'material'] filterset_fields = ['creator__name', 'material'] serializer_class = JobLogSerializer ordering = ['-date_start'] class JobsAuditViewSet(OrgModelViewSet): model = Job search_fields = ['creator__name', 'args', 'name'] filterset_fields = ['creator__name', 'args', 'name'] serializer_class = JobsAuditSerializer ordering = ['-is_periodic', '-date_updated'] http_method_names = ['get', 'options', 'patch'] def get_queryset(self): queryset = super().get_queryset() queryset = queryset.exclude(type=Types.upload_file).filter(instant=False) return queryset def perform_update(self, serializer): job = self.get_object() is_periodic = serializer.validated_data.get('is_periodic') if job.is_periodic != is_periodic: job.is_periodic = is_periodic job.save() name, task, args, kwargs = job.get_register_task() task_obj = PeriodicTask.objects.filter(name=name).first() if task_obj: is_periodic = job.is_periodic if task_obj.enabled != is_periodic: task_obj.enabled = is_periodic task_obj.save() return super().perform_update(serializer) class FTPLogViewSet(OrgModelViewSet): model = FTPLog serializer_class = FTPLogSerializer extra_filter_backends = [DatetimeRangeFilterBackend] date_range_filter_fields = [ ('date_start', ('date_from', 'date_to')) ] filterset_fields = ['user', 'asset', 'account', 'filename', 'session'] search_fields = filterset_fields ordering = ['-date_start'] http_method_names = ['post', 'get', 'head', 'options', 'patch'] rbac_perms = { 'download': 'audits.view_ftplog', } def get_storage(self): return FTPFileStorageHandler(self.get_object()) @action( methods=[GET], detail=True, permission_classes=[RBACPermission, ], throttle_classes=[FileTransferThrottle], url_path='file/download' ) def download(self, request, *args, **kwargs): ftp_log = self.get_object() ftp_storage = self.get_storage() local_path, url = ftp_storage.get_file_path_url() if local_path is None: # url => error message return HttpResponse(url) file = open(default_storage.path(local_path), 'rb') response = FileResponse(file) response['Content-Type'] = 'application/octet-stream' filename = escape_uri_path(ftp_log.filename) response["Content-Disposition"] = "attachment; filename*=UTF-8''{}".format(filename) record_operate_log_and_activity_log( [ftp_log.id], ActionChoices.download, '', self.model, resource_display=f'{ftp_log.asset}: {ftp_log.filename}', ) return response @action(methods=[POST], detail=True, permission_classes=[IsServiceAccount, ], throttle_classes=[FileTransferThrottle], serializer_class=FileSerializer) def upload(self, request, *args, **kwargs): ftp_log = self.get_object() serializer = self.get_serializer(data=request.data) if serializer.is_valid(): file = serializer.validated_data['file'] name, err = ftp_log.save_file_to_storage(file) if not name: msg = "Failed save file `{}`: {}".format(ftp_log.id, err) logger.error(msg) return Response({'msg': str(err)}, status=400) url = default_storage.url(name) return Response({'url': url}, status=201) else: msg = 'Upload data invalid: {}'.format(serializer.errors) logger.error(msg) return Response({'msg': serializer.errors}, status=401) class UserLoginCommonMixin: model = UserLoginLog serializer_class = UserLoginLogSerializer extra_filter_backends = [DatetimeRangeFilterBackend] date_range_filter_fields = [ ('datetime', ('date_from', 'date_to')) ] filterset_fields = ['id', 'username', 'ip', 'city', 'type', 'status', 'mfa'] search_fields = ['id', 'username', 'ip', 'city'] class UserLoginLogViewSet(UserLoginCommonMixin, OrgReadonlyModelViewSet): @staticmethod def get_org_member_usernames(): user_queryset = current_org.get_members() users = construct_userlogin_usernames(user_queryset) return users def get_queryset(self): queryset = super().get_queryset() queryset = queryset.model.filter_queryset_by_org(queryset) return queryset class MyLoginLogViewSet(UserLoginCommonMixin, OrgReadonlyModelViewSet): permission_classes = [IsAuthenticated] def get_queryset(self): qs = super().get_queryset() username = self.request.user.username q = Q(username=username) | Q(username__icontains=f'({username})') qs = qs.filter(q) return qs class ResourceActivityAPIView(generics.ListAPIView): serializer_class = ActivityUnionLogSerializer ordering_fields = ['datetime'] rbac_perms = { 'GET': 'audits.view_activitylog', } @staticmethod def get_operate_log_qs(fields, limit, org_q, resource_id=None): q, user = Q(resource_id=resource_id), None if is_uuid(resource_id): user = User.objects.filter(id=resource_id).first() if user is not None: q |= Q(user=str(user)) queryset = OperateLog.objects.filter(q, org_q).annotate( r_type=Value(ActivityChoices.operate_log, CharField()), r_detail_id=Cast(F('id'), CharField()), r_detail=Value(None, CharField()), r_user=F('user'), r_action=F('action'), ).values(*fields)[:limit] return queryset @staticmethod def get_activity_log_qs(fields, limit, org_q, **filters): queryset = ActivityLog.objects.filter(org_q, **filters).annotate( r_type=F('type'), r_detail_id=F('detail_id'), r_detail=F('detail'), r_user=Value(None, CharField()), r_action=Value(None, CharField()), ).values(*fields)[:limit] return queryset def get_queryset(self): limit = 30 resource_id = self.request.query_params.get('resource_id') fields = ( 'id', 'datetime', 'r_detail', 'r_detail_id', 'r_user', 'r_action', 'r_type' ) org_q = Q() if not current_org.is_root(): org_q = Q(org_id=Organization.SYSTEM_ID) | Q(org_id=current_org.id) if resource_id: org_q |= Q(org_id='') | Q(org_id=Organization.ROOT_ID) with tmp_to_root_org(): qs1 = self.get_operate_log_qs(fields, limit, org_q, resource_id=resource_id) qs2 = self.get_activity_log_qs(fields, limit, org_q, resource_id=resource_id) queryset = qs2.union(qs1) return queryset.order_by('-datetime')[:limit] class OperateLogViewSet(OrgReadonlyModelViewSet): model = OperateLog serializer_class = OperateLogSerializer extra_filter_backends = [DatetimeRangeFilterBackend] date_range_filter_fields = [ ('datetime', ('date_from', 'date_to')) ] filterset_class = OperateLogFilterSet search_fields = ['resource', 'user'] ordering = ['-datetime'] @lazyproperty def is_action_detail(self): return self.detail and self.request.query_params.get('type') == 'action_detail' def get_serializer_class(self): if self.is_action_detail: return OperateLogActionDetailSerializer elif self.request.query_params.get('format'): return OperateLogFullSerializer return OperateLogSerializer def get_queryset(self): current_org_id = str(current_org.id) with tmp_to_root_org(): qs = OperateLog.objects.all() if current_org_id != Organization.ROOT_ID: filtered_org_ids = {current_org_id} if current_org_id == Organization.DEFAULT_ID: filtered_org_ids.update(Organization.INTERNAL_IDS) if self.is_action_detail: filtered_org_ids.add(Organization.SYSTEM_ID) qs = OperateLog.objects.filter(org_id__in=filtered_org_ids) es_config = settings.OPERATE_LOG_ELASTICSEARCH_CONFIG if es_config: engine_mod = import_module(TYPE_ENGINE_MAPPING['es']) store = engine_mod.OperateLogStore(es_config) if store.ping(timeout=2): qs = ESQuerySet(store) qs.model = OperateLog return qs class PasswordChangeLogViewSet(OrgReadonlyModelViewSet): model = PasswordChangeLog serializer_class = PasswordChangeLogSerializer extra_filter_backends = [DatetimeRangeFilterBackend] date_range_filter_fields = [ ('datetime', ('date_from', 'date_to')) ] filterset_fields = ['user', 'change_by', 'remote_addr'] search_fields = filterset_fields ordering = ['-datetime'] def get_queryset(self): queryset = super().get_queryset() return self.model.filter_queryset_by_org(queryset) class UserSessionViewSet(CommonApiMixin, viewsets.ModelViewSet): http_method_names = ('get', 'post', 'head', 'options', 'trace') serializer_class = UserSessionSerializer filterset_class = UserSessionFilterSet search_fields = ['id', 'ip', 'city'] rbac_perms = { 'offline': ['audits.offline_usersession'] } @property def org_user_ids(self): user_ids = current_org.get_members().values_list('id', flat=True) return user_ids def get_queryset(self): keys = user_session_manager.get_keys() queryset = UserSession.objects.filter(key__in=keys) if current_org.is_root(): return queryset user_ids = self.org_user_ids queryset = queryset.filter(user_id__in=user_ids) return queryset @action(['POST'], detail=False, url_path='offline') def offline(self, request, *args, **kwargs): ids = request.data.get('ids', []) queryset = self.get_queryset() session_key = request.session.session_key queryset = queryset.exclude(key=session_key).filter(id__in=ids) if not queryset.exists(): return Response(status=status.HTTP_200_OK) keys = queryset.values_list('key', flat=True) for key in keys: user_session_manager.remove(key) queryset.delete() return Response(status=status.HTTP_200_OK) class ServiceAccessLogViewSet(OrgReadonlyModelViewSet): model = IntegrationApplicationLog serializer_class = ServiceAccessLogSerializer extra_filter_backends = [DatetimeRangeFilterBackend] date_range_filter_fields = [ ('datetime', ('date_from', 'date_to')) ] filterset_fields = ['account', 'remote_addr', 'service_id'] search_fields = filterset_fields ordering = ['-datetime']