Merge pull request #91229 from ahg-g/ahg-affinity3

Eliminate locking in (anti)affinity calculations
This commit is contained in:
Kubernetes Prow Robot 2020-05-19 16:32:42 -07:00 committed by GitHub
commit 0746f165bd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 43 additions and 59 deletions

View File

@ -19,7 +19,7 @@ package interpodaffinity
import ( import (
"context" "context"
"fmt" "fmt"
"sync" "sync/atomic"
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
"k8s.io/klog/v2" "k8s.io/klog/v2"
@ -128,7 +128,7 @@ func (m topologyToMatchedTermCount) updateWithAffinityTerms(targetPod *v1.Pod, t
} }
} }
// updateAntiAffinityTerms updates the topologyToMatchedTermCount map with the specified value // updateWithAntiAffinityTerms updates the topologyToMatchedTermCount map with the specified value
// for each anti-affinity term matched the target pod. // for each anti-affinity term matched the target pod.
func (m topologyToMatchedTermCount) updateWithAntiAffinityTerms(targetPod *v1.Pod, targetPodNode *v1.Node, antiAffinityTerms []framework.AffinityTerm, value int64) { func (m topologyToMatchedTermCount) updateWithAntiAffinityTerms(targetPod *v1.Pod, targetPodNode *v1.Node, antiAffinityTerms []framework.AffinityTerm, value int64) {
// Check anti-affinity terms. // Check anti-affinity terms.
@ -160,35 +160,12 @@ func podMatchesAllAffinityTerms(pod *v1.Pod, terms []framework.AffinityTerm) boo
return true return true
} }
// getMatchingAntiAffinityTopologyPairs calculates the following for "existingPod" on given node:
// (1) Whether it has PodAntiAffinity
// (2) Whether ANY AffinityTerm matches the incoming pod
func getMatchingAntiAffinityTopologyPairsOfPod(newPod *v1.Pod, existingPod *framework.PodInfo, node *v1.Node) topologyToMatchedTermCount {
topologyMap := make(topologyToMatchedTermCount)
for _, term := range existingPod.RequiredAntiAffinityTerms {
if schedutil.PodMatchesTermsNamespaceAndSelector(newPod, term.Namespaces, term.Selector) {
if topologyValue, ok := node.Labels[term.TopologyKey]; ok {
pair := topologyPair{key: term.TopologyKey, value: topologyValue}
topologyMap[pair]++
}
}
}
return topologyMap
}
// getTPMapMatchingExistingAntiAffinity calculates the following for each existing pod on each node: // getTPMapMatchingExistingAntiAffinity calculates the following for each existing pod on each node:
// (1) Whether it has PodAntiAffinity // (1) Whether it has PodAntiAffinity
// (2) Whether any AffinityTerm matches the incoming pod // (2) Whether any AffinityTerm matches the incoming pod
func getTPMapMatchingExistingAntiAffinity(pod *v1.Pod, allNodes []*framework.NodeInfo) topologyToMatchedTermCount { func getTPMapMatchingExistingAntiAffinity(pod *v1.Pod, allNodes []*framework.NodeInfo) topologyToMatchedTermCount {
var lock sync.Mutex topoMaps := make([]topologyToMatchedTermCount, len(allNodes))
topologyMap := make(topologyToMatchedTermCount) index := int32(-1)
appendResult := func(toAppend topologyToMatchedTermCount) {
lock.Lock()
defer lock.Unlock()
topologyMap.append(toAppend)
}
processNode := func(i int) { processNode := func(i int) {
nodeInfo := allNodes[i] nodeInfo := allNodes[i]
node := nodeInfo.Node() node := nodeInfo.Node()
@ -196,16 +173,22 @@ func getTPMapMatchingExistingAntiAffinity(pod *v1.Pod, allNodes []*framework.Nod
klog.Error("node not found") klog.Error("node not found")
return return
} }
topoMap := make(topologyToMatchedTermCount)
for _, existingPod := range nodeInfo.PodsWithAffinity { for _, existingPod := range nodeInfo.PodsWithAffinity {
existingPodTopologyMaps := getMatchingAntiAffinityTopologyPairsOfPod(pod, existingPod, node) topoMap.updateWithAntiAffinityTerms(pod, node, existingPod.RequiredAntiAffinityTerms, 1)
if len(existingPodTopologyMaps) != 0 { }
appendResult(existingPodTopologyMaps) if len(topoMap) != 0 {
} topoMaps[atomic.AddInt32(&index, 1)] = topoMap
} }
} }
parallelize.Until(context.Background(), len(allNodes), processNode) parallelize.Until(context.Background(), len(allNodes), processNode)
return topologyMap result := make(topologyToMatchedTermCount)
for i := 0; i <= int(index); i++ {
result.append(topoMaps[i])
}
return result
} }
// getTPMapMatchingIncomingAffinityAntiAffinity finds existing Pods that match affinity terms of the given "pod". // getTPMapMatchingIncomingAffinityAntiAffinity finds existing Pods that match affinity terms of the given "pod".
@ -213,24 +196,15 @@ func getTPMapMatchingExistingAntiAffinity(pod *v1.Pod, allNodes []*framework.Nod
// predicate. With this topologyToMatchedTermCount available, the affinity predicate does not // predicate. With this topologyToMatchedTermCount available, the affinity predicate does not
// need to check all the pods in the cluster. // need to check all the pods in the cluster.
func getTPMapMatchingIncomingAffinityAntiAffinity(podInfo *framework.PodInfo, allNodes []*framework.NodeInfo) (topologyToMatchedTermCount, topologyToMatchedTermCount) { func getTPMapMatchingIncomingAffinityAntiAffinity(podInfo *framework.PodInfo, allNodes []*framework.NodeInfo) (topologyToMatchedTermCount, topologyToMatchedTermCount) {
topologyPairsAffinityPodsMap := make(topologyToMatchedTermCount) affinityCounts := make(topologyToMatchedTermCount)
topologyToMatchedExistingAntiAffinityTerms := make(topologyToMatchedTermCount) antiAffinityCounts := make(topologyToMatchedTermCount)
if len(podInfo.RequiredAffinityTerms) == 0 && len(podInfo.RequiredAntiAffinityTerms) == 0 { if len(podInfo.RequiredAffinityTerms) == 0 && len(podInfo.RequiredAntiAffinityTerms) == 0 {
return topologyPairsAffinityPodsMap, topologyToMatchedExistingAntiAffinityTerms return affinityCounts, antiAffinityCounts
}
var lock sync.Mutex
appendResult := func(nodeName string, nodeTopologyPairsAffinityPodsMap, nodeTopologyPairsAntiAffinityPodsMap topologyToMatchedTermCount) {
lock.Lock()
defer lock.Unlock()
if len(nodeTopologyPairsAffinityPodsMap) > 0 {
topologyPairsAffinityPodsMap.append(nodeTopologyPairsAffinityPodsMap)
}
if len(nodeTopologyPairsAntiAffinityPodsMap) > 0 {
topologyToMatchedExistingAntiAffinityTerms.append(nodeTopologyPairsAntiAffinityPodsMap)
}
} }
affinityCountsList := make([]topologyToMatchedTermCount, len(allNodes))
antiAffinityCountsList := make([]topologyToMatchedTermCount, len(allNodes))
index := int32(-1)
processNode := func(i int) { processNode := func(i int) {
nodeInfo := allNodes[i] nodeInfo := allNodes[i]
node := nodeInfo.Node() node := nodeInfo.Node()
@ -238,23 +212,30 @@ func getTPMapMatchingIncomingAffinityAntiAffinity(podInfo *framework.PodInfo, al
klog.Error("node not found") klog.Error("node not found")
return return
} }
nodeTopologyPairsAffinityPodsMap := make(topologyToMatchedTermCount) affinity := make(topologyToMatchedTermCount)
nodeTopologyPairsAntiAffinityPodsMap := make(topologyToMatchedTermCount) antiAffinity := make(topologyToMatchedTermCount)
for _, existingPod := range nodeInfo.Pods { for _, existingPod := range nodeInfo.Pods {
// Check affinity terms. // Check affinity terms.
nodeTopologyPairsAffinityPodsMap.updateWithAffinityTerms(existingPod.Pod, node, podInfo.RequiredAffinityTerms, 1) affinity.updateWithAffinityTerms(existingPod.Pod, node, podInfo.RequiredAffinityTerms, 1)
// Check anti-affinity terms. // Check anti-affinity terms.
nodeTopologyPairsAntiAffinityPodsMap.updateWithAntiAffinityTerms(existingPod.Pod, node, podInfo.RequiredAntiAffinityTerms, 1) antiAffinity.updateWithAntiAffinityTerms(existingPod.Pod, node, podInfo.RequiredAntiAffinityTerms, 1)
} }
if len(nodeTopologyPairsAffinityPodsMap) > 0 || len(nodeTopologyPairsAntiAffinityPodsMap) > 0 { if len(affinity) > 0 || len(antiAffinity) > 0 {
appendResult(node.Name, nodeTopologyPairsAffinityPodsMap, nodeTopologyPairsAntiAffinityPodsMap) k := atomic.AddInt32(&index, 1)
affinityCountsList[k] = affinity
antiAffinityCountsList[k] = antiAffinity
} }
} }
parallelize.Until(context.Background(), len(allNodes), processNode) parallelize.Until(context.Background(), len(allNodes), processNode)
return topologyPairsAffinityPodsMap, topologyToMatchedExistingAntiAffinityTerms for i := 0; i <= int(index); i++ {
affinityCounts.append(affinityCountsList[i])
antiAffinityCounts.append(antiAffinityCountsList[i])
}
return affinityCounts, antiAffinityCounts
} }
// PreFilter invoked at the prefilter extension point. // PreFilter invoked at the prefilter extension point.

View File

@ -18,7 +18,6 @@ package interpodaffinity
import ( import (
"fmt" "fmt"
"sync"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/validation/field" "k8s.io/apimachinery/pkg/util/validation/field"
@ -45,7 +44,6 @@ var _ framework.ScorePlugin = &InterPodAffinity{}
type InterPodAffinity struct { type InterPodAffinity struct {
args config.InterPodAffinityArgs args config.InterPodAffinityArgs
sharedLister framework.SharedLister sharedLister framework.SharedLister
sync.Mutex
} }
// Name returns name of the plugin. It is used in logs, etc. // Name returns name of the plugin. It is used in logs, etc.

View File

@ -19,6 +19,7 @@ package interpodaffinity
import ( import (
"context" "context"
"fmt" "fmt"
"sync/atomic"
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
@ -164,6 +165,8 @@ func (pl *InterPodAffinity) PreScore(
podInfo: framework.NewPodInfo(pod), podInfo: framework.NewPodInfo(pod),
} }
topoScores := make([]scoreMap, len(allNodes))
index := int32(-1)
processNode := func(i int) { processNode := func(i int) {
nodeInfo := allNodes[i] nodeInfo := allNodes[i]
if nodeInfo.Node() == nil { if nodeInfo.Node() == nil {
@ -182,13 +185,15 @@ func (pl *InterPodAffinity) PreScore(
pl.processExistingPod(state, existingPod, nodeInfo, pod, topoScore) pl.processExistingPod(state, existingPod, nodeInfo, pod, topoScore)
} }
if len(topoScore) > 0 { if len(topoScore) > 0 {
pl.Lock() topoScores[atomic.AddInt32(&index, 1)] = topoScore
state.topologyScore.append(topoScore)
pl.Unlock()
} }
} }
parallelize.Until(context.Background(), len(allNodes), processNode) parallelize.Until(context.Background(), len(allNodes), processNode)
for i := 0; i <= int(index); i++ {
state.topologyScore.append(topoScores[i])
}
cycleState.Write(preScoreStateKey, state) cycleState.Write(preScoreStateKey, state)
return nil return nil
} }