kubelet podresources: extend List to support Dynamic Resources and implement Get API

Signed-off-by: Moshe Levi <moshele@nvidia.com>
This commit is contained in:
Moshe Levi 2023-03-14 01:34:54 +02:00
parent 9c57613912
commit 2a568bcfc8
14 changed files with 1559 additions and 67 deletions

View File

@ -29,20 +29,22 @@ import (
// v1PodResourcesServer implements PodResourcesListerServer
type v1PodResourcesServer struct {
podsProvider PodsProvider
devicesProvider DevicesProvider
cpusProvider CPUsProvider
memoryProvider MemoryProvider
podsProvider PodsProvider
devicesProvider DevicesProvider
cpusProvider CPUsProvider
memoryProvider MemoryProvider
dynamicResourcesProvider DynamicResourcesProvider
}
// NewV1PodResourcesServer returns a PodResourcesListerServer which lists pods provided by the PodsProvider
// with device information provided by the DevicesProvider
func NewV1PodResourcesServer(providers PodResourcesProviders) v1.PodResourcesListerServer {
return &v1PodResourcesServer{
podsProvider: providers.Pods,
devicesProvider: providers.Devices,
cpusProvider: providers.Cpus,
memoryProvider: providers.Memory,
podsProvider: providers.Pods,
devicesProvider: providers.Devices,
cpusProvider: providers.Cpus,
memoryProvider: providers.Memory,
dynamicResourcesProvider: providers.DynamicResources,
}
}
@ -69,6 +71,10 @@ func (p *v1PodResourcesServer) List(ctx context.Context, req *v1.ListPodResource
CpuIds: p.cpusProvider.GetCPUs(string(pod.UID), container.Name),
Memory: p.memoryProvider.GetMemory(string(pod.UID), container.Name),
}
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.KubeletPodResourcesDynamicResources) {
pRes.Containers[j].DynamicResources = p.dynamicResourcesProvider.GetDynamicResources(pod, &container)
}
}
podResources[i] = &pRes
}
@ -85,7 +91,7 @@ func (p *v1PodResourcesServer) GetAllocatableResources(ctx context.Context, req
if !utilfeature.DefaultFeatureGate.Enabled(kubefeatures.KubeletPodResourcesGetAllocatable) {
metrics.PodResourcesEndpointErrorsGetAllocatableCount.WithLabelValues("v1").Inc()
return nil, fmt.Errorf("Pod Resources API GetAllocatableResources disabled")
return nil, fmt.Errorf("PodResources API GetAllocatableResources disabled")
}
return &v1.AllocatableResourcesResponse{
@ -94,3 +100,43 @@ func (p *v1PodResourcesServer) GetAllocatableResources(ctx context.Context, req
Memory: p.memoryProvider.GetAllocatableMemory(),
}, nil
}
// Get returns information about the resources assigned to a specific pod
func (p *v1PodResourcesServer) Get(ctx context.Context, req *v1.GetPodResourcesRequest) (*v1.GetPodResourcesResponse, error) {
metrics.PodResourcesEndpointRequestsTotalCount.WithLabelValues("v1").Inc()
metrics.PodResourcesEndpointRequestsGetCount.WithLabelValues("v1").Inc()
if !utilfeature.DefaultFeatureGate.Enabled(kubefeatures.KubeletPodResourcesGet) {
metrics.PodResourcesEndpointErrorsGetCount.WithLabelValues("v1").Inc()
return nil, fmt.Errorf("PodResources API Get method disabled")
}
pod, exist := p.podsProvider.GetPodByName(req.PodNamespace, req.PodName)
if !exist {
metrics.PodResourcesEndpointErrorsGetCount.WithLabelValues("v1").Inc()
return nil, fmt.Errorf("pod %s in namespace %s not found", req.PodName, req.PodNamespace)
}
podResources := &v1.PodResources{
Name: pod.Name,
Namespace: pod.Namespace,
Containers: make([]*v1.ContainerResources, len(pod.Spec.Containers)),
}
for i, container := range pod.Spec.Containers {
podResources.Containers[i] = &v1.ContainerResources{
Name: container.Name,
Devices: p.devicesProvider.GetDevices(string(pod.UID), container.Name),
CpuIds: p.cpusProvider.GetCPUs(string(pod.UID), container.Name),
Memory: p.memoryProvider.GetMemory(string(pod.UID), container.Name),
}
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.KubeletPodResourcesDynamicResources) {
podResources.Containers[i].DynamicResources = p.dynamicResourcesProvider.GetDynamicResources(pod, &container)
}
}
response := &v1.GetPodResourcesResponse{
PodResources: podResources,
}
return response, nil
}

View File

@ -114,6 +114,21 @@ func (m *MockPodsProvider) EXPECT() *MockPodsProviderMockRecorder {
return m.recorder
}
// GetPodByName mocks base method.
func (m *MockPodsProvider) GetPodByName(namespace, name string) (*v1.Pod, bool) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GetPodByName", namespace, name)
ret0, _ := ret[0].(*v1.Pod)
ret1, _ := ret[1].(bool)
return ret0, ret1
}
// GetPodByName indicates an expected call of GetPodByName.
func (mr *MockPodsProviderMockRecorder) GetPodByName(namespace, name interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPodByName", reflect.TypeOf((*MockPodsProvider)(nil).GetPodByName), namespace, name)
}
// GetPods mocks base method.
func (m *MockPodsProvider) GetPods() []*v1.Pod {
m.ctrl.T.Helper()
@ -229,3 +244,40 @@ func (mr *MockMemoryProviderMockRecorder) GetMemory(podUID, containerName interf
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetMemory", reflect.TypeOf((*MockMemoryProvider)(nil).GetMemory), podUID, containerName)
}
// MockDynamicResourcesProvider is a mock of DynamicResourcesProvider interface.
type MockDynamicResourcesProvider struct {
ctrl *gomock.Controller
recorder *MockDynamicResourcesProviderMockRecorder
}
// MockDynamicResourcesProviderMockRecorder is the mock recorder for MockDynamicResourcesProvider.
type MockDynamicResourcesProviderMockRecorder struct {
mock *MockDynamicResourcesProvider
}
// NewMockDynamicResourcesProvider creates a new mock instance.
func NewMockDynamicResourcesProvider(ctrl *gomock.Controller) *MockDynamicResourcesProvider {
mock := &MockDynamicResourcesProvider{ctrl: ctrl}
mock.recorder = &MockDynamicResourcesProviderMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockDynamicResourcesProvider) EXPECT() *MockDynamicResourcesProviderMockRecorder {
return m.recorder
}
// GetDynamicResources mocks base method.
func (m *MockDynamicResourcesProvider) GetDynamicResources(pod *v1.Pod, container *v1.Container) []*v10.DynamicResource {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GetDynamicResources", pod, container)
ret0, _ := ret[0].([]*v10.DynamicResource)
return ret0
}
// GetDynamicResources indicates an expected call of GetDynamicResources.
func (mr *MockDynamicResourcesProviderMockRecorder) GetDynamicResources(pod, container interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetDynamicResources", reflect.TypeOf((*MockDynamicResourcesProvider)(nil).GetDynamicResources), pod, container)
}

View File

@ -35,6 +35,7 @@ type DevicesProvider interface {
// PodsProvider knows how to provide the pods admitted by the node
type PodsProvider interface {
GetPods() []*v1.Pod
GetPodByName(namespace, name string) (*v1.Pod, bool)
}
// CPUsProvider knows how to provide the cpus used by the given container
@ -52,9 +53,15 @@ type MemoryProvider interface {
GetAllocatableMemory() []*podresourcesapi.ContainerMemory
}
type PodResourcesProviders struct {
Pods PodsProvider
Devices DevicesProvider
Cpus CPUsProvider
Memory MemoryProvider
type DynamicResourcesProvider interface {
// GetDynamicResources returns information about dynamic resources assigned to pods and containers
GetDynamicResources(pod *v1.Pod, container *v1.Container) []*podresourcesapi.DynamicResource
}
type PodResourcesProviders struct {
Pods PodsProvider
Devices DevicesProvider
Cpus CPUsProvider
Memory MemoryProvider
DynamicResources DynamicResourcesProvider
}

View File

@ -127,10 +127,11 @@ type ContainerManager interface {
// might need to unprepare resources.
PodMightNeedToUnprepareResources(UID types.UID) bool
// Implements the podresources Provider API for CPUs, Memory and Devices
// Implements the PodResources Provider API
podresources.CPUsProvider
podresources.DevicesProvider
podresources.MemoryProvider
podresources.DynamicResourcesProvider
}
type NodeConfig struct {

View File

@ -965,6 +965,41 @@ func (cm *containerManagerImpl) GetAllocatableMemory() []*podresourcesapi.Contai
return containerMemoryFromBlock(cm.memoryManager.GetAllocatableMemory())
}
func (cm *containerManagerImpl) GetDynamicResources(pod *v1.Pod, container *v1.Container) []*podresourcesapi.DynamicResource {
if !utilfeature.DefaultFeatureGate.Enabled(kubefeatures.DynamicResourceAllocation) {
return []*podresourcesapi.DynamicResource{}
}
var containerDynamicResources []*podresourcesapi.DynamicResource
containerClaimInfos, err := cm.draManager.GetContainerClaimInfos(pod, container)
if err != nil {
klog.ErrorS(err, "Unable to get container claim info state")
return []*podresourcesapi.DynamicResource{}
}
for _, containerClaimInfo := range containerClaimInfos {
var claimResources []*podresourcesapi.ClaimResource
// TODO: Currently we maintain a list of ClaimResources, each of which contains
// a set of CDIDevices from a different kubelet plugin. In the future we may want to
// include the name of the kubelet plugin and/or other types of resources that are
// not CDIDevices (assuming the DRAmanager supports this).
for _, klPluginCdiDevices := range containerClaimInfo.CDIDevices {
var cdiDevices []*podresourcesapi.CDIDevice
for _, cdiDevice := range klPluginCdiDevices {
cdiDevices = append(cdiDevices, &podresourcesapi.CDIDevice{Name: cdiDevice})
}
claimResources = append(claimResources, &podresourcesapi.ClaimResource{CDIDevices: cdiDevices})
}
containerDynamicResource := podresourcesapi.DynamicResource{
ClassName: containerClaimInfo.ClassName,
ClaimName: containerClaimInfo.ClaimName,
ClaimNamespace: containerClaimInfo.Namespace,
ClaimResources: claimResources,
}
containerDynamicResources = append(containerDynamicResources, &containerDynamicResource)
}
return containerDynamicResources
}
func (cm *containerManagerImpl) ShouldResetExtendedResourceCapacity() bool {
return cm.deviceManager.ShouldResetExtendedResourceCapacity()
}

View File

@ -159,6 +159,10 @@ func (cm *containerManagerStub) GetAllocatableMemory() []*podresourcesapi.Contai
return nil
}
func (cm *containerManagerStub) GetDynamicResources(pod *v1.Pod, container *v1.Container) []*podresourcesapi.DynamicResource {
return nil
}
func (cm *containerManagerStub) GetNodeAllocatableAbsolute() v1.ResourceList {
return nil
}

View File

@ -253,6 +253,10 @@ func (cm *containerManagerImpl) GetNodeAllocatableAbsolute() v1.ResourceList {
return nil
}
func (cm *containerManagerImpl) GetDynamicResources(pod *v1.Pod, container *v1.Container) []*podresourcesapi.DynamicResource {
return nil
}
func (cm *containerManagerImpl) PrepareDynamicResources(pod *v1.Pod) error {
return nil
}

View File

@ -26,9 +26,9 @@ import (
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
)
// claimInfo holds information required
// ClaimInfo holds information required
// to prepare and unprepare a resource claim.
type claimInfo struct {
type ClaimInfo struct {
sync.RWMutex
state.ClaimInfoState
// annotations is a list of container annotations associated with
@ -36,14 +36,14 @@ type claimInfo struct {
annotations []kubecontainer.Annotation
}
func (res *claimInfo) addPodReference(podUID types.UID) {
func (res *ClaimInfo) addPodReference(podUID types.UID) {
res.Lock()
defer res.Unlock()
res.PodUIDs.Insert(string(podUID))
}
func (res *claimInfo) deletePodReference(podUID types.UID) {
func (res *ClaimInfo) deletePodReference(podUID types.UID) {
res.Lock()
defer res.Unlock()
@ -54,10 +54,10 @@ func (res *claimInfo) deletePodReference(podUID types.UID) {
type claimInfoCache struct {
sync.RWMutex
state state.CheckpointState
claimInfo map[string]*claimInfo
claimInfo map[string]*ClaimInfo
}
func newClaimInfo(driverName, className string, claimUID types.UID, claimName, namespace string, podUIDs sets.Set[string]) *claimInfo {
func newClaimInfo(driverName, className string, claimUID types.UID, claimName, namespace string, podUIDs sets.Set[string]) *ClaimInfo {
claimInfoState := state.ClaimInfoState{
DriverName: driverName,
ClassName: className,
@ -66,13 +66,13 @@ func newClaimInfo(driverName, className string, claimUID types.UID, claimName, n
Namespace: namespace,
PodUIDs: podUIDs,
}
claimInfo := claimInfo{
claimInfo := ClaimInfo{
ClaimInfoState: claimInfoState,
}
return &claimInfo
}
func (info *claimInfo) addCDIDevices(pluginName string, cdiDevices []string) error {
func (info *ClaimInfo) addCDIDevices(pluginName string, cdiDevices []string) error {
// NOTE: Passing CDI device names as annotations is a temporary solution
// It will be removed after all runtimes are updated
// to get CDI device names from the ContainerConfig.CDIDevices field
@ -105,7 +105,7 @@ func newClaimInfoCache(stateDir, checkpointName string) (*claimInfoCache, error)
cache := &claimInfoCache{
state: stateImpl,
claimInfo: make(map[string]*claimInfo),
claimInfo: make(map[string]*ClaimInfo),
}
for _, entry := range curState {
@ -129,14 +129,14 @@ func newClaimInfoCache(stateDir, checkpointName string) (*claimInfoCache, error)
return cache, nil
}
func (cache *claimInfoCache) add(res *claimInfo) {
func (cache *claimInfoCache) add(res *ClaimInfo) {
cache.Lock()
defer cache.Unlock()
cache.claimInfo[res.ClaimName+res.Namespace] = res
}
func (cache *claimInfoCache) get(claimName, namespace string) *claimInfo {
func (cache *claimInfoCache) get(claimName, namespace string) *ClaimInfo {
cache.RLock()
defer cache.RUnlock()

View File

@ -289,3 +289,24 @@ func (m *ManagerImpl) UnprepareResources(pod *v1.Pod) error {
func (m *ManagerImpl) PodMightNeedToUnprepareResources(UID types.UID) bool {
return m.cache.hasPodReference(UID)
}
// GetCongtainerClaimInfos gets Container's ClaimInfo
func (m *ManagerImpl) GetContainerClaimInfos(pod *v1.Pod, container *v1.Container) ([]*ClaimInfo, error) {
claimInfos := make([]*ClaimInfo, 0, len(pod.Spec.ResourceClaims))
for i, podResourceClaim := range pod.Spec.ResourceClaims {
claimName := resourceclaim.Name(pod, &pod.Spec.ResourceClaims[i])
for _, claim := range container.Resources.Claims {
if podResourceClaim.Name != claim.Name {
continue
}
claimInfo := m.cache.get(claimName, pod.Namespace)
if claimInfo == nil {
return nil, fmt.Errorf("unable to get resource for namespace: %s, claim: %s", pod.Namespace, claimName)
}
claimInfos = append(claimInfos, claimInfo)
}
}
return claimInfos, nil
}

View File

@ -38,6 +38,9 @@ type Manager interface {
// PodMightNeedToUnprepareResources returns true if the pod with the given UID
// might need to unprepare resources.
PodMightNeedToUnprepareResources(UID types.UID) bool
// GetContainerClaimInfos gets Container ClaimInfo objects
GetContainerClaimInfos(pod *v1.Pod, container *v1.Container) ([]*ClaimInfo, error)
}
// ContainerInfo contains information required by the runtime to consume prepared resources.

View File

@ -232,6 +232,10 @@ func (cm *FakeContainerManager) GetAllocatableMemory() []*podresourcesapi.Contai
return nil
}
func (cm *FakeContainerManager) GetDynamicResources(pod *v1.Pod, container *v1.Container) []*podresourcesapi.DynamicResource {
return nil
}
func (cm *FakeContainerManager) GetNodeAllocatableAbsolute() v1.ResourceList {
cm.Lock()
defer cm.Unlock()

View File

@ -2753,10 +2753,11 @@ func (kl *Kubelet) ListenAndServePodResources() {
}
providers := podresources.PodResourcesProviders{
Pods: kl.podManager,
Devices: kl.containerManager,
Cpus: kl.containerManager,
Memory: kl.containerManager,
Pods: kl.podManager,
Devices: kl.containerManager,
Cpus: kl.containerManager,
Memory: kl.containerManager,
DynamicResources: kl.containerManager,
}
server.ListenAndServePodResources(socket, providers)

File diff suppressed because it is too large Load Diff

View File

@ -20,6 +20,7 @@ option (gogoproto.goproto_unrecognized_all) = false;
service PodResourcesLister {
rpc List(ListPodResourcesRequest) returns (ListPodResourcesResponse) {}
rpc GetAllocatableResources(AllocatableResourcesRequest) returns (AllocatableResourcesResponse) {}
rpc Get(GetPodResourcesRequest) returns (GetPodResourcesResponse) {}
}
message AllocatableResourcesRequest {}
@ -52,6 +53,7 @@ message ContainerResources {
repeated ContainerDevices devices = 2;
repeated int64 cpu_ids = 3;
repeated ContainerMemory memory = 4;
repeated DynamicResource dynamic_resources = 5;
}
// ContainerMemory contains information about memory and hugepages assigned to a container
@ -77,3 +79,36 @@ message TopologyInfo {
message NUMANode {
int64 ID = 1;
}
// DynamicResource contains information about the devices assigned to a container by DRA
message DynamicResource {
string class_name = 1;
string claim_name = 2;
string claim_namespace = 3;
repeated ClaimResource claim_resources = 4;
}
// ClaimResource contains per plugin resource information
message ClaimResource {
repeated CDIDevice cdi_devices = 1 [(gogoproto.customname) = "CDIDevices"];
}
// CDIDevice specifies a CDI device information
message CDIDevice {
// Fully qualified CDI device name
// for example: vendor.com/gpu=gpudevice1
// see more details in the CDI specification:
// https://github.com/container-orchestrated-devices/container-device-interface/blob/main/SPEC.md
string name = 1;
}
// GetPodResourcesRequest contains information about the pod
message GetPodResourcesRequest {
string pod_name = 1;
string pod_namespace = 2;
}
// GetPodResourcesResponse contains information about the pod the devices
message GetPodResourcesResponse {
PodResources pod_resources = 1;
}