diff --git a/pkg/kubelet/cm/devicemanager/manager.go b/pkg/kubelet/cm/devicemanager/manager.go index 9cc52e4932f..f1d04e97179 100644 --- a/pkg/kubelet/cm/devicemanager/manager.go +++ b/pkg/kubelet/cm/devicemanager/manager.go @@ -623,6 +623,13 @@ func (m *ManagerImpl) devicesToAllocate(podUID, contName, resource string, requi // Create a closure to help with device allocation // Returns 'true' once no more devices need to be allocated. allocateRemainingFrom := func(devices sets.Set[string]) bool { + // When we call callGetPreferredAllocationIfAvailable below, we will release + // the lock and call the device plugin. If someone calls ListResource concurrently, + // device manager will recalculate the allocatedDevices map. Some entries with + // empty sets may be removed, so we reinit here. + if m.allocatedDevices[resource] == nil { + m.allocatedDevices[resource] = sets.New[string]() + } for device := range devices.Difference(allocated) { m.allocatedDevices[resource].Insert(device) allocated.Insert(device) @@ -634,11 +641,6 @@ func (m *ManagerImpl) devicesToAllocate(podUID, contName, resource string, requi return false } - // Needs to allocate additional devices. - if m.allocatedDevices[resource] == nil { - m.allocatedDevices[resource] = sets.New[string]() - } - // Allocates from reusableDevices list first. 
if allocateRemainingFrom(reusableDevices) { return allocated, nil diff --git a/pkg/kubelet/cm/devicemanager/manager_test.go b/pkg/kubelet/cm/devicemanager/manager_test.go index 0d0304832b5..14d2f0d3e8c 100644 --- a/pkg/kubelet/cm/devicemanager/manager_test.go +++ b/pkg/kubelet/cm/devicemanager/manager_test.go @@ -1184,6 +1184,66 @@ func TestPodContainerDeviceToAllocate(t *testing.T) { } +func TestDevicesToAllocateConflictWithUpdateAllocatedDevices(t *testing.T) { + podToAllocate := "podToAllocate" + containerToAllocate := "containerToAllocate" + podToRemove := "podToRemove" + containerToRemove := "containerToRemove" + deviceID := "deviceID" + resourceName := "domain1.com/resource" + + socket := filepath.Join(os.TempDir(), esocketName()) + devs := []*pluginapi.Device{ + {ID: deviceID, Health: pluginapi.Healthy}, + } + p, e := esetup(t, devs, socket, resourceName, func(n string, d []pluginapi.Device) {}) + + waitUpdateAllocatedDevicesChan := make(chan struct{}) + waitSetGetPreferredAllocChan := make(chan struct{}) + + p.SetGetPreferredAllocFunc(func(r *pluginapi.PreferredAllocationRequest, devs map[string]pluginapi.Device) (*pluginapi.PreferredAllocationResponse, error) { + waitSetGetPreferredAllocChan <- struct{}{} + <-waitUpdateAllocatedDevicesChan + return &pluginapi.PreferredAllocationResponse{ + ContainerResponses: []*pluginapi.ContainerPreferredAllocationResponse{ + { + DeviceIDs: []string{deviceID}, + }, + }, + }, nil + }) + + testManager := &ManagerImpl{ + endpoints: make(map[string]endpointInfo), + healthyDevices: make(map[string]sets.Set[string]), + unhealthyDevices: make(map[string]sets.Set[string]), + allocatedDevices: make(map[string]sets.Set[string]), + podDevices: newPodDevices(), + activePods: func() []*v1.Pod { return []*v1.Pod{} }, + sourcesReady: &sourcesReadyStub{}, + topologyAffinityStore: topologymanager.NewFakeManager(), + } + + testManager.endpoints[resourceName] = endpointInfo{ + e: e, + opts: &pluginapi.DevicePluginOptions{ + 
GetPreferredAllocationAvailable: true, + }, + } + testManager.healthyDevices[resourceName] = sets.New[string](deviceID) + testManager.podDevices.insert(podToRemove, containerToRemove, resourceName, nil, nil) + + go func() { + <-waitSetGetPreferredAllocChan + testManager.UpdateAllocatedDevices() + waitUpdateAllocatedDevicesChan <- struct{}{} + }() + + set, err := testManager.devicesToAllocate(podToAllocate, containerToAllocate, resourceName, 1, sets.New[string]()) + assert.NoError(t, err) + assert.Equal(t, sets.New[string](deviceID), set) +} + func TestGetDeviceRunContainerOptions(t *testing.T) { res1 := TestResource{ resourceName: "domain1.com/resource1",