diff --git a/api/openapi-spec/swagger.json b/api/openapi-spec/swagger.json index 69d6f2ade38..344ea527335 100644 --- a/api/openapi-spec/swagger.json +++ b/api/openapi-spec/swagger.json @@ -10679,6 +10679,7 @@ "io.k8s.api.core.v1.ResourceStatus": { "properties": { "name": { + "description": "Name of the resource. Must be unique within the pod and match one of the resources from the pod spec.", "type": "string" }, "resources": { diff --git a/api/openapi-spec/v3/api__v1_openapi.json b/api/openapi-spec/v3/api__v1_openapi.json index 85f68e90fa5..f33ed5aa951 100644 --- a/api/openapi-spec/v3/api__v1_openapi.json +++ b/api/openapi-spec/v3/api__v1_openapi.json @@ -6817,6 +6817,7 @@ "properties": { "name": { "default": "", + "description": "Name of the resource. Must be unique within the pod and match one of the resources from the pod spec.", "type": "string" }, "resources": { diff --git a/pkg/api/pod/util.go b/pkg/api/pod/util.go index 4c79e6381f3..9d3746f31df 100644 --- a/pkg/api/pod/util.go +++ b/pkg/api/pod/util.go @@ -1172,22 +1172,6 @@ func rroInUse(podSpec *api.PodSpec) bool { return inUse } -func allocatedResourcesStatusInUse(podSpec *api.PodStatus) bool { - if podSpec == nil { - return false - } - inUse := func(csl []api.ContainerStatus) bool { - for _, cs := range csl { - if len(cs.AllocatedResourcesStatus) > 0 { - return true - } - } - return false - } - - return inUse(podSpec.ContainerStatuses) || inUse(podSpec.InitContainerStatuses) || inUse(podSpec.EphemeralContainerStatuses) -} - func dropDisabledClusterTrustBundleProjection(podSpec, oldPodSpec *api.PodSpec) { if utilfeature.DefaultFeatureGate.Enabled(features.ClusterTrustBundleProjection) { return diff --git a/pkg/apis/core/validation/validation.go b/pkg/apis/core/validation/validation.go index abbcf0916d7..248d57d237c 100644 --- a/pkg/apis/core/validation/validation.go +++ b/pkg/apis/core/validation/validation.go @@ -8209,11 +8209,10 @@ func validateContainerStatusNoAllocatedResourcesStatus(containerStatuses []core. 
allErrors := field.ErrorList{} for i, containerStatus := range containerStatuses { - if containerStatus.AllocatedResourcesStatus == nil { + if len(containerStatus.AllocatedResourcesStatus) == 0 { continue - } else { - allErrors = append(allErrors, field.Forbidden(fldPath.Index(i).Child("allocatedResourcesStatus"), "cannot be set for a container status")) } + allErrors = append(allErrors, field.Forbidden(fldPath.Index(i).Child("allocatedResourcesStatus"), "must not be specified in container status")) } return allErrors @@ -8263,12 +8262,18 @@ func validateContainerStatusAllocatedResourcesStatus(containerStatuses []core.Co uniqueResources := sets.New[core.ResourceID]() // check resource IDs are unique for k, r := range allocatedResource.Resources { - if r.Health != core.ResourceHealthStatusHealthy && r.Health != core.ResourceHealthStatusUnhealthy && r.Health != core.ResourceHealthStatusUnknown { - allErrors = append(allErrors, field.Invalid(fldPath.Index(i).Child("allocatedResourcesStatus").Index(j).Child("resources").Index(k).Child("health"), r.Health, "must be one of Healthy, Unhealthy, Unknown")) + + var supportedResourceHealthValues = sets.New( + core.ResourceHealthStatusHealthy, + core.ResourceHealthStatusUnhealthy, + core.ResourceHealthStatusUnknown) + + if !supportedResourceHealthValues.Has(r.Health) { + allErrors = append(allErrors, field.NotSupported(fldPath.Index(i).Child("allocatedResourcesStatus").Index(j).Child("resources").Index(k).Child("health"), r.Health, sets.List(supportedResourceHealthValues))) } if uniqueResources.Has(r.ResourceID) { - allErrors = append(allErrors, field.Invalid(fldPath.Index(i).Child("allocatedResourcesStatus").Index(j).Child("resources").Index(k).Child("resourceID"), r.ResourceID, "must be unique")) + allErrors = append(allErrors, field.Duplicate(fldPath.Index(i).Child("allocatedResourcesStatus").Index(j).Child("resources").Index(k).Child("resourceID"), r.ResourceID)) } else { uniqueResources.Insert(r.ResourceID) } diff --git a/pkg/apis/core/validation/validation_test.go b/pkg/apis/core/validation/validation_test.go index 0573f0addc5..18bbdecd524 100644 --- a/pkg/apis/core/validation/validation_test.go +++ b/pkg/apis/core/validation/validation_test.go @@ -24457,11 +24457,11 @@ func TestValidateContainerStatusNoAllocatedResourcesStatus(t *testing.T) { errs := validateContainerStatusNoAllocatedResourcesStatus(containerStatuses, fldPath) - assert.Equal(t, 2, len(errs)) + assert.Len(t, errs, 2) assert.Equal(t, "spec.containers[1].allocatedResourcesStatus", errs[0].Field) - assert.Equal(t, "cannot be set for a container status", errs[0].Detail) + assert.Equal(t, "must not be specified in container status", errs[0].Detail) assert.Equal(t, "spec.containers[2].allocatedResourcesStatus", errs[1].Field) - assert.Equal(t, "cannot be set for a container status", errs[1].Detail) + assert.Equal(t, "must not be specified in container status", errs[1].Detail) } func TestValidateContainerStatusAllocatedResourcesStatus(t *testing.T) { @@ -24580,7 +24580,7 @@ func TestValidateContainerStatusAllocatedResourcesStatus(t *testing.T) { }, }, wantFieldErrors: field.ErrorList{ - field.Invalid(fldPath.Index(0).Child("allocatedResourcesStatus").Index(0).Child("resources").Index(1).Child("resourceID"), core.ResourceID("resource-1"), "must be unique"), + field.Duplicate(fldPath.Index(0).Child("allocatedResourcesStatus").Index(0).Child("resources").Index(1).Child("resourceID"), core.ResourceID("resource-1")), }, }, @@ -24619,6 +24619,38 @@ func 
TestValidateContainerStatusAllocatedResourcesStatus(t *testing.T) { field.Invalid(fldPath.Index(0).Child("allocatedResourcesStatus").Index(1).Child("name"), core.ResourceName("test.device/test2"), "must match one of the container's resource requirements"), }, }, + + "don't allow health status outside the known values": { + containers: []core.Container{ + { + Name: "container-1", + Resources: core.ResourceRequirements{ + Requests: core.ResourceList{ + "test.device/test": resource.MustParse("1"), + }, + }, + }, + }, + containerStatuses: []core.ContainerStatus{ + { + Name: "container-1", + AllocatedResourcesStatus: []core.ResourceStatus{ + { + Name: "test.device/test", + Resources: []core.ResourceHealth{ + { + ResourceID: "resource-1", + Health: "invalid-health-value", + }, + }, + }, + }, + }, + }, + wantFieldErrors: field.ErrorList{ + field.NotSupported(fldPath.Index(0).Child("allocatedResourcesStatus").Index(0).Child("resources").Index(0).Child("health"), core.ResourceHealthStatus("invalid-health-value"), []string{"Healthy", "Unhealthy", "Unknown"}), + }, + }, } for name, tt := range testCases { t.Run(name, func(t *testing.T) { diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index 67df67eede0..ad5d514b0de 100644 --- a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -664,6 +664,13 @@ const ( // No effect for other cases such as using serverTLSbootstap. ReloadKubeletServerCertificateFile featuregate.Feature = "ReloadKubeletServerCertificateFile" + // owner: @SergeyKanzhelev + // kep: https://kep.k8s.io/4680 + // alpha: v1.31 + // + // Adds the AllocatedResourcesStatus to the container status. + ResourceHealthStatus featuregate.Feature = "ResourceHealthStatus" + // owner: @mikedanese // alpha: v1.7 // beta: v1.12 @@ -1150,6 +1157,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS ReloadKubeletServerCertificateFile: {Default: true, PreRelease: featuregate.Beta}, + ResourceHealthStatus: {Default: false, PreRelease: featuregate.Alpha}, + RotateKubeletServerCertificate: {Default: true, PreRelease: featuregate.Beta}, RuntimeClassInImageCriAPI: {Default: false, PreRelease: featuregate.Alpha}, diff --git a/pkg/kubelet/cm/container_manager.go b/pkg/kubelet/cm/container_manager.go index 33fb3ccb2f0..99fb69c8b98 100644 --- a/pkg/kubelet/cm/container_manager.go +++ b/pkg/kubelet/cm/container_manager.go @@ -32,6 +32,7 @@ import ( kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config" "k8s.io/kubernetes/pkg/kubelet/apis/podresources" "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager" + "k8s.io/kubernetes/pkg/kubelet/cm/resourceupdates" "k8s.io/kubernetes/pkg/kubelet/config" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api" @@ -132,6 +133,12 @@ type ContainerManager interface { // might need to unprepare resources. PodMightNeedToUnprepareResources(UID types.UID) bool + // UpdateAllocatedResourcesStatus updates the status of allocated resources for the pod. + UpdateAllocatedResourcesStatus(pod *v1.Pod, status *v1.PodStatus) + + // Updates returns a channel that receives an Update when the device changed its status. 
+ Updates() <-chan resourceupdates.Update + // Implements the PodResources Provider API podresources.CPUsProvider podresources.DevicesProvider diff --git a/pkg/kubelet/cm/container_manager_linux.go b/pkg/kubelet/cm/container_manager_linux.go index c5ec87abfbd..29be9d202b8 100644 --- a/pkg/kubelet/cm/container_manager_linux.go +++ b/pkg/kubelet/cm/container_manager_linux.go @@ -53,6 +53,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/cm/dra" "k8s.io/kubernetes/pkg/kubelet/cm/memorymanager" memorymanagerstate "k8s.io/kubernetes/pkg/kubelet/cm/memorymanager/state" + "k8s.io/kubernetes/pkg/kubelet/cm/resourceupdates" "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager" cmutil "k8s.io/kubernetes/pkg/kubelet/cm/util" "k8s.io/kubernetes/pkg/kubelet/config" @@ -1027,3 +1028,16 @@ func (cm *containerManagerImpl) UnprepareDynamicResources(pod *v1.Pod) error { func (cm *containerManagerImpl) PodMightNeedToUnprepareResources(UID types.UID) bool { return cm.draManager.PodMightNeedToUnprepareResources(UID) } + +func (cm *containerManagerImpl) UpdateAllocatedResourcesStatus(pod *v1.Pod, status *v1.PodStatus) { + + // For now we only support Device Plugin + cm.deviceManager.UpdateAllocatedResourcesStatus(pod, status) + + // TODO(SergeyKanzhelev, https://kep.k8s.io/4680): add support for DRA resources which is planned for the next iteration of a KEP. +} + +func (cm *containerManagerImpl) Updates() <-chan resourceupdates.Update { + // TODO(SergeyKanzhelev, https://kep.k8s.io/4680): add support for DRA resources, for now only use device plugin updates. DRA support is planned for the next iteration of a KEP. + return cm.deviceManager.Updates() +} diff --git a/pkg/kubelet/cm/container_manager_stub.go b/pkg/kubelet/cm/container_manager_stub.go index cc9ab40ca97..6f849b67e52 100644 --- a/pkg/kubelet/cm/container_manager_stub.go +++ b/pkg/kubelet/cm/container_manager_stub.go @@ -28,6 +28,7 @@ import ( podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1" "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager" "k8s.io/kubernetes/pkg/kubelet/cm/memorymanager" + "k8s.io/kubernetes/pkg/kubelet/cm/resourceupdates" "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager" "k8s.io/kubernetes/pkg/kubelet/config" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" @@ -181,6 +182,13 @@ func (cm *containerManagerStub) PodMightNeedToUnprepareResources(UID types.UID) return false } +func (cm *containerManagerStub) UpdateAllocatedResourcesStatus(pod *v1.Pod, status *v1.PodStatus) { +} + +func (cm *containerManagerStub) Updates() <-chan resourceupdates.Update { + return nil +} + func NewStubContainerManager() ContainerManager { return &containerManagerStub{shouldResetExtendedResourceCapacity: false} } diff --git a/pkg/kubelet/cm/container_manager_windows.go b/pkg/kubelet/cm/container_manager_windows.go index f62944aafcd..79a308d6047 100644 --- a/pkg/kubelet/cm/container_manager_windows.go +++ b/pkg/kubelet/cm/container_manager_windows.go @@ -41,6 +41,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager" "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager" "k8s.io/kubernetes/pkg/kubelet/cm/memorymanager" + "k8s.io/kubernetes/pkg/kubelet/cm/resourceupdates" "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager" "k8s.io/kubernetes/pkg/kubelet/config" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" @@ -205,6 +206,19 @@ func (cm *containerManagerImpl) GetResources(pod *v1.Pod, container *v1.Containe return opts, nil } +func (cm *containerManagerImpl) UpdateAllocatedResourcesStatus(pod *v1.Pod, status *v1.PodStatus) { + // For now we only 
support Device Plugin + + cm.deviceManager.UpdateAllocatedResourcesStatus(pod, status) + + // TODO(SergeyKanzhelev, https://kep.k8s.io/4680): add support for DRA resources when DRA supports Windows +} + +func (cm *containerManagerImpl) Updates() <-chan resourceupdates.Update { + // TODO(SergeyKanzhelev, https://kep.k8s.io/4680): add support for DRA resources, for now only use device plugin updates + return cm.deviceManager.Updates() +} + func (cm *containerManagerImpl) UpdatePluginResources(node *schedulerframework.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error { return cm.deviceManager.UpdatePluginResources(node, attrs) } diff --git a/pkg/kubelet/cm/devicemanager/manager.go b/pkg/kubelet/cm/devicemanager/manager.go index 96943979da0..6264cb12dd9 100644 --- a/pkg/kubelet/cm/devicemanager/manager.go +++ b/pkg/kubelet/cm/devicemanager/manager.go @@ -33,12 +33,15 @@ import ( "k8s.io/apimachinery/pkg/api/resource" errorsutil "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apimachinery/pkg/util/sets" + utilfeature "k8s.io/apiserver/pkg/util/feature" pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1" + "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/kubelet/checkpointmanager" "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors" "k8s.io/kubernetes/pkg/kubelet/cm/containermap" "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/checkpoint" plugin "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/plugin/v1beta1" + "k8s.io/kubernetes/pkg/kubelet/cm/resourceupdates" "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager" "k8s.io/kubernetes/pkg/kubelet/config" "k8s.io/kubernetes/pkg/kubelet/lifecycle" @@ -108,6 +111,9 @@ type ManagerImpl struct { // was reported running by the container runtime when `containerMap` was computed. // Used to detect pods running across a restart containerRunningSet sets.Set[string] + + // update channel for device health updates + update chan resourceupdates.Update } type endpointInfo struct { @@ -151,6 +157,7 @@ func newManagerImpl(socketPath string, topology []cadvisorapi.Node, topologyAffi numaNodes: numaNodes, topologyAffinityStore: topologyAffinityStore, devicesToReuse: make(PodReusableDevices), + update: make(chan resourceupdates.Update), } server, err := plugin.NewServer(socketPath, manager, manager) @@ -174,6 +181,10 @@ func newManagerImpl(socketPath string, topology []cadvisorapi.Node, topologyAffi return manager, nil } +func (m *ManagerImpl) Updates() <-chan resourceupdates.Update { + return m.update +} + // CleanupPluginDirectory is to remove all existing unix sockets // from /var/lib/kubelet/device-plugins on Device Plugin Manager start func (m *ManagerImpl) CleanupPluginDirectory(dir string) error { @@ -259,8 +270,26 @@ func (m *ManagerImpl) genericDeviceUpdateCallback(resourceName string, devices [ m.mutex.Lock() m.healthyDevices[resourceName] = sets.New[string]() m.unhealthyDevices[resourceName] = sets.New[string]() + oldDevices := m.allDevices[resourceName] + podsToUpdate := sets.New[string]() m.allDevices[resourceName] = make(map[string]pluginapi.Device) for _, dev := range devices { + + if utilfeature.DefaultFeatureGate.Enabled(features.ResourceHealthStatus) { + // compare with old device's health and send update to the channel if needed + if oldDev, ok := oldDevices[dev.ID]; ok { + if oldDev.Health != dev.Health { + podUID, _ := m.podDevices.getPodAndContainerForDevice(dev.ID) + podsToUpdate.Insert(podUID) + } + } else { + // if this is a new device, it might have existed before and disappeared for a while + // but still be assigned to a Pod. 
In this case, we need to send an update to the channel + podUID, _ := m.podDevices.getPodAndContainerForDevice(dev.ID) + podsToUpdate.Insert(podUID) + } + } + m.allDevices[resourceName][dev.ID] = dev if dev.Health == pluginapi.Healthy { m.healthyDevices[resourceName].Insert(dev.ID) @@ -270,6 +299,15 @@ func (m *ManagerImpl) genericDeviceUpdateCallback(resourceName string, devices [ } } m.mutex.Unlock() + + if utilfeature.DefaultFeatureGate.Enabled(features.ResourceHealthStatus) { + if len(podsToUpdate) > 0 { + m.update <- resourceupdates.Update{ + PodUIDs: podsToUpdate.UnsortedList(), + } + } + } + if err := m.writeCheckpoint(); err != nil { klog.ErrorS(err, "Writing checkpoint encountered") } @@ -1048,6 +1086,70 @@ func (m *ManagerImpl) GetDevices(podUID, containerName string) ResourceDeviceIns return m.podDevices.getContainerDevices(podUID, containerName) } +func (m *ManagerImpl) UpdateAllocatedResourcesStatus(pod *v1.Pod, status *v1.PodStatus) { + m.mutex.Lock() + defer m.mutex.Unlock() + + // Today we ignore edge cases that are not likely to happen: + // - update statuses for containers that are in spec, but not in status + // - update statuses for resources requested in spec, but with no information in podDevices + for i, containerStatus := range status.ContainerStatuses { + devices := m.podDevices.getContainerDevices(string(pod.UID), containerStatus.Name) + + for resourceName, deviceInstances := range devices { + for id, d := range deviceInstances { + health := pluginapi.Healthy + // this is unlikely, but check for existence here anyways + if r, ok := m.allDevices[resourceName]; ok { + if _, ok := r[id]; ok { + health = m.allDevices[resourceName][id].Health + } + } + + d.Health = health + + deviceInstances[id] = d + } + } + + for resourceName, dI := range devices { + resourceStatus := v1.ResourceStatus{ + Name: v1.ResourceName(resourceName), + Resources: []v1.ResourceHealth{}, + } + + for id, d := range dI { + health := v1.ResourceHealthStatusHealthy + if d.Health != pluginapi.Healthy { + health = v1.ResourceHealthStatusUnhealthy + } + resourceStatus.Resources = append(resourceStatus.Resources, v1.ResourceHealth{ + ResourceID: v1.ResourceID(id), + Health: health, + }) + } + + if status.ContainerStatuses[i].AllocatedResourcesStatus == nil { + status.ContainerStatuses[i].AllocatedResourcesStatus = []v1.ResourceStatus{} + } + + // look up the resource status by name and update it + found := false + for j, rs := range status.ContainerStatuses[i].AllocatedResourcesStatus { + if rs.Name == resourceStatus.Name { + status.ContainerStatuses[i].AllocatedResourcesStatus[j] = resourceStatus + found = true + break + } + } + + if !found { + status.ContainerStatuses[i].AllocatedResourcesStatus = append(status.ContainerStatuses[i].AllocatedResourcesStatus, resourceStatus) + } + } + } +} + // ShouldResetExtendedResourceCapacity returns whether the extended resources should be zeroed or not, // depending on whether the node has been recreated. Absence of the checkpoint file strongly indicates the node // has been recreated. 
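[Editor's note — illustrative, not part of the patch] The manager.go hunks above route device-health transitions into a pod-level update channel: genericDeviceUpdateCallback compares each reported device against the previous snapshot and, when the ResourceHealthStatus feature gate is enabled, pushes the affected pod UIDs onto Updates(); UpdateAllocatedResourcesStatus then translates per-device health into v1.ResourceStatus entries on the container status. The sketch below is a minimal, hypothetical consumer of the new Updates() channel; consumeUpdates and syncPod are placeholder names introduced here for illustration only.

package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/kubelet/cm/resourceupdates"
)

// consumeUpdates drains the device-health update channel and hands each
// affected pod UID to a caller-supplied re-sync callback, mirroring what
// the kubelet sync loop does with HandlePodSyncs in this patch.
func consumeUpdates(updates <-chan resourceupdates.Update, syncPod func(podUID string)) {
	for u := range updates {
		for _, uid := range u.PodUIDs {
			syncPod(uid)
		}
	}
}

func main() {
	// Hypothetical producer: a single update naming one affected pod.
	ch := make(chan resourceupdates.Update, 1)
	ch <- resourceupdates.Update{PodUIDs: []string{"pod-a"}}
	close(ch)
	consumeUpdates(ch, func(uid string) { fmt.Println("re-syncing pod", uid) })
}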
diff --git a/pkg/kubelet/cm/devicemanager/manager_test.go b/pkg/kubelet/cm/devicemanager/manager_test.go index 190c3b833fe..7a328a8bc72 100644 --- a/pkg/kubelet/cm/devicemanager/manager_test.go +++ b/pkg/kubelet/cm/devicemanager/manager_test.go @@ -1850,3 +1850,84 @@ func TestGetTopologyHintsWithUpdates(t *testing.T) { }) } } +func TestUpdateAllocatedResourcesStatus(t *testing.T) { + podUID := "test-pod-uid" + containerName := "test-container" + resourceName := "test-resource" + + tmpDir, err := os.MkdirTemp("", "checkpoint") + if err != nil { + t.Fatalf("failed to create temp dir: %v", err) + } + + defer func() { + err = os.RemoveAll(tmpDir) + if err != nil { + t.Fatalf("Fail to remove tmpdir: %v", err) + } + }() + ckm, err := checkpointmanager.NewCheckpointManager(tmpDir) + if err != nil { + t.Fatalf("failed to create checkpoint manager: %v", err) + } + + testManager := &ManagerImpl{ + endpoints: make(map[string]endpointInfo), + healthyDevices: make(map[string]sets.Set[string]), + unhealthyDevices: make(map[string]sets.Set[string]), + allocatedDevices: make(map[string]sets.Set[string]), + allDevices: make(map[string]DeviceInstances), + podDevices: newPodDevices(), + checkpointManager: ckm, + } + + testManager.podDevices.insert(podUID, containerName, resourceName, + constructDevices([]string{"dev1", "dev2"}), + newContainerAllocateResponse( + withDevices(map[string]string{"/dev/r1dev1": "/dev/r1dev1", "/dev/r1dev2": "/dev/r1dev2"}), + withMounts(map[string]string{"/home/r1lib1": "/usr/r1lib1"}), + ), + ) + + testManager.genericDeviceUpdateCallback(resourceName, []pluginapi.Device{ + {ID: "dev1", Health: pluginapi.Healthy}, + {ID: "dev2", Health: pluginapi.Unhealthy}, + }) + + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + UID: types.UID(podUID), + }, + } + status := &v1.PodStatus{ + ContainerStatuses: []v1.ContainerStatus{ + { + Name: containerName, + }, + }, + } + testManager.UpdateAllocatedResourcesStatus(pod, status) + + expectedStatus := v1.ResourceStatus{ + Name: v1.ResourceName(resourceName), + Resources: []v1.ResourceHealth{ + { + ResourceID: "dev1", + Health: pluginapi.Healthy, + }, + { + ResourceID: "dev2", + Health: pluginapi.Unhealthy, + }, + }, + } + expectedContainerStatuses := []v1.ContainerStatus{ + { + Name: containerName, + AllocatedResourcesStatus: []v1.ResourceStatus{expectedStatus}, + }, + } + if !reflect.DeepEqual(status.ContainerStatuses, expectedContainerStatuses) { + t.Errorf("UpdateAllocatedResourcesStatus failed, expected: %v, got: %v", expectedContainerStatuses, status.ContainerStatuses) + } +} diff --git a/pkg/kubelet/cm/devicemanager/pod_devices.go b/pkg/kubelet/cm/devicemanager/pod_devices.go index 99e7e89f482..32f47236fb4 100644 --- a/pkg/kubelet/cm/devicemanager/pod_devices.go +++ b/pkg/kubelet/cm/devicemanager/pod_devices.go @@ -183,6 +183,22 @@ func (pdev *podDevices) devices() map[string]sets.Set[string] { return ret } +// Returns podUID and containerName for a device +func (pdev *podDevices) getPodAndContainerForDevice(deviceID string) (string, string) { + pdev.RLock() + defer pdev.RUnlock() + for podUID, containerDevices := range pdev.devs { + for containerName, resources := range containerDevices { + for _, devices := range resources { + if devices.deviceIds.Devices().Has(deviceID) { + return podUID, containerName + } + } + } + } + return "", "" +} + // Turns podDevices to checkpointData. 
func (pdev *podDevices) toCheckpointData() []checkpoint.PodDevicesEntry { var data []checkpoint.PodDevicesEntry diff --git a/pkg/kubelet/cm/devicemanager/types.go b/pkg/kubelet/cm/devicemanager/types.go index 7e3261c667d..561b7e4e7ff 100644 --- a/pkg/kubelet/cm/devicemanager/types.go +++ b/pkg/kubelet/cm/devicemanager/types.go @@ -22,6 +22,7 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/kubernetes/pkg/kubelet/cm/containermap" + "k8s.io/kubernetes/pkg/kubelet/cm/resourceupdates" "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager" "k8s.io/kubernetes/pkg/kubelet/config" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" @@ -63,6 +64,9 @@ type Manager interface { // GetDevices returns information about the devices assigned to pods and containers GetDevices(podUID, containerName string) ResourceDeviceInstances + // UpdateAllocatedResourcesStatus updates the status of allocated resources for the pod. + UpdateAllocatedResourcesStatus(pod *v1.Pod, status *v1.PodStatus) + // GetAllocatableDevices returns information about all the devices known to the manager GetAllocatableDevices() ResourceDeviceInstances @@ -81,6 +85,9 @@ type Manager interface { // UpdateAllocatedDevices frees any Devices that are bound to terminated pods. UpdateAllocatedDevices() + + // Updates returns a channel that receives an Update when the device changed its status. + Updates() <-chan resourceupdates.Update } // DeviceRunContainerOptions contains the combined container runtime settings to consume its allocated devices. diff --git a/pkg/kubelet/cm/fake_container_manager.go b/pkg/kubelet/cm/fake_container_manager.go index 8cf4d9b7c48..e1c687c935e 100644 --- a/pkg/kubelet/cm/fake_container_manager.go +++ b/pkg/kubelet/cm/fake_container_manager.go @@ -27,6 +27,7 @@ import ( podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1" "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager" "k8s.io/kubernetes/pkg/kubelet/cm/memorymanager" + "k8s.io/kubernetes/pkg/kubelet/cm/resourceupdates" "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager" "k8s.io/kubernetes/pkg/kubelet/config" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" @@ -253,3 +254,8 @@ func (cm *FakeContainerManager) UnprepareDynamicResources(*v1.Pod) error { func (cm *FakeContainerManager) PodMightNeedToUnprepareResources(UID types.UID) bool { return false } +func (cm *FakeContainerManager) UpdateAllocatedResourcesStatus(pod *v1.Pod, status *v1.PodStatus) { +} +func (cm *FakeContainerManager) Updates() <-chan resourceupdates.Update { + return nil +} diff --git a/pkg/kubelet/cm/resourceupdates/updates.go b/pkg/kubelet/cm/resourceupdates/updates.go new file mode 100644 index 00000000000..3bf0155a3e5 --- /dev/null +++ b/pkg/kubelet/cm/resourceupdates/updates.go @@ -0,0 +1,25 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package resourceupdates + +// Update is a struct that represents an update to a pod when +// the resource changes it's status. 
+// Later we may need to add fields like container name, resource name, and a new status. +type Update struct { + // PodUID is the UID of the pod which status needs to be updated. + PodUIDs []string +} diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index d4980a8dc52..f4317a36da7 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -2473,6 +2473,23 @@ func (kl *Kubelet) syncLoopIteration(ctx context.Context, configCh <-chan kubety status = "started" } handleProbeSync(kl, update, handler, "startup", status) + case update := <-kl.containerManager.Updates(): + pods := []*v1.Pod{} + for _, p := range update.PodUIDs { + if pod, ok := kl.podManager.GetPodByUID(types.UID(p)); ok { + klog.V(3).InfoS("SyncLoop (containermanager): event for pod", "pod", klog.KObj(pod), "event", update) + pods = append(pods, pod) + } else { + // If the pod no longer exists, ignore the event. + klog.V(4).InfoS("SyncLoop (containermanager): pod does not exist, ignore devices updates", "event", update) + } + } + if len(pods) > 0 { + // Updating the pod by syncing it again + // We do not apply the optimization by updating the status directly, but can do it later + handler.HandlePodSyncs(pods) + } + case <-housekeepingCh: if !kl.sourcesReady.AllReady() { // If the sources aren't ready or volume manager has not yet synced the states, diff --git a/pkg/kubelet/kubelet_pods.go b/pkg/kubelet/kubelet_pods.go index 8aaf01381aa..7d24a90b0e7 100644 --- a/pkg/kubelet/kubelet_pods.go +++ b/pkg/kubelet/kubelet_pods.go @@ -1834,6 +1834,11 @@ func (kl *Kubelet) generateAPIPodStatus(pod *v1.Pod, podStatus *kubecontainer.Po // ensure the probe managers have up to date status for containers kl.probeManager.UpdatePodStatus(pod, s) + // update the allocated resources status + if utilfeature.DefaultFeatureGate.Enabled(features.ResourceHealthStatus) { + kl.containerManager.UpdateAllocatedResourcesStatus(pod, s) + } + // preserve all conditions not owned by the kubelet s.Conditions = make([]v1.PodCondition, 0, len(pod.Status.Conditions)+1) for _, c := range pod.Status.Conditions { diff --git a/test/e2e_node/device_plugin_failures_pod_status_test.go b/test/e2e_node/device_plugin_failures_pod_status_test.go new file mode 100644 index 00000000000..5c1275bfaa5 --- /dev/null +++ b/test/e2e_node/device_plugin_failures_pod_status_test.go @@ -0,0 +1,249 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package e2enode + +import ( + "context" + "fmt" + "time" + + "github.com/onsi/ginkgo/v2" + "github.com/onsi/gomega" + + v1 "k8s.io/api/core/v1" + kubeletdevicepluginv1beta1 "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1" + e2epod "k8s.io/kubernetes/test/e2e/framework/pod" + imageutils "k8s.io/kubernetes/test/utils/image" + admissionapi "k8s.io/pod-security-admission/api" + + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/uuid" + "k8s.io/kubernetes/pkg/features" + "k8s.io/kubernetes/test/e2e/framework" + "k8s.io/kubernetes/test/e2e_node/testdeviceplugin" +) + +var _ = SIGDescribe("Device Plugin Failures Pod Status:", framework.WithFeatureGate(features.ResourceHealthStatus), func() { + f := framework.NewDefaultFramework("device-plugin-failures") + f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged + + type ResourceValue struct { + Allocatable int + Capacity int + } + + var getNodeResourceValues = func(ctx context.Context, resourceName string) ResourceValue { + ginkgo.GinkgoHelper() + node := getLocalNode(ctx, f) + + // -1 represents that the resource is not found + result := ResourceValue{ + Allocatable: -1, + Capacity: -1, + } + + for key, val := range node.Status.Capacity { + resource := string(key) + if resource == resourceName { + result.Capacity = int(val.Value()) + break + } + } + + for key, val := range node.Status.Allocatable { + resource := string(key) + if resource == resourceName { + result.Allocatable = int(val.Value()) + break + } + } + + return result + } + + var createPod = func(resourceName string, quantity int) *v1.Pod { + ginkgo.GinkgoHelper() + rl := v1.ResourceList{v1.ResourceName(resourceName): *resource.NewQuantity(int64(quantity), resource.DecimalSI)} + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "device-plugin-failures-test-" + string(uuid.NewUUID())}, + Spec: v1.PodSpec{ + RestartPolicy: v1.RestartPolicyAlways, + Containers: []v1.Container{{ + Image: busyboxImage, + Name: "container-1", + Command: []string{"sh", "-c", fmt.Sprintf("env && sleep %s", sleepIntervalForever)}, + Resources: v1.ResourceRequirements{ + Limits: rl, + Requests: rl, + }, + }}, + }, + } + return pod + } + + var createPodWrongImage = func(resourceName string, quantity int) *v1.Pod { + ginkgo.GinkgoHelper() + rl := v1.ResourceList{v1.ResourceName(resourceName): *resource.NewQuantity(int64(quantity), resource.DecimalSI)} + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "device-plugin-failures-test-" + string(uuid.NewUUID())}, + Spec: v1.PodSpec{ + RestartPolicy: v1.RestartPolicyAlways, + Containers: []v1.Container{{ + Image: imageutils.GetE2EImage(imageutils.InvalidRegistryImage), + ImagePullPolicy: v1.PullAlways, // this is to make test not fail on non pre-pulled image validation + Name: "container-1", + Command: []string{"sh", "-c", fmt.Sprintf("env && sleep %s", sleepIntervalForever)}, + Resources: v1.ResourceRequirements{ + Limits: rl, + Requests: rl, + }, + }}, + }, + } + return pod + } + + nodeStatusUpdateTimeout := 1 * time.Minute + devicePluginUpdateTimeout := 1 * time.Minute + + ginkgo.It("will report a Healthy and then Unhealthy single device in the pod status", func(ctx context.Context) { + // randomizing so tests can run in parallel + resourceName := fmt.Sprintf("test.device/%s", f.UniqueName) + devices := []kubeletdevicepluginv1beta1.Device{{ID: "testdevice", Health: kubeletdevicepluginv1beta1.Healthy}} + plugin := testdeviceplugin.NewDevicePlugin(nil) + + err := 
plugin.RegisterDevicePlugin(ctx, f.UniqueName, resourceName, devices) + defer plugin.Stop() // should stop even if registration failed + gomega.Expect(err).To(gomega.Succeed()) + + ginkgo.By("initial state: capacity and allocatable are set") + gomega.Eventually(getNodeResourceValues, nodeStatusUpdateTimeout, f.Timeouts.Poll).WithContext(ctx).WithArguments(resourceName).Should(gomega.Equal(ResourceValue{Allocatable: 1, Capacity: 1})) + + // schedule a pod that requests the device + client := e2epod.NewPodClient(f) + pod := client.Create(ctx, createPod(resourceName, 1)) + + // wait for the pod to be running + gomega.Expect(e2epod.WaitForPodRunningInNamespace(ctx, f.ClientSet, pod)).To(gomega.Succeed()) + + expectedStatus := []v1.ResourceStatus{ + { + Name: v1.ResourceName(resourceName), + Resources: []v1.ResourceHealth{ + { + ResourceID: "testdevice", + Health: v1.ResourceHealthStatusHealthy, + }, + }, + }, + } + + gomega.Eventually(func() []v1.ResourceStatus { + pod, err = f.ClientSet.CoreV1().Pods(f.Namespace.Name).Get(ctx, pod.Name, metav1.GetOptions{}) + return pod.Status.ContainerStatuses[0].AllocatedResourcesStatus + }, devicePluginUpdateTimeout, f.Timeouts.Poll).Should(gomega.Equal(expectedStatus)) + + // now make the device unhealthy + devices[0].Health = kubeletdevicepluginv1beta1.Unhealthy + plugin.UpdateDevices(devices) + + expectedStatus[0].Resources[0] = v1.ResourceHealth{ + ResourceID: "testdevice", + Health: v1.ResourceHealthStatusUnhealthy, + } + + gomega.Eventually(func() []v1.ResourceStatus { + pod, err = f.ClientSet.CoreV1().Pods(f.Namespace.Name).Get(ctx, pod.Name, metav1.GetOptions{}) + return pod.Status.ContainerStatuses[0].AllocatedResourcesStatus + }, devicePluginUpdateTimeout, f.Timeouts.Poll).Should(gomega.Equal(expectedStatus)) + + // deleting the pod + err = f.ClientSet.CoreV1().Pods(f.Namespace.Name).Delete(ctx, pod.Name, metav1.DeleteOptions{}) + gomega.Expect(err).To(gomega.Succeed()) + + waitForContainerRemoval(ctx, pod.Spec.Containers[0].Name, pod.Name, pod.Namespace) + }) + + ginkgo.It("will report a Device Status for the failed pod in the pod status", func(ctx context.Context) { + // randomizing so tests can run in parallel + resourceName := fmt.Sprintf("test.device/%s", f.UniqueName) + devices := []kubeletdevicepluginv1beta1.Device{{ID: "testdevice", Health: kubeletdevicepluginv1beta1.Healthy}} + plugin := testdeviceplugin.NewDevicePlugin(nil) + + err := plugin.RegisterDevicePlugin(ctx, f.UniqueName, resourceName, devices) + defer plugin.Stop() // should stop even if registration failed + gomega.Expect(err).To(gomega.Succeed()) + + ginkgo.By("initial state: capacity and allocatable are set") + gomega.Eventually(getNodeResourceValues, nodeStatusUpdateTimeout, f.Timeouts.Poll).WithContext(ctx).WithArguments(resourceName).Should(gomega.Equal(ResourceValue{Allocatable: 1, Capacity: 1})) + + // schedule a pod that requests the device + client := e2epod.NewPodClient(f) + pod := client.Create(ctx, createPodWrongImage(resourceName, 1)) + + // wait for the pod to be running + gomega.Expect(e2epod.WaitForPodCondition(ctx, f.ClientSet, pod.Namespace, pod.Name, "Back-off pulling image", f.Timeouts.PodStartShort, + func(pod *v1.Pod) (bool, error) { + if pod.Status.Phase == v1.PodPending && + len(pod.Status.ContainerStatuses) > 0 && + pod.Status.ContainerStatuses[0].State.Waiting != nil && + pod.Status.ContainerStatuses[0].State.Waiting.Reason == "ImagePullBackOff" { + return true, nil + } + return false, nil + })).To(gomega.Succeed()) + + expectedStatus := 
[]v1.ResourceStatus{ + { + Name: v1.ResourceName(resourceName), + Resources: []v1.ResourceHealth{ + { + ResourceID: "testdevice", + Health: v1.ResourceHealthStatusHealthy, + }, + }, + }, + } + + gomega.Eventually(func() []v1.ResourceStatus { + pod, err = f.ClientSet.CoreV1().Pods(f.Namespace.Name).Get(ctx, pod.Name, metav1.GetOptions{}) + return pod.Status.ContainerStatuses[0].AllocatedResourcesStatus + }, devicePluginUpdateTimeout, f.Timeouts.Poll).Should(gomega.Equal(expectedStatus)) + + // now make the device unhealthy + devices[0].Health = kubeletdevicepluginv1beta1.Unhealthy + plugin.UpdateDevices(devices) + + expectedStatus[0].Resources[0] = v1.ResourceHealth{ + ResourceID: "testdevice", + Health: "Unhealthy", + } + + gomega.Eventually(func() []v1.ResourceStatus { + pod, err = f.ClientSet.CoreV1().Pods(f.Namespace.Name).Get(ctx, pod.Name, metav1.GetOptions{}) + return pod.Status.ContainerStatuses[0].AllocatedResourcesStatus + }, devicePluginUpdateTimeout, f.Timeouts.Poll).Should(gomega.Equal(expectedStatus)) + + // deleting the pod + err = f.ClientSet.CoreV1().Pods(f.Namespace.Name).Delete(ctx, pod.Name, metav1.DeleteOptions{}) + gomega.Expect(err).To(gomega.Succeed()) + + waitForContainerRemoval(ctx, pod.Spec.Containers[0].Name, pod.Name, pod.Namespace) + }) +}) diff --git a/test/e2e_node/device_plugin_failures_test.go b/test/e2e_node/device_plugin_failures_test.go index 8c128653191..27f7b12232e 100644 --- a/test/e2e_node/device_plugin_failures_test.go +++ b/test/e2e_node/device_plugin_failures_test.go @@ -36,16 +36,15 @@ import ( "k8s.io/kubernetes/test/e2e_node/testdeviceplugin" ) -type ResourceValue struct { - Allocatable int - Capacity int -} - -// Serial because the test restarts Kubelet var _ = SIGDescribe("Device Plugin Failures:", framework.WithNodeConformance(), func() { f := framework.NewDefaultFramework("device-plugin-failures") f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged + type ResourceValue struct { + Allocatable int + Capacity int + } + var getNodeResourceValues = func(ctx context.Context, resourceName string) ResourceValue { ginkgo.GinkgoHelper() node := getLocalNode(ctx, f)
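[Editor's note — illustrative, not part of the patch] The e2e tests above exercise the user-visible result of this change: pod.status.containerStatuses[*].allocatedResourcesStatus now reports per-device health for device-plugin resources. The sketch below is a hypothetical client-go consumer of that field; the clientset, namespace, and pod name are assumed inputs, not something this patch defines.

package podhealth

import (
	"context"
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// reportUnhealthyDevices reads the new AllocatedResourcesStatus field and
// prints every allocated device that is not reported Healthy.
func reportUnhealthyDevices(ctx context.Context, cs kubernetes.Interface, ns, podName string) error {
	pod, err := cs.CoreV1().Pods(ns).Get(ctx, podName, metav1.GetOptions{})
	if err != nil {
		return err
	}
	for _, cst := range pod.Status.ContainerStatuses {
		for _, rs := range cst.AllocatedResourcesStatus {
			for _, rh := range rs.Resources {
				if rh.Health != v1.ResourceHealthStatusHealthy {
					fmt.Printf("container %s: resource %s device %s is %s\n",
						cst.Name, rs.Name, rh.ResourceID, rh.Health)
				}
			}
		}
	}
	return nil
}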