add lock in generate topology hints function

This commit is contained in:
huyinhou 2023-02-20 10:46:04 +08:00
parent 4702503d15
commit 32495ae3f1
4 changed files with 60 additions and 65 deletions

View File

@ -646,7 +646,7 @@ func (m *ManagerImpl) devicesToAllocate(podUID, contName, resource string, requi
func (m *ManagerImpl) filterByAffinity(podUID, contName, resource string, available sets.String) (sets.String, sets.String, sets.String) { func (m *ManagerImpl) filterByAffinity(podUID, contName, resource string, available sets.String) (sets.String, sets.String, sets.String) {
// If alignment information is not available, just pass the available list back. // If alignment information is not available, just pass the available list back.
hint := m.topologyAffinityStore.GetAffinity(podUID, contName) hint := m.topologyAffinityStore.GetAffinity(podUID, contName)
if !m.deviceHasTopologyAlignmentLocked(resource) || hint.NUMANodeAffinity == nil { if !m.deviceHasTopologyAlignment(resource) || hint.NUMANodeAffinity == nil {
return sets.NewString(), sets.NewString(), available return sets.NewString(), sets.NewString(), available
} }

View File

@ -1423,6 +1423,9 @@ func TestGetTopologyHintsWithUpdates(t *testing.T) {
}, },
}}) }})
} }
testPod := makePod(v1.ResourceList{
testResourceName: *resource.NewQuantity(int64(1), resource.DecimalSI),
})
topology := []cadvisorapi.Node{ topology := []cadvisorapi.Node{
{Id: 0}, {Id: 0},
} }
@ -1433,28 +1436,19 @@ func TestGetTopologyHintsWithUpdates(t *testing.T) {
testfunc func(manager *wrappedManagerImpl) testfunc func(manager *wrappedManagerImpl)
}{ }{
{ {
description: "getAvailableDevices data race when update device", description: "GetTopologyHints data race when update device",
count: 1, count: 10,
devices: devs, devices: devs,
testfunc: func(manager *wrappedManagerImpl) { testfunc: func(manager *wrappedManagerImpl) {
manager.getAvailableDevices(testResourceName) manager.GetTopologyHints(testPod, &testPod.Spec.Containers[0])
}, },
}, },
{ {
description: "generateDeviceTopologyHints data race when update device", description: "GetPodTopologyHints data race when update device",
count: 1, count: 10,
devices: devs, devices: devs,
testfunc: func(manager *wrappedManagerImpl) { testfunc: func(manager *wrappedManagerImpl) {
manager.generateDeviceTopologyHints( manager.GetPodTopologyHints(testPod)
testResourceName, sets.NewString(), sets.NewString(), 1)
},
},
{
description: "deviceHasTopologyAlignment data race when update device",
count: 1000,
devices: devs[:1],
testfunc: func(manager *wrappedManagerImpl) {
manager.deviceHasTopologyAlignment(testResourceName)
}, },
}, },
} }

View File

@ -38,47 +38,44 @@ func (m *ManagerImpl) GetTopologyHints(pod *v1.Pod, container *v1.Container) map
// Loop through all device resources and generate TopologyHints for them.. // Loop through all device resources and generate TopologyHints for them..
deviceHints := make(map[string][]topologymanager.TopologyHint) deviceHints := make(map[string][]topologymanager.TopologyHint)
for resourceObj, requestedObj := range container.Resources.Limits { accumulatedResourceRequests := m.getContainerDeviceRequest(container)
resource := string(resourceObj) m.mutex.Lock()
requested := int(requestedObj.Value()) defer m.mutex.Unlock()
for resource, requested := range accumulatedResourceRequests {
// Only consider devices that actually container topology information.
if aligned := m.deviceHasTopologyAlignment(resource); !aligned {
klog.InfoS("Resource does not have a topology preference", "resource", resource)
deviceHints[resource] = nil
continue
}
// Only consider resources associated with a device plugin. // Short circuit to regenerate the same hints if there are already
if m.isDevicePluginResource(resource) { // devices allocated to the Container. This might happen after a
// Only consider devices that actually container topology information. // kubelet restart, for example.
if aligned := m.deviceHasTopologyAlignment(resource); !aligned { allocated := m.podDevices.containerDevices(string(pod.UID), container.Name, resource)
klog.InfoS("Resource does not have a topology preference", "resource", resource) if allocated.Len() > 0 {
deviceHints[resource] = nil if allocated.Len() != requested {
continue klog.ErrorS(nil, "Resource already allocated to pod with different number than request", "resource", resource, "pod", klog.KObj(pod), "containerName", container.Name, "request", requested, "allocated", allocated.Len())
}
// Short circuit to regenerate the same hints if there are already
// devices allocated to the Container. This might happen after a
// kubelet restart, for example.
allocated := m.podDevices.containerDevices(string(pod.UID), container.Name, resource)
if allocated.Len() > 0 {
if allocated.Len() != requested {
klog.ErrorS(nil, "Resource already allocated to pod with different number than request", "resource", resource, "pod", klog.KObj(pod), "containerName", container.Name, "request", requested, "allocated", allocated.Len())
deviceHints[resource] = []topologymanager.TopologyHint{}
continue
}
klog.InfoS("Regenerating TopologyHints for resource already allocated to pod", "resource", resource, "pod", klog.KObj(pod), "containerName", container.Name)
deviceHints[resource] = m.generateDeviceTopologyHints(resource, allocated, sets.String{}, requested)
continue
}
// Get the list of available devices, for which TopologyHints should be generated.
available := m.getAvailableDevices(resource)
reusable := m.devicesToReuse[string(pod.UID)][resource]
if available.Union(reusable).Len() < requested {
klog.ErrorS(nil, "Unable to generate topology hints: requested number of devices unavailable", "resource", resource, "request", requested, "available", available.Union(reusable).Len())
deviceHints[resource] = []topologymanager.TopologyHint{} deviceHints[resource] = []topologymanager.TopologyHint{}
continue continue
} }
klog.InfoS("Regenerating TopologyHints for resource already allocated to pod", "resource", resource, "pod", klog.KObj(pod), "containerName", container.Name)
// Generate TopologyHints for this resource given the current deviceHints[resource] = m.generateDeviceTopologyHints(resource, allocated, sets.String{}, requested)
// request size and the list of available devices. continue
deviceHints[resource] = m.generateDeviceTopologyHints(resource, available, reusable, requested)
} }
// Get the list of available devices, for which TopologyHints should be generated.
available := m.getAvailableDevices(resource)
reusable := m.devicesToReuse[string(pod.UID)][resource]
if available.Union(reusable).Len() < requested {
klog.ErrorS(nil, "Unable to generate topology hints: requested number of devices unavailable", "resource", resource, "request", requested, "available", available.Union(reusable).Len())
deviceHints[resource] = []topologymanager.TopologyHint{}
continue
}
// Generate TopologyHints for this resource given the current
// request size and the list of available devices.
deviceHints[resource] = m.generateDeviceTopologyHints(resource, available, reusable, requested)
} }
return deviceHints return deviceHints
@ -96,7 +93,8 @@ func (m *ManagerImpl) GetPodTopologyHints(pod *v1.Pod) map[string][]topologymana
deviceHints := make(map[string][]topologymanager.TopologyHint) deviceHints := make(map[string][]topologymanager.TopologyHint)
accumulatedResourceRequests := m.getPodDeviceRequest(pod) accumulatedResourceRequests := m.getPodDeviceRequest(pod)
m.mutex.Lock()
defer m.mutex.Unlock()
for resource, requested := range accumulatedResourceRequests { for resource, requested := range accumulatedResourceRequests {
// Only consider devices that actually contain topology information. // Only consider devices that actually contain topology information.
if aligned := m.deviceHasTopologyAlignment(resource); !aligned { if aligned := m.deviceHasTopologyAlignment(resource); !aligned {
@ -136,7 +134,7 @@ func (m *ManagerImpl) GetPodTopologyHints(pod *v1.Pod) map[string][]topologymana
return deviceHints return deviceHints
} }
func (m *ManagerImpl) deviceHasTopologyAlignmentLocked(resource string) bool { func (m *ManagerImpl) deviceHasTopologyAlignment(resource string) bool {
// If any device has Topology NUMANodes available, we assume they care about alignment. // If any device has Topology NUMANodes available, we assume they care about alignment.
for _, device := range m.allDevices[resource] { for _, device := range m.allDevices[resource] {
if device.Topology != nil && len(device.Topology.Nodes) > 0 { if device.Topology != nil && len(device.Topology.Nodes) > 0 {
@ -146,22 +144,12 @@ func (m *ManagerImpl) deviceHasTopologyAlignmentLocked(resource string) bool {
return false return false
} }
func (m *ManagerImpl) deviceHasTopologyAlignment(resource string) bool {
m.mutex.Lock()
defer m.mutex.Unlock()
return m.deviceHasTopologyAlignmentLocked(resource)
}
func (m *ManagerImpl) getAvailableDevices(resource string) sets.String { func (m *ManagerImpl) getAvailableDevices(resource string) sets.String {
m.mutex.Lock()
defer m.mutex.Unlock()
// Strip all devices in use from the list of healthy ones. // Strip all devices in use from the list of healthy ones.
return m.healthyDevices[resource].Difference(m.allocatedDevices[resource]) return m.healthyDevices[resource].Difference(m.allocatedDevices[resource])
} }
func (m *ManagerImpl) generateDeviceTopologyHints(resource string, available sets.String, reusable sets.String, request int) []topologymanager.TopologyHint { func (m *ManagerImpl) generateDeviceTopologyHints(resource string, available sets.String, reusable sets.String, request int) []topologymanager.TopologyHint {
m.mutex.Lock()
defer m.mutex.Unlock()
// Initialize minAffinitySize to include all NUMA Nodes // Initialize minAffinitySize to include all NUMA Nodes
minAffinitySize := len(m.numaNodes) minAffinitySize := len(m.numaNodes)
@ -307,3 +295,16 @@ func (m *ManagerImpl) getPodDeviceRequest(pod *v1.Pod) map[string]int {
return podRequests return podRequests
} }
func (m *ManagerImpl) getContainerDeviceRequest(container *v1.Container) map[string]int {
containerRequests := make(map[string]int)
for resourceObj, requestedObj := range container.Resources.Limits {
resource := string(resourceObj)
requested := int(requestedObj.Value())
if !m.isDevicePluginResource(resource) {
continue
}
containerRequests[resource] = requested
}
return containerRequests
}

View File

@ -456,7 +456,7 @@ func TestTopologyAlignedAllocation(t *testing.T) {
} }
alignment := make(map[int]int) alignment := make(map[int]int)
if m.deviceHasTopologyAlignmentLocked(tc.resource) { if m.deviceHasTopologyAlignment(tc.resource) {
for d := range allocated { for d := range allocated {
if m.allDevices[tc.resource][d].Topology != nil { if m.allDevices[tc.resource][d].Topology != nil {
alignment[int(m.allDevices[tc.resource][d].Topology.Nodes[0].ID)]++ alignment[int(m.allDevices[tc.resource][d].Topology.Nodes[0].ID)]++