From 4ff554ba835597d055336d6e74f48949cb240a98 Mon Sep 17 00:00:00 2001 From: Abdullah Gharaibeh Date: Mon, 18 May 2020 21:15:09 -0400 Subject: [PATCH] Eliminate locking in (anti)affinity calculations --- .../plugins/interpodaffinity/filtering.go | 89 ++++++++----------- .../plugins/interpodaffinity/plugin.go | 2 - .../plugins/interpodaffinity/scoring.go | 11 ++- 3 files changed, 43 insertions(+), 59 deletions(-) diff --git a/pkg/scheduler/framework/plugins/interpodaffinity/filtering.go b/pkg/scheduler/framework/plugins/interpodaffinity/filtering.go index 4a2d93c4037..1a1bf641b93 100644 --- a/pkg/scheduler/framework/plugins/interpodaffinity/filtering.go +++ b/pkg/scheduler/framework/plugins/interpodaffinity/filtering.go @@ -19,7 +19,7 @@ package interpodaffinity import ( "context" "fmt" - "sync" + "sync/atomic" v1 "k8s.io/api/core/v1" "k8s.io/klog/v2" @@ -128,7 +128,7 @@ func (m topologyToMatchedTermCount) updateWithAffinityTerms(targetPod *v1.Pod, t } } -// updateAntiAffinityTerms updates the topologyToMatchedTermCount map with the specified value +// updateWithAntiAffinityTerms updates the topologyToMatchedTermCount map with the specified value // for each anti-affinity term matched the target pod. func (m topologyToMatchedTermCount) updateWithAntiAffinityTerms(targetPod *v1.Pod, targetPodNode *v1.Node, antiAffinityTerms []framework.AffinityTerm, value int64) { // Check anti-affinity terms. @@ -160,35 +160,12 @@ func podMatchesAllAffinityTerms(pod *v1.Pod, terms []framework.AffinityTerm) boo return true } -// getMatchingAntiAffinityTopologyPairs calculates the following for "existingPod" on given node: -// (1) Whether it has PodAntiAffinity -// (2) Whether ANY AffinityTerm matches the incoming pod -func getMatchingAntiAffinityTopologyPairsOfPod(newPod *v1.Pod, existingPod *framework.PodInfo, node *v1.Node) topologyToMatchedTermCount { - topologyMap := make(topologyToMatchedTermCount) - for _, term := range existingPod.RequiredAntiAffinityTerms { - if schedutil.PodMatchesTermsNamespaceAndSelector(newPod, term.Namespaces, term.Selector) { - if topologyValue, ok := node.Labels[term.TopologyKey]; ok { - pair := topologyPair{key: term.TopologyKey, value: topologyValue} - topologyMap[pair]++ - } - } - } - return topologyMap -} - // getTPMapMatchingExistingAntiAffinity calculates the following for each existing pod on each node: // (1) Whether it has PodAntiAffinity // (2) Whether any AffinityTerm matches the incoming pod func getTPMapMatchingExistingAntiAffinity(pod *v1.Pod, allNodes []*framework.NodeInfo) topologyToMatchedTermCount { - var lock sync.Mutex - topologyMap := make(topologyToMatchedTermCount) - - appendResult := func(toAppend topologyToMatchedTermCount) { - lock.Lock() - defer lock.Unlock() - topologyMap.append(toAppend) - } - + topoMaps := make([]topologyToMatchedTermCount, len(allNodes)) + index := int32(-1) processNode := func(i int) { nodeInfo := allNodes[i] node := nodeInfo.Node() @@ -196,16 +173,22 @@ func getTPMapMatchingExistingAntiAffinity(pod *v1.Pod, allNodes []*framework.Nod klog.Error("node not found") return } + topoMap := make(topologyToMatchedTermCount) for _, existingPod := range nodeInfo.PodsWithAffinity { - existingPodTopologyMaps := getMatchingAntiAffinityTopologyPairsOfPod(pod, existingPod, node) - if len(existingPodTopologyMaps) != 0 { - appendResult(existingPodTopologyMaps) - } + topoMap.updateWithAntiAffinityTerms(pod, node, existingPod.RequiredAntiAffinityTerms, 1) + } + if len(topoMap) != 0 { + topoMaps[atomic.AddInt32(&index, 1)] = topoMap } } parallelize.Until(context.Background(), len(allNodes), processNode) - return topologyMap + result := make(topologyToMatchedTermCount) + for i := 0; i <= int(index); i++ { + result.append(topoMaps[i]) + } + + return result } // getTPMapMatchingIncomingAffinityAntiAffinity finds existing Pods that match affinity terms of the given "pod". @@ -213,24 +196,15 @@ func getTPMapMatchingExistingAntiAffinity(pod *v1.Pod, allNodes []*framework.Nod // predicate. With this topologyToMatchedTermCount available, the affinity predicate does not // need to check all the pods in the cluster. func getTPMapMatchingIncomingAffinityAntiAffinity(podInfo *framework.PodInfo, allNodes []*framework.NodeInfo) (topologyToMatchedTermCount, topologyToMatchedTermCount) { - topologyPairsAffinityPodsMap := make(topologyToMatchedTermCount) - topologyToMatchedExistingAntiAffinityTerms := make(topologyToMatchedTermCount) + affinityCounts := make(topologyToMatchedTermCount) + antiAffinityCounts := make(topologyToMatchedTermCount) if len(podInfo.RequiredAffinityTerms) == 0 && len(podInfo.RequiredAntiAffinityTerms) == 0 { - return topologyPairsAffinityPodsMap, topologyToMatchedExistingAntiAffinityTerms - } - - var lock sync.Mutex - appendResult := func(nodeName string, nodeTopologyPairsAffinityPodsMap, nodeTopologyPairsAntiAffinityPodsMap topologyToMatchedTermCount) { - lock.Lock() - defer lock.Unlock() - if len(nodeTopologyPairsAffinityPodsMap) > 0 { - topologyPairsAffinityPodsMap.append(nodeTopologyPairsAffinityPodsMap) - } - if len(nodeTopologyPairsAntiAffinityPodsMap) > 0 { - topologyToMatchedExistingAntiAffinityTerms.append(nodeTopologyPairsAntiAffinityPodsMap) - } + return affinityCounts, antiAffinityCounts } + affinityCountsList := make([]topologyToMatchedTermCount, len(allNodes)) + antiAffinityCountsList := make([]topologyToMatchedTermCount, len(allNodes)) + index := int32(-1) processNode := func(i int) { nodeInfo := allNodes[i] node := nodeInfo.Node() @@ -238,23 +212,30 @@ func getTPMapMatchingIncomingAffinityAntiAffinity(podInfo *framework.PodInfo, al klog.Error("node not found") return } - nodeTopologyPairsAffinityPodsMap := make(topologyToMatchedTermCount) - nodeTopologyPairsAntiAffinityPodsMap := make(topologyToMatchedTermCount) + affinity := make(topologyToMatchedTermCount) + antiAffinity := make(topologyToMatchedTermCount) for _, existingPod := range nodeInfo.Pods { // Check affinity terms. - nodeTopologyPairsAffinityPodsMap.updateWithAffinityTerms(existingPod.Pod, node, podInfo.RequiredAffinityTerms, 1) + affinity.updateWithAffinityTerms(existingPod.Pod, node, podInfo.RequiredAffinityTerms, 1) // Check anti-affinity terms. - nodeTopologyPairsAntiAffinityPodsMap.updateWithAntiAffinityTerms(existingPod.Pod, node, podInfo.RequiredAntiAffinityTerms, 1) + antiAffinity.updateWithAntiAffinityTerms(existingPod.Pod, node, podInfo.RequiredAntiAffinityTerms, 1) } - if len(nodeTopologyPairsAffinityPodsMap) > 0 || len(nodeTopologyPairsAntiAffinityPodsMap) > 0 { - appendResult(node.Name, nodeTopologyPairsAffinityPodsMap, nodeTopologyPairsAntiAffinityPodsMap) + if len(affinity) > 0 || len(antiAffinity) > 0 { + k := atomic.AddInt32(&index, 1) + affinityCountsList[k] = affinity + antiAffinityCountsList[k] = antiAffinity } } parallelize.Until(context.Background(), len(allNodes), processNode) - return topologyPairsAffinityPodsMap, topologyToMatchedExistingAntiAffinityTerms + for i := 0; i <= int(index); i++ { + affinityCounts.append(affinityCountsList[i]) + antiAffinityCounts.append(antiAffinityCountsList[i]) + } + + return affinityCounts, antiAffinityCounts } // PreFilter invoked at the prefilter extension point. diff --git a/pkg/scheduler/framework/plugins/interpodaffinity/plugin.go b/pkg/scheduler/framework/plugins/interpodaffinity/plugin.go index 0d7924c5e19..69e833e2cef 100644 --- a/pkg/scheduler/framework/plugins/interpodaffinity/plugin.go +++ b/pkg/scheduler/framework/plugins/interpodaffinity/plugin.go @@ -18,7 +18,6 @@ package interpodaffinity import ( "fmt" - "sync" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/validation/field" @@ -45,7 +44,6 @@ var _ framework.ScorePlugin = &InterPodAffinity{} type InterPodAffinity struct { args config.InterPodAffinityArgs sharedLister framework.SharedLister - sync.Mutex } // Name returns name of the plugin. It is used in logs, etc. diff --git a/pkg/scheduler/framework/plugins/interpodaffinity/scoring.go b/pkg/scheduler/framework/plugins/interpodaffinity/scoring.go index ab1474f2e6f..a32f5706998 100644 --- a/pkg/scheduler/framework/plugins/interpodaffinity/scoring.go +++ b/pkg/scheduler/framework/plugins/interpodaffinity/scoring.go @@ -19,6 +19,7 @@ package interpodaffinity import ( "context" "fmt" + "sync/atomic" v1 "k8s.io/api/core/v1" framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" @@ -164,6 +165,8 @@ func (pl *InterPodAffinity) PreScore( podInfo: framework.NewPodInfo(pod), } + topoScores := make([]scoreMap, len(allNodes)) + index := int32(-1) processNode := func(i int) { nodeInfo := allNodes[i] if nodeInfo.Node() == nil { @@ -182,13 +185,15 @@ func (pl *InterPodAffinity) PreScore( pl.processExistingPod(state, existingPod, nodeInfo, pod, topoScore) } if len(topoScore) > 0 { - pl.Lock() - state.topologyScore.append(topoScore) - pl.Unlock() + topoScores[atomic.AddInt32(&index, 1)] = topoScore } } parallelize.Until(context.Background(), len(allNodes), processNode) + for i := 0; i <= int(index); i++ { + state.topologyScore.append(topoScores[i]) + } + cycleState.Write(preScoreStateKey, state) return nil }