mirror of
https://github.com/jumpserver/jumpserver.git
synced 2026-04-26 09:32:06 +00:00
* [Feature] 1. 资产用户管理器 * [Feature] 2. 资产用户管理器: 更新AuthBook * [Feature] 3. 资产用户管理器: 添加 AssetUser API * [Feature] 4. AssetUser Model: 添加方法 load_related_asset_auth * [Feature] 5. AdminUser: 更新管理用户获取认证信息时,先加载相关资产的认证 * [Feature] 6. SystemUser: 更新系统用户获取认证信息时,先加载相关资产的认证 * [Feature] 前端页面: 添加资产用户列表页面 * [Feature] 前端页面: 管理用户的资产管理页面添加按钮: 修改资产用户认证信息 * [Feature] 前端页面: 系统用户的资产管理页面添加按钮: 修改资产用户认证信息 * [Feature] 优化: 从管理用户和系统用户的backend中获取相关资产用户的逻辑 * [Update] Fix 1 * [Feature] 优化: SystemUserBackend之filter功能 * [Feature] 优化: AdminUserBackend之filter功能 * [Feature] 优化: AdminUserBackend和SystemUserBackend功能 * [Feature] 更新翻译: 资产用户管理器 * [Update] 更新资产用户列表页名称为: asset_asset_user_list.html * [Bugfix] 修改bug: SystemUserBackend 根据用户名过滤系统用户 * [Feature] 添加: 资产用户列表中可测试资产用户的连接性 * [Update] 修改: AdHoc model的run_as字段从SystemUser外键修改为username字符串 * [Feature] 添加: 获取系统用户认证信息(对应某个资产)API * [Update] 更新: API获取asset user时进行排序 * [Bugfix] 修改: 资产用户可连接性CACHE_KEY * [Update] 更新翻译信息 * [Update] 修改获取资产用户认证信息API的返回响应(200/400) * [Update] 修改BaseUser获取特定资产的方法名 * [Update] 修改logger输出,AuthBook set_version_and_latest * [Update] 修改日志输出添加exc_info参数 * [Update] 移除AuthBook迁移文件0026 * [Bugfix] 修复AdminUserBackend获取instances为空的bug
102 lines
3.2 KiB
Python
102 lines
3.2 KiB
Python
# -*- coding: utf-8 -*-
|
|
#
|
|
|
|
|
|
from rest_framework.response import Response
|
|
from rest_framework import viewsets, status, generics
|
|
from rest_framework.pagination import LimitOffsetPagination
|
|
|
|
from common.permissions import IsOrgAdminOrAppUser
|
|
from common.utils import get_object_or_none, get_logger
|
|
|
|
from ..backends.multi import AssetUserManager
|
|
from ..models import Asset
|
|
from .. import serializers
|
|
from ..tasks import test_asset_users_connectivity_manual
|
|
|
|
|
|
__all__ = [
|
|
'AssetUserViewSet', 'AssetUserAuthInfoApi', 'AssetUserTestConnectiveApi',
|
|
]
|
|
|
|
|
|
logger = get_logger(__name__)
|
|
|
|
|
|
class AssetUserViewSet(viewsets.GenericViewSet):
|
|
pagination_class = LimitOffsetPagination
|
|
serializer_class = serializers.AssetUserSerializer
|
|
permission_classes = (IsOrgAdminOrAppUser, )
|
|
http_method_names = ['get', 'post']
|
|
|
|
def create(self, request, *args, **kwargs):
|
|
serializer = self.get_serializer(data=request.data)
|
|
serializer.is_valid(raise_exception=True)
|
|
serializer.save()
|
|
return Response(serializer.data, status=status.HTTP_201_CREATED)
|
|
|
|
def list(self, request, *args, **kwargs):
|
|
queryset = self.filter_queryset(self.get_queryset())
|
|
serializer = self.get_serializer(queryset, many=True)
|
|
return Response(serializer.data)
|
|
|
|
def get_queryset(self):
|
|
username = self.request.GET.get('username')
|
|
asset_id = self.request.GET.get('asset_id')
|
|
asset = get_object_or_none(Asset, pk=asset_id)
|
|
queryset = AssetUserManager.filter(username=username, asset=asset)
|
|
return queryset
|
|
|
|
def filter_queryset(self, queryset):
|
|
queryset = sorted(
|
|
queryset,
|
|
key=lambda q: (q.asset.hostname, q.connectivity, q.username)
|
|
)
|
|
return queryset
|
|
|
|
|
|
class AssetUserAuthInfoApi(generics.RetrieveAPIView):
|
|
serializer_class = serializers.AssetUserAuthInfoSerializer
|
|
permission_classes = (IsOrgAdminOrAppUser,)
|
|
|
|
def retrieve(self, request, *args, **kwargs):
|
|
instance = self.get_object()
|
|
serializer = self.get_serializer(instance)
|
|
status_code = status.HTTP_200_OK
|
|
if not instance:
|
|
status_code = status.HTTP_400_BAD_REQUEST
|
|
return Response(serializer.data, status=status_code)
|
|
|
|
def get_object(self):
|
|
username = self.request.GET.get('username')
|
|
asset_id = self.request.GET.get('asset_id')
|
|
asset = get_object_or_none(Asset, pk=asset_id)
|
|
try:
|
|
instance = AssetUserManager.get(username, asset)
|
|
except Exception as e:
|
|
logger.error(e, exc_info=True)
|
|
return None
|
|
else:
|
|
return instance
|
|
|
|
|
|
class AssetUserTestConnectiveApi(generics.RetrieveAPIView):
|
|
"""
|
|
Test asset users connective
|
|
"""
|
|
|
|
def get_asset_users(self):
|
|
username = self.request.GET.get('username')
|
|
asset_id = self.request.GET.get('asset_id')
|
|
asset = get_object_or_none(Asset, pk=asset_id)
|
|
asset_users = AssetUserManager.filter(username=username, asset=asset)
|
|
return asset_users
|
|
|
|
def retrieve(self, request, *args, **kwargs):
|
|
asset_users = self.get_asset_users()
|
|
task = test_asset_users_connectivity_manual.delay(asset_users)
|
|
return Response({"task": task.id})
|
|
|
|
|
|
|