diff --git a/pkg/kubelet/cm/devicemanager/manager.go b/pkg/kubelet/cm/devicemanager/manager.go
index bca6b516be7..52550860574 100644
--- a/pkg/kubelet/cm/devicemanager/manager.go
+++ b/pkg/kubelet/cm/devicemanager/manager.go
@@ -280,16 +280,20 @@ func (m *ManagerImpl) genericDeviceUpdateCallback(resourceName string, devices [
 
 		if utilfeature.DefaultFeatureGate.Enabled(features.ResourceHealthStatus) {
 			// compare with old device's health and send update to the channel if needed
+			updatePodUIDFn := func(deviceID string) {
+				podUID, _ := m.podDevices.getPodAndContainerForDevice(deviceID)
+				if podUID != "" {
+					podsToUpdate.Insert(podUID)
+				}
+			}
 			if oldDev, ok := oldDevices[dev.ID]; ok {
 				if oldDev.Health != dev.Health {
-					podUID, _ := m.podDevices.getPodAndContainerForDevice(dev.ID)
-					podsToUpdate.Insert(podUID)
+					updatePodUIDFn(dev.ID)
 				}
 			} else {
 				// if this is a new device, it might have existed before and disappeared for a while
 				// but still be assigned to a Pod. In this case, we need to send an update to the channel
-				podUID, _ := m.podDevices.getPodAndContainerForDevice(dev.ID)
-				podsToUpdate.Insert(podUID)
+				updatePodUIDFn(dev.ID)
 			}
 		}
 
diff --git a/pkg/kubelet/cm/devicemanager/manager_test.go b/pkg/kubelet/cm/devicemanager/manager_test.go
index 4599a8f596e..01ffbf854bf 100644
--- a/pkg/kubelet/cm/devicemanager/manager_test.go
+++ b/pkg/kubelet/cm/devicemanager/manager_test.go
@@ -38,13 +38,17 @@ import (
 	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/apimachinery/pkg/util/uuid"
 	"k8s.io/apimachinery/pkg/util/wait"
+	utilfeature "k8s.io/apiserver/pkg/util/feature"
 	"k8s.io/client-go/tools/record"
+	featuregatetesting "k8s.io/component-base/featuregate/testing"
 	pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
 	watcherapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1"
+	"k8s.io/kubernetes/pkg/features"
 	"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
 	"k8s.io/kubernetes/pkg/kubelet/cm/containermap"
 	"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/checkpoint"
 	plugin "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/plugin/v1beta1"
+	"k8s.io/kubernetes/pkg/kubelet/cm/resourceupdates"
 	"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
 	"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager/bitmask"
 	"k8s.io/kubernetes/pkg/kubelet/config"
@@ -1953,3 +1957,72 @@ func sortContainerStatuses(statuses []v1.ContainerStatus) {
 		}
 	}
 }
+
+// TestFeatureGateResourceHealthStatus checks that, with the ResourceHealthStatus
+// feature gate enabled, genericDeviceUpdateCallback publishes an update only for
+// pods that own a device whose reported health changed.
+func TestFeatureGateResourceHealthStatus(t *testing.T) {
+	// t.TempDir is removed by the testing package when the test completes.
+	ckm, err := checkpointmanager.NewCheckpointManager(t.TempDir())
+	require.NoError(t, err, "err should be nil")
+	resourceName := "domain1.com/resource1"
+	existDevices := map[string]DeviceInstances{
+		resourceName: map[string]pluginapi.Device{
+			"dev1": {
+				ID:     "dev1",
+				Health: pluginapi.Healthy,
+			},
+			"dev2": {
+				ID:     "dev2",
+				Health: pluginapi.Unhealthy,
+			},
+		},
+	}
+	// The update channel is buffered so the callback can publish without a
+	// concurrent receiver and so a pending update is observable through len().
+	testManager := &ManagerImpl{
+		allDevices:        ResourceDeviceInstances(existDevices),
+		endpoints:         make(map[string]endpointInfo),
+		healthyDevices:    make(map[string]sets.Set[string]),
+		unhealthyDevices:  make(map[string]sets.Set[string]),
+		allocatedDevices:  make(map[string]sets.Set[string]),
+		podDevices:        newPodDevices(),
+		checkpointManager: ckm,
+		update:            make(chan resourceupdates.Update, 1),
+	}
+
+	podID := "pod1"
+	contID := "con1"
+	devices := checkpoint.DevicesPerNUMA{0: []string{"dev1"}, 1: []string{"dev1"}}
+
+	testManager.podDevices.insert(podID, contID, resourceName,
+		devices,
+		newContainerAllocateResponse(
+			withDevices(map[string]string{"/dev/r1dev1": "/dev/r1dev1", "/dev/r1dev2": "/dev/r1dev2"}),
+			withMounts(map[string]string{"/home/r1lib1": "/usr/r1lib1"}),
+		),
+	)
+	featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ResourceHealthStatus, true)
+
+	// Reporting the same health as already recorded must not publish anything.
+	testManager.genericDeviceUpdateCallback(resourceName, []pluginapi.Device{
+		{ID: "dev1", Health: pluginapi.Healthy},
+		{ID: "dev2", Health: pluginapi.Unhealthy},
+	})
+	require.Empty(t, testManager.update, "no update expected when device health is unchanged")
+
+	// Both devices flip health, but only dev1 was inserted for a pod above, so
+	// the published update must name pod1 alone.
+	testManager.genericDeviceUpdateCallback(resourceName, []pluginapi.Device{
+		{ID: "dev1", Health: pluginapi.Unhealthy},
+		{ID: "dev2", Health: pluginapi.Healthy},
+	})
+	select {
+	case u := <-testManager.update:
+		assert.Equal(t, resourceupdates.Update{
+			PodUIDs: []string{"pod1"},
+		}, u)
+	default:
+		t.Fatal("expected an update for pod1 after dev1 changed health")
+	}
+}
diff --git a/pkg/kubelet/cm/devicemanager/pod_devices_test.go b/pkg/kubelet/cm/devicemanager/pod_devices_test.go
index 28c2fa72024..66fee2737cc 100644
--- a/pkg/kubelet/cm/devicemanager/pod_devices_test.go
+++ b/pkg/kubelet/cm/devicemanager/pod_devices_test.go
@@ -251,3 +251,29 @@ func TestDeviceRunContainerOptions(t *testing.T) {
 		})
 	}
 }
+
+// TestGetPodAndContainerForDevice checks that getPodAndContainerForDevice returns
+// the owning pod UID for an inserted device and an empty UID for an unknown one.
+func TestGetPodAndContainerForDevice(t *testing.T) {
+	podDevices := newPodDevices()
+	resourceName1 := "domain1.com/resource1"
+	podID := "pod1"
+	contID := "con1"
+	devices := checkpoint.DevicesPerNUMA{0: []string{"dev1"}, 1: []string{"dev1"}}
+
+	podDevices.insert(podID, contID, resourceName1,
+		devices,
+		newContainerAllocateResponse(
+			withDevices(map[string]string{"/dev/r1dev1": "/dev/r1dev1", "/dev/r1dev2": "/dev/r1dev2"}),
+			withMounts(map[string]string{"/home/r1lib1": "/usr/r1lib1"}),
+		),
+	)
+
+	// dev2 was never inserted for any pod, so the lookup yields an empty pod UID.
+	podUID, _ := podDevices.getPodAndContainerForDevice("dev2")
+	assert.Equal(t, "", podUID)
+
+	// dev1 was inserted for pod1/con1 above, so the lookup yields pod1.
+	podUID, _ = podDevices.getPodAndContainerForDevice("dev1")
+	assert.Equal(t, "pod1", podUID)
+}