diff --git a/pkg/kubelet/cm/devicemanager/manager.go b/pkg/kubelet/cm/devicemanager/manager.go index 52550860574..32a618e7221 100644 --- a/pkg/kubelet/cm/devicemanager/manager.go +++ b/pkg/kubelet/cm/devicemanager/manager.go @@ -18,6 +18,7 @@ package devicemanager import ( "context" + goerrors "errors" "fmt" "os" "path/filepath" @@ -157,7 +158,7 @@ func newManagerImpl(socketPath string, topology []cadvisorapi.Node, topologyAffi numaNodes: numaNodes, topologyAffinityStore: topologyAffinityStore, devicesToReuse: make(PodReusableDevices), - update: make(chan resourceupdates.Update), + update: make(chan resourceupdates.Update, 100), } server, err := plugin.NewServer(socketPath, manager, manager) @@ -309,8 +310,10 @@ func (m *ManagerImpl) genericDeviceUpdateCallback(resourceName string, devices [ if utilfeature.DefaultFeatureGate.Enabled(features.ResourceHealthStatus) { if len(podsToUpdate) > 0 { - m.update <- resourceupdates.Update{ - PodUIDs: podsToUpdate.UnsortedList(), + select { + case m.update <- resourceupdates.Update{PodUIDs: podsToUpdate.UnsortedList()}: + default: + klog.ErrorS(goerrors.New("device update channel is full"), "discard pods info", "podsToUpdate", podsToUpdate.UnsortedList()) } } } diff --git a/pkg/kubelet/cm/devicemanager/manager_test.go b/pkg/kubelet/cm/devicemanager/manager_test.go index c7957e233ba..5c00f84bd1c 100644 --- a/pkg/kubelet/cm/devicemanager/manager_test.go +++ b/pkg/kubelet/cm/devicemanager/manager_test.go @@ -1970,18 +1970,17 @@ func TestFeatureGateResourceHealthStatus(t *testing.T) { ckm, err := checkpointmanager.NewCheckpointManager(tmpDir) require.NoError(t, err, "err should be nil") resourceName := "domain1.com/resource1" - existDevices := map[string]DeviceInstances{ - resourceName: map[string]pluginapi.Device{ - "dev1": { - ID: "dev1", - Health: pluginapi.Healthy, - }, - "dev2": { - ID: "dev2", - Health: pluginapi.Unhealthy, - }, - }, + existDevices := map[string]DeviceInstances{} + resourceNameMap := make(map[string]pluginapi.Device) + deviceUpdateNumber, deviceUpdateChanBuffer := 200, 100 + for i := 0; i < deviceUpdateNumber; i++ { + resourceNameMap[fmt.Sprintf("dev%d", i)] = pluginapi.Device{ + ID: fmt.Sprintf("dev%d", i), + Health: pluginapi.Healthy, + } } + existDevices[resourceName] = resourceNameMap + testManager := &ManagerImpl{ allDevices: ResourceDeviceInstances(existDevices), endpoints: make(map[string]endpointInfo), @@ -1990,43 +1989,42 @@ func TestFeatureGateResourceHealthStatus(t *testing.T) { allocatedDevices: make(map[string]sets.Set[string]), podDevices: newPodDevices(), checkpointManager: ckm, - update: make(chan resourceupdates.Update), + update: make(chan resourceupdates.Update, deviceUpdateChanBuffer), } - podID := "pod1" - contID := "con1" - devices := checkpoint.DevicesPerNUMA{0: []string{"dev1"}, 1: []string{"dev1"}} + for i := 0; i < deviceUpdateNumber; i++ { + podID := fmt.Sprintf("pod%d", i) + contID := fmt.Sprintf("con%d", i) + devices := checkpoint.DevicesPerNUMA{0: []string{fmt.Sprintf("dev%d", i)}} + testManager.podDevices.insert(podID, contID, resourceName, + devices, + newContainerAllocateResponse( + withDevices(map[string]string{"/dev/r1dev1": "/dev/r1dev1", "/dev/r1dev2": "/dev/r1dev2"}), + withMounts(map[string]string{"/home/r1lib1": "/usr/r1lib1"}), + ), + ) + } - testManager.podDevices.insert(podID, contID, resourceName, - devices, - newContainerAllocateResponse( - withDevices(map[string]string{"/dev/r1dev1": "/dev/r1dev1", "/dev/r1dev2": "/dev/r1dev2"}), - withMounts(map[string]string{"/home/r1lib1": "/usr/r1lib1"}), - ), - ) featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ResourceHealthStatus, true) - testManager.genericDeviceUpdateCallback(resourceName, []pluginapi.Device{ - {ID: "dev1", Health: pluginapi.Healthy}, - {ID: "dev2", Health: pluginapi.Unhealthy}, - }) + for i := 0; i < deviceUpdateNumber; i++ { + testManager.genericDeviceUpdateCallback(resourceName, []pluginapi.Device{ + {ID: "dev1", Health: pluginapi.Healthy}, + }) + } // update chan no data assert.Empty(t, testManager.update) - // update chan receive pod1 - var wg sync.WaitGroup - go func() { - defer wg.Done() + // update device status, assume all device unhealthy. + for i := 0; i < deviceUpdateNumber; i++ { + testManager.genericDeviceUpdateCallback(resourceName, []pluginapi.Device{ + {ID: fmt.Sprintf("dev%d", i), Health: pluginapi.Unhealthy}, + }) + } + for i := 0; i < deviceUpdateChanBuffer; i++ { u := <-testManager.update assert.Equal(t, resourceupdates.Update{ - PodUIDs: []string{"pod1"}, + PodUIDs: []string{fmt.Sprintf("pod%d", i)}, }, u) - }() - wg.Add(1) - testManager.genericDeviceUpdateCallback(resourceName, []pluginapi.Device{ - {ID: "dev1", Health: pluginapi.Unhealthy}, - {ID: "dev2", Health: pluginapi.Healthy}, - }) - wg.Wait() - + } }