diff --git a/pkg/kubelet/cm/devicemanager/manager.go b/pkg/kubelet/cm/devicemanager/manager.go
index 5c3e4ddee0a..8cb57aa8190 100644
--- a/pkg/kubelet/cm/devicemanager/manager.go
+++ b/pkg/kubelet/cm/devicemanager/manager.go
@@ -646,7 +646,7 @@ func (m *ManagerImpl) devicesToAllocate(podUID, contName, resource string, requi
 func (m *ManagerImpl) filterByAffinity(podUID, contName, resource string, available sets.String) (sets.String, sets.String, sets.String) {
 	// If alignment information is not available, just pass the available list back.
 	hint := m.topologyAffinityStore.GetAffinity(podUID, contName)
-	if !m.deviceHasTopologyAlignmentLocked(resource) || hint.NUMANodeAffinity == nil {
+	if !m.deviceHasTopologyAlignment(resource) || hint.NUMANodeAffinity == nil {
 		return sets.NewString(), sets.NewString(), available
 	}
 
diff --git a/pkg/kubelet/cm/devicemanager/manager_test.go b/pkg/kubelet/cm/devicemanager/manager_test.go
index 08c5a39ada4..6b7830c2316 100644
--- a/pkg/kubelet/cm/devicemanager/manager_test.go
+++ b/pkg/kubelet/cm/devicemanager/manager_test.go
@@ -1423,6 +1423,9 @@ func TestGetTopologyHintsWithUpdates(t *testing.T) {
 				},
 			}})
 	}
+	testPod := makePod(v1.ResourceList{
+		testResourceName: *resource.NewQuantity(int64(1), resource.DecimalSI),
+	})
 	topology := []cadvisorapi.Node{
 		{Id: 0},
 	}
@@ -1433,28 +1436,19 @@ func TestGetTopologyHintsWithUpdates(t *testing.T) {
 		testfunc    func(manager *wrappedManagerImpl)
 	}{
 		{
-			description: "getAvailableDevices data race when update device",
-			count:       1,
+			description: "GetTopologyHints data race when update device",
+			count:       10,
 			devices:     devs,
 			testfunc: func(manager *wrappedManagerImpl) {
-				manager.getAvailableDevices(testResourceName)
+				manager.GetTopologyHints(testPod, &testPod.Spec.Containers[0])
 			},
 		},
 		{
-			description: "generateDeviceTopologyHints data race when update device",
-			count:       1,
+			description: "GetPodTopologyHints data race when update device",
+			count:       10,
 			devices:     devs,
 			testfunc: func(manager *wrappedManagerImpl) {
-				manager.generateDeviceTopologyHints(
-					testResourceName, sets.NewString(), sets.NewString(), 1)
-			},
-		},
-		{
-			description: "deviceHasTopologyAlignment data race when update device",
-			count:       1000,
-			devices:     devs[:1],
-			testfunc: func(manager *wrappedManagerImpl) {
-				manager.deviceHasTopologyAlignment(testResourceName)
+				manager.GetPodTopologyHints(testPod)
 			},
 		},
 	}
diff --git a/pkg/kubelet/cm/devicemanager/topology_hints.go b/pkg/kubelet/cm/devicemanager/topology_hints.go
index e32d97c4adb..9e03a9029b9 100644
--- a/pkg/kubelet/cm/devicemanager/topology_hints.go
+++ b/pkg/kubelet/cm/devicemanager/topology_hints.go
@@ -38,47 +38,44 @@ func (m *ManagerImpl) GetTopologyHints(pod *v1.Pod, container *v1.Container) map
 
 	// Loop through all device resources and generate TopologyHints for them..
 	deviceHints := make(map[string][]topologymanager.TopologyHint)
-	for resourceObj, requestedObj := range container.Resources.Limits {
-		resource := string(resourceObj)
-		requested := int(requestedObj.Value())
+	accumulatedResourceRequests := m.getContainerDeviceRequest(container)
+	m.mutex.Lock()
+	defer m.mutex.Unlock()
+	for resource, requested := range accumulatedResourceRequests {
+		// Only consider devices that actually container topology information.
+		if aligned := m.deviceHasTopologyAlignment(resource); !aligned {
+			klog.InfoS("Resource does not have a topology preference", "resource", resource)
+			deviceHints[resource] = nil
+			continue
+		}
 
-		// Only consider resources associated with a device plugin.
-		if m.isDevicePluginResource(resource) {
-			// Only consider devices that actually container topology information.
-			if aligned := m.deviceHasTopologyAlignment(resource); !aligned {
-				klog.InfoS("Resource does not have a topology preference", "resource", resource)
-				deviceHints[resource] = nil
-				continue
-			}
-
-			// Short circuit to regenerate the same hints if there are already
-			// devices allocated to the Container. This might happen after a
-			// kubelet restart, for example.
-			allocated := m.podDevices.containerDevices(string(pod.UID), container.Name, resource)
-			if allocated.Len() > 0 {
-				if allocated.Len() != requested {
-					klog.ErrorS(nil, "Resource already allocated to pod with different number than request", "resource", resource, "pod", klog.KObj(pod), "containerName", container.Name, "request", requested, "allocated", allocated.Len())
-					deviceHints[resource] = []topologymanager.TopologyHint{}
-					continue
-				}
-				klog.InfoS("Regenerating TopologyHints for resource already allocated to pod", "resource", resource, "pod", klog.KObj(pod), "containerName", container.Name)
-				deviceHints[resource] = m.generateDeviceTopologyHints(resource, allocated, sets.String{}, requested)
-				continue
-			}
-
-			// Get the list of available devices, for which TopologyHints should be generated.
-			available := m.getAvailableDevices(resource)
-			reusable := m.devicesToReuse[string(pod.UID)][resource]
-			if available.Union(reusable).Len() < requested {
-				klog.ErrorS(nil, "Unable to generate topology hints: requested number of devices unavailable", "resource", resource, "request", requested, "available", available.Union(reusable).Len())
+		// Short circuit to regenerate the same hints if there are already
+		// devices allocated to the Container. This might happen after a
+		// kubelet restart, for example.
+		allocated := m.podDevices.containerDevices(string(pod.UID), container.Name, resource)
+		if allocated.Len() > 0 {
+			if allocated.Len() != requested {
+				klog.ErrorS(nil, "Resource already allocated to pod with different number than request", "resource", resource, "pod", klog.KObj(pod), "containerName", container.Name, "request", requested, "allocated", allocated.Len())
 				deviceHints[resource] = []topologymanager.TopologyHint{}
 				continue
 			}
-
-			// Generate TopologyHints for this resource given the current
-			// request size and the list of available devices.
-			deviceHints[resource] = m.generateDeviceTopologyHints(resource, available, reusable, requested)
+			klog.InfoS("Regenerating TopologyHints for resource already allocated to pod", "resource", resource, "pod", klog.KObj(pod), "containerName", container.Name)
+			deviceHints[resource] = m.generateDeviceTopologyHints(resource, allocated, sets.String{}, requested)
+			continue
 		}
+
+		// Get the list of available devices, for which TopologyHints should be generated.
+		available := m.getAvailableDevices(resource)
+		reusable := m.devicesToReuse[string(pod.UID)][resource]
+		if available.Union(reusable).Len() < requested {
+			klog.ErrorS(nil, "Unable to generate topology hints: requested number of devices unavailable", "resource", resource, "request", requested, "available", available.Union(reusable).Len())
+			deviceHints[resource] = []topologymanager.TopologyHint{}
+			continue
+		}
+
+		// Generate TopologyHints for this resource given the current
+		// request size and the list of available devices.
+		deviceHints[resource] = m.generateDeviceTopologyHints(resource, available, reusable, requested)
 	}
 
 	return deviceHints
@@ -96,7 +93,8 @@ func (m *ManagerImpl) GetPodTopologyHints(pod *v1.Pod) map[string][]topologymana
 
 	deviceHints := make(map[string][]topologymanager.TopologyHint)
 	accumulatedResourceRequests := m.getPodDeviceRequest(pod)
-
+	m.mutex.Lock()
+	defer m.mutex.Unlock()
 	for resource, requested := range accumulatedResourceRequests {
 		// Only consider devices that actually contain topology information.
 		if aligned := m.deviceHasTopologyAlignment(resource); !aligned {
@@ -136,7 +134,7 @@ func (m *ManagerImpl) GetPodTopologyHints(pod *v1.Pod) map[string][]topologymana
 	return deviceHints
 }
 
-func (m *ManagerImpl) deviceHasTopologyAlignmentLocked(resource string) bool {
+func (m *ManagerImpl) deviceHasTopologyAlignment(resource string) bool {
 	// If any device has Topology NUMANodes available, we assume they care about alignment.
 	for _, device := range m.allDevices[resource] {
 		if device.Topology != nil && len(device.Topology.Nodes) > 0 {
@@ -146,22 +144,12 @@ func (m *ManagerImpl) deviceHasTopologyAlignmentLocked(resource string) bool {
 	return false
 }
 
-func (m *ManagerImpl) deviceHasTopologyAlignment(resource string) bool {
-	m.mutex.Lock()
-	defer m.mutex.Unlock()
-	return m.deviceHasTopologyAlignmentLocked(resource)
-}
-
 func (m *ManagerImpl) getAvailableDevices(resource string) sets.String {
-	m.mutex.Lock()
-	defer m.mutex.Unlock()
 	// Strip all devices in use from the list of healthy ones.
 	return m.healthyDevices[resource].Difference(m.allocatedDevices[resource])
 }
 
 func (m *ManagerImpl) generateDeviceTopologyHints(resource string, available sets.String, reusable sets.String, request int) []topologymanager.TopologyHint {
-	m.mutex.Lock()
-	defer m.mutex.Unlock()
 	// Initialize minAffinitySize to include all NUMA Nodes
 	minAffinitySize := len(m.numaNodes)
 
@@ -307,3 +295,16 @@ func (m *ManagerImpl) getPodDeviceRequest(pod *v1.Pod) map[string]int {
 
 	return podRequests
 }
+
+func (m *ManagerImpl) getContainerDeviceRequest(container *v1.Container) map[string]int {
+	containerRequests := make(map[string]int)
+	for resourceObj, requestedObj := range container.Resources.Limits {
+		resource := string(resourceObj)
+		requested := int(requestedObj.Value())
+		if !m.isDevicePluginResource(resource) {
+			continue
+		}
+		containerRequests[resource] = requested
+	}
+	return containerRequests
+}
diff --git a/pkg/kubelet/cm/devicemanager/topology_hints_test.go b/pkg/kubelet/cm/devicemanager/topology_hints_test.go
index 33becf9febf..a7bc5157366 100644
--- a/pkg/kubelet/cm/devicemanager/topology_hints_test.go
+++ b/pkg/kubelet/cm/devicemanager/topology_hints_test.go
@@ -456,7 +456,7 @@ func TestTopologyAlignedAllocation(t *testing.T) {
 		}
 
 		alignment := make(map[int]int)
-		if m.deviceHasTopologyAlignmentLocked(tc.resource) {
+		if m.deviceHasTopologyAlignment(tc.resource) {
 			for d := range allocated {
 				if m.allDevices[tc.resource][d].Topology != nil {
 					alignment[int(m.allDevices[tc.resource][d].Topology.Nodes[0].ID)]++
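
Not part of the diff above: a minimal, self-contained sketch of the locking pattern this change moves to, using a toy manager type (toyManager, GetHints, availableLocked, UpdateDevices are illustrative names only, not kubelet APIs). The exported hint-generation entry point takes the mutex once around the whole loop, and the internal helpers assume the lock is already held instead of locking themselves, which avoids the data race on the device maps without risking a double lock on the non-reentrant sync.Mutex.

package main

import (
	"fmt"
	"sync"
)

// toyManager mirrors the shape of ManagerImpl for illustration only:
// one mutex guarding maps that a device-plugin callback mutates concurrently.
type toyManager struct {
	mu             sync.Mutex
	healthyDevices map[string]map[string]bool // resource -> healthy device IDs
	allocated      map[string]map[string]bool // resource -> allocated device IDs
}

// GetHints is the exported entry point: it takes the lock once for the whole
// loop, the way GetTopologyHints/GetPodTopologyHints do after the change.
func (m *toyManager) GetHints(requests map[string]int) map[string]int {
	m.mu.Lock()
	defer m.mu.Unlock()
	out := make(map[string]int)
	for resource := range requests {
		// Helpers are called with the lock already held and do not lock
		// again, mirroring getAvailableDevices/deviceHasTopologyAlignment.
		out[resource] = m.availableLocked(resource)
	}
	return out
}

// availableLocked must only be called with m.mu held.
func (m *toyManager) availableLocked(resource string) int {
	return len(m.healthyDevices[resource]) - len(m.allocated[resource])
}

// UpdateDevices simulates the plugin update path that races with hint
// generation when the helpers read the maps without holding the lock.
func (m *toyManager) UpdateDevices(resource string, ids ...string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	devs := make(map[string]bool, len(ids))
	for _, id := range ids {
		devs[id] = true
	}
	m.healthyDevices[resource] = devs
}

func main() {
	m := &toyManager{
		healthyDevices: map[string]map[string]bool{},
		allocated:      map[string]map[string]bool{},
	}
	var wg sync.WaitGroup
	// Concurrent updates and hint generation stay race-free (run with -race).
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			m.UpdateDevices("example.com/dev", "dev-0", "dev-1")
			_ = m.GetHints(map[string]int{"example.com/dev": 1})
		}()
	}
	wg.Wait()
	fmt.Println(m.GetHints(map[string]int{"example.com/dev": 1}))
}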