Improve topologyToMatchedTermCount map in InterPodAffinity PreFilter

This commit is contained in:
Maciej Skoczeń 2024-12-10 13:11:02 +00:00
parent 8770bd58d0
commit 2d82687114

View File

@ -89,15 +89,21 @@ type topologyPair struct {
} }
type topologyToMatchedTermCount map[topologyPair]int64 type topologyToMatchedTermCount map[topologyPair]int64
func (m topologyToMatchedTermCount) append(toAppend topologyToMatchedTermCount) { func (m topologyToMatchedTermCount) merge(toMerge topologyToMatchedTermCount) {
for pair := range toAppend { for pair, count := range toMerge {
m[pair] += toAppend[pair] m[pair] += count
}
}
func (m topologyToMatchedTermCount) mergeWithList(toMerge topologyToMatchedTermCountList) {
for _, tmtc := range toMerge {
m[tmtc.topologyPair] += tmtc.count
} }
} }
func (m topologyToMatchedTermCount) clone() topologyToMatchedTermCount { func (m topologyToMatchedTermCount) clone() topologyToMatchedTermCount {
copy := make(topologyToMatchedTermCount, len(m)) copy := make(topologyToMatchedTermCount, len(m))
copy.append(m) copy.merge(m)
return copy return copy
} }
@ -134,6 +140,48 @@ func (m topologyToMatchedTermCount) updateWithAntiAffinityTerms(terms []framewor
} }
} }
// topologyToMatchedTermCountList is a slice equivalent of topologyToMatchedTermCount map.
// The use of slice improves the performance of PreFilter,
// especially due to faster iteration when merging than with topologyToMatchedTermCount.
type topologyToMatchedTermCountList []topologyPairCount
type topologyPairCount struct {
topologyPair topologyPair
count int64
}
func (m *topologyToMatchedTermCountList) append(node *v1.Node, tk string, value int64) {
if tv, ok := node.Labels[tk]; ok {
pair := topologyPair{key: tk, value: tv}
*m = append(*m, topologyPairCount{
topologyPair: pair,
count: value,
})
}
}
// appends the specified value to the topologyToMatchedTermCountList
// for each affinity term if "targetPod" matches ALL terms.
func (m *topologyToMatchedTermCountList) appendWithAffinityTerms(
terms []framework.AffinityTerm, pod *v1.Pod, node *v1.Node, value int64) {
if podMatchesAllAffinityTerms(terms, pod) {
for _, t := range terms {
m.append(node, t.TopologyKey, value)
}
}
}
// appends the specified value to the topologyToMatchedTermCountList
// for each anti-affinity term matched the target pod.
func (m *topologyToMatchedTermCountList) appendWithAntiAffinityTerms(terms []framework.AffinityTerm, pod *v1.Pod, nsLabels labels.Set, node *v1.Node, value int64) {
// Check anti-affinity terms.
for _, t := range terms {
if t.Matches(pod, nsLabels) {
m.append(node, t.TopologyKey, value)
}
}
}
// returns true IFF the given pod matches all the given terms. // returns true IFF the given pod matches all the given terms.
func podMatchesAllAffinityTerms(terms []framework.AffinityTerm, pod *v1.Pod) bool { func podMatchesAllAffinityTerms(terms []framework.AffinityTerm, pod *v1.Pod) bool {
if len(terms) == 0 { if len(terms) == 0 {
@ -153,25 +201,26 @@ func podMatchesAllAffinityTerms(terms []framework.AffinityTerm, pod *v1.Pod) boo
// 1. Whether it has PodAntiAffinity // 1. Whether it has PodAntiAffinity
// 2. Whether any AntiAffinityTerm matches the incoming pod // 2. Whether any AntiAffinityTerm matches the incoming pod
func (pl *InterPodAffinity) getExistingAntiAffinityCounts(ctx context.Context, pod *v1.Pod, nsLabels labels.Set, nodes []*framework.NodeInfo) topologyToMatchedTermCount { func (pl *InterPodAffinity) getExistingAntiAffinityCounts(ctx context.Context, pod *v1.Pod, nsLabels labels.Set, nodes []*framework.NodeInfo) topologyToMatchedTermCount {
topoMaps := make([]topologyToMatchedTermCount, len(nodes)) antiAffinityCountsList := make([]topologyToMatchedTermCountList, len(nodes))
index := int32(-1) index := int32(-1)
processNode := func(i int) { processNode := func(i int) {
nodeInfo := nodes[i] nodeInfo := nodes[i]
node := nodeInfo.Node() node := nodeInfo.Node()
topoMap := make(topologyToMatchedTermCount) antiAffinityCounts := make(topologyToMatchedTermCountList, 0)
for _, existingPod := range nodeInfo.PodsWithRequiredAntiAffinity { for _, existingPod := range nodeInfo.PodsWithRequiredAntiAffinity {
topoMap.updateWithAntiAffinityTerms(existingPod.RequiredAntiAffinityTerms, pod, nsLabels, node, 1) antiAffinityCounts.appendWithAntiAffinityTerms(existingPod.RequiredAntiAffinityTerms, pod, nsLabels, node, 1)
} }
if len(topoMap) != 0 { if len(antiAffinityCounts) != 0 {
topoMaps[atomic.AddInt32(&index, 1)] = topoMap antiAffinityCountsList[atomic.AddInt32(&index, 1)] = antiAffinityCounts
} }
} }
pl.parallelizer.Until(ctx, len(nodes), processNode, pl.Name()) pl.parallelizer.Until(ctx, len(nodes), processNode, pl.Name())
result := make(topologyToMatchedTermCount) result := make(topologyToMatchedTermCount)
// Traditional for loop is slightly faster in this case than its "for range" equivalent.
for i := 0; i <= int(index); i++ { for i := 0; i <= int(index); i++ {
result.append(topoMaps[i]) result.mergeWithList(antiAffinityCountsList[i])
} }
return result return result
@ -188,20 +237,20 @@ func (pl *InterPodAffinity) getIncomingAffinityAntiAffinityCounts(ctx context.Co
return affinityCounts, antiAffinityCounts return affinityCounts, antiAffinityCounts
} }
affinityCountsList := make([]topologyToMatchedTermCount, len(allNodes)) affinityCountsList := make([]topologyToMatchedTermCountList, len(allNodes))
antiAffinityCountsList := make([]topologyToMatchedTermCount, len(allNodes)) antiAffinityCountsList := make([]topologyToMatchedTermCountList, len(allNodes))
index := int32(-1) index := int32(-1)
processNode := func(i int) { processNode := func(i int) {
nodeInfo := allNodes[i] nodeInfo := allNodes[i]
node := nodeInfo.Node() node := nodeInfo.Node()
affinity := make(topologyToMatchedTermCount) affinity := make(topologyToMatchedTermCountList, 0)
antiAffinity := make(topologyToMatchedTermCount) antiAffinity := make(topologyToMatchedTermCountList, 0)
for _, existingPod := range nodeInfo.Pods { for _, existingPod := range nodeInfo.Pods {
affinity.updateWithAffinityTerms(podInfo.RequiredAffinityTerms, existingPod.Pod, node, 1) affinity.appendWithAffinityTerms(podInfo.RequiredAffinityTerms, existingPod.Pod, node, 1)
// The incoming pod's terms have the namespaceSelector merged into the namespaces, and so // The incoming pod's terms have the namespaceSelector merged into the namespaces, and so
// here we don't lookup the existing pod's namespace labels, hence passing nil for nsLabels. // here we don't lookup the existing pod's namespace labels, hence passing nil for nsLabels.
antiAffinity.updateWithAntiAffinityTerms(podInfo.RequiredAntiAffinityTerms, existingPod.Pod, nil, node, 1) antiAffinity.appendWithAntiAffinityTerms(podInfo.RequiredAntiAffinityTerms, existingPod.Pod, nil, node, 1)
} }
if len(affinity) > 0 || len(antiAffinity) > 0 { if len(affinity) > 0 || len(antiAffinity) > 0 {
@ -213,8 +262,8 @@ func (pl *InterPodAffinity) getIncomingAffinityAntiAffinityCounts(ctx context.Co
pl.parallelizer.Until(ctx, len(allNodes), processNode, pl.Name()) pl.parallelizer.Until(ctx, len(allNodes), processNode, pl.Name())
for i := 0; i <= int(index); i++ { for i := 0; i <= int(index); i++ {
affinityCounts.append(affinityCountsList[i]) affinityCounts.mergeWithList(affinityCountsList[i])
antiAffinityCounts.append(antiAffinityCountsList[i]) antiAffinityCounts.mergeWithList(antiAffinityCountsList[i])
} }
return affinityCounts, antiAffinityCounts return affinityCounts, antiAffinityCounts