From d0dc178ab8ed1ecf6b0160ea637be380c1b6c925 Mon Sep 17 00:00:00 2001 From: Aldo Culquicondor Date: Mon, 16 Mar 2020 17:55:21 -0400 Subject: [PATCH] Reduce locking when calculating affinity scores Signed-off-by: Aldo Culquicondor --- .../plugins/interpodaffinity/scoring.go | 53 ++++++++++++------- 1 file changed, 35 insertions(+), 18 deletions(-) diff --git a/pkg/scheduler/framework/plugins/interpodaffinity/scoring.go b/pkg/scheduler/framework/plugins/interpodaffinity/scoring.go index fe80c1a3872..7699ea72f22 100644 --- a/pkg/scheduler/framework/plugins/interpodaffinity/scoring.go +++ b/pkg/scheduler/framework/plugins/interpodaffinity/scoring.go @@ -32,9 +32,11 @@ import ( // preScoreStateKey is the key in CycleState to InterPodAffinity pre-computed data for Scoring. const preScoreStateKey = "PreScore" + Name +type scoreMap map[string]map[string]int64 + // preScoreState computed at PreScore and used at Score. type preScoreState struct { - topologyScore map[string]map[string]int64 + topologyScore scoreMap affinityTerms []*weightedAffinityTerm antiAffinityTerms []*weightedAffinityTerm } @@ -76,8 +78,7 @@ func getWeightedAffinityTerms(pod *v1.Pod, v1Terms []v1.WeightedPodAffinityTerm) return terms, nil } -func (pl *InterPodAffinity) processTerm( - state *preScoreState, +func (m scoreMap) processTerm( term *weightedAffinityTerm, podToCheck *v1.Pod, fixedNode *v1.Node, @@ -90,24 +91,34 @@ func (pl *InterPodAffinity) processTerm( match := schedutil.PodMatchesTermsNamespaceAndSelector(podToCheck, term.namespaces, term.selector) tpValue, tpValueExist := fixedNode.Labels[term.topologyKey] if match && tpValueExist { - pl.Lock() - if state.topologyScore[term.topologyKey] == nil { - state.topologyScore[term.topologyKey] = make(map[string]int64) + if m[term.topologyKey] == nil { + m[term.topologyKey] = make(map[string]int64) } - state.topologyScore[term.topologyKey][tpValue] += int64(term.weight * int32(multiplier)) - pl.Unlock() + m[term.topologyKey][tpValue] += int64(term.weight * int32(multiplier)) } return } -func (pl *InterPodAffinity) processTerms(state *preScoreState, terms []*weightedAffinityTerm, podToCheck *v1.Pod, fixedNode *v1.Node, multiplier int) error { +func (m scoreMap) processTerms(terms []*weightedAffinityTerm, podToCheck *v1.Pod, fixedNode *v1.Node, multiplier int) { for _, term := range terms { - pl.processTerm(state, term, podToCheck, fixedNode, multiplier) + m.processTerm(term, podToCheck, fixedNode, multiplier) } - return nil } -func (pl *InterPodAffinity) processExistingPod(state *preScoreState, existingPod *v1.Pod, existingPodNodeInfo *nodeinfo.NodeInfo, incomingPod *v1.Pod) error { +func (m scoreMap) append(other scoreMap) { + for topology, oScores := range other { + scores := m[topology] + if scores == nil { + m[topology] = oScores + continue + } + for k, v := range oScores { + scores[k] += v + } + } +} + +func (pl *InterPodAffinity) processExistingPod(state *preScoreState, existingPod *v1.Pod, existingPodNodeInfo *nodeinfo.NodeInfo, incomingPod *v1.Pod, topoScore scoreMap) error { existingPodAffinity := existingPod.Spec.Affinity existingHasAffinityConstraints := existingPodAffinity != nil && existingPodAffinity.PodAffinity != nil existingHasAntiAffinityConstraints := existingPodAffinity != nil && existingPodAffinity.PodAntiAffinity != nil @@ -116,12 +127,12 @@ func (pl *InterPodAffinity) processExistingPod(state *preScoreState, existingPod // For every soft pod affinity term of , if matches the term, // increment for every node in the cluster with the same // value as that of `s node by the term`s weight. - pl.processTerms(state, state.affinityTerms, existingPod, existingPodNode, 1) + topoScore.processTerms(state.affinityTerms, existingPod, existingPodNode, 1) // For every soft pod anti-affinity term of , if matches the term, // decrement for every node in the cluster with the same // value as that of `s node by the term`s weight. - pl.processTerms(state, state.antiAffinityTerms, existingPod, existingPodNode, -1) + topoScore.processTerms(state.antiAffinityTerms, existingPod, existingPodNode, -1) if existingHasAffinityConstraints { // For every hard pod affinity term of , if matches the term, @@ -139,7 +150,7 @@ func (pl *InterPodAffinity) processExistingPod(state *preScoreState, existingPod if err != nil { return err } - pl.processTerm(state, processedTerm, incomingPod, existingPodNode, 1) + topoScore.processTerm(processedTerm, incomingPod, existingPodNode, 1) } } // For every soft pod affinity term of , if matches the term, @@ -151,7 +162,7 @@ func (pl *InterPodAffinity) processExistingPod(state *preScoreState, existingPod return nil } - pl.processTerms(state, terms, incomingPod, existingPodNode, 1) + topoScore.processTerms(terms, incomingPod, existingPodNode, 1) } if existingHasAntiAffinityConstraints { // For every soft pod anti-affinity term of , if matches the term, @@ -161,7 +172,7 @@ func (pl *InterPodAffinity) processExistingPod(state *preScoreState, existingPod if err != nil { return err } - pl.processTerms(state, terms, incomingPod, existingPodNode, -1) + topoScore.processTerms(terms, incomingPod, existingPodNode, -1) } return nil } @@ -235,12 +246,18 @@ func (pl *InterPodAffinity) PreScore( podsToProcess = nodeInfo.Pods() } + topoScore := make(scoreMap) for _, existingPod := range podsToProcess { - if err := pl.processExistingPod(state, existingPod, nodeInfo, pod); err != nil { + if err := pl.processExistingPod(state, existingPod, nodeInfo, pod, topoScore); err != nil { errCh.SendErrorWithCancel(err, cancel) return } } + if len(topoScore) > 0 { + pl.Lock() + state.topologyScore.append(topoScore) + pl.Unlock() + } } workqueue.ParallelizeUntil(ctx, 16, len(allNodes), processNode) if err := errCh.ReceiveError(); err != nil {