Merge pull request #89487 from alculquicondor/per_node_spreading

Optimize preferred spreading for hostname topology
Kubernetes Prow Robot 2020-03-30 08:21:54 -07:00 committed by GitHub
commit 59c66dab8c
6 changed files with 50 additions and 35 deletions
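
In short: for the hostname topology key (v1.LabelHostname, i.e. kubernetes.io/hostname) every node is its own topology domain, so PreScore no longer pre-aggregates a per-pair pod count for it. Score now counts the matching pods on the node directly through the new countPodsMatchSelector helper, while all other topology keys keep the counts aggregated in PreScore via atomic.AddInt64. The two affected benchmarks also switch to scoring nodes in parallel with parallelize.Until.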

@@ -26,6 +26,7 @@ go_test(
     deps = [
         "//pkg/scheduler/framework/v1alpha1:go_default_library",
         "//pkg/scheduler/internal/cache:go_default_library",
+        "//pkg/scheduler/internal/parallelize:go_default_library",
         "//pkg/scheduler/testing:go_default_library",
         "//staging/src/k8s.io/api/apps/v1:go_default_library",
         "//staging/src/k8s.io/api/core/v1:go_default_library",

@@ -25,6 +25,7 @@ import (
 	"k8s.io/client-go/kubernetes/fake"
 	framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
 	"k8s.io/kubernetes/pkg/scheduler/internal/cache"
+	"k8s.io/kubernetes/pkg/scheduler/internal/parallelize"
 	st "k8s.io/kubernetes/pkg/scheduler/testing"
 )
@@ -76,15 +77,14 @@ func BenchmarkTestSelectorSpreadPriority(b *testing.B) {
 			if !status.IsSuccess() {
 				b.Fatalf("unexpected error: %v", status)
 			}
-			var gotList framework.NodeScoreList
-			for _, node := range filteredNodes {
-				score, status := plugin.Score(ctx, state, pod, node.Name)
-				if !status.IsSuccess() {
-					b.Errorf("unexpected error: %v", status)
-				}
-				gotList = append(gotList, framework.NodeScore{Name: node.Name, Score: score})
-			}
-			status = plugin.NormalizeScore(context.Background(), state, pod, gotList)
+			gotList := make(framework.NodeScoreList, len(filteredNodes))
+			scoreNode := func(i int) {
+				n := filteredNodes[i]
+				score, _ := plugin.Score(ctx, state, pod, n.Name)
+				gotList[i] = framework.NodeScore{Name: n.Name, Score: score}
+			}
+			parallelize.Until(ctx, len(filteredNodes), scoreNode)
+			status = plugin.NormalizeScore(ctx, state, pod, gotList)
 			if !status.IsSuccess() {
 				b.Fatal(status)
 			}
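
The benchmark loop above now fans the per-node Score calls out across goroutines and writes each result into a pre-sized slot of gotList. Below is a minimal, self-contained sketch of that index-parallel pattern; the until helper is a simplified stand-in written for illustration, not the real pkg/scheduler/internal/parallelize package, and the worker count and toy scores are made up:

// Simplified stand-in for an index-parallel helper in the spirit of
// parallelize.Until (illustrative only; the real package differs).
package main

import (
	"context"
	"fmt"
	"sync"
)

// until runs doPiece(i) for every i in [0, pieces), spreading the work over
// at most `workers` goroutines and handing out no new pieces once ctx is done.
func until(ctx context.Context, workers, pieces int, doPiece func(i int)) {
	indexes := make(chan int)
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := range indexes {
				doPiece(i)
			}
		}()
	}
	for i := 0; i < pieces; i++ {
		select {
		case <-ctx.Done():
			// Stop handing out new pieces; workers drain and exit.
			close(indexes)
			wg.Wait()
			return
		case indexes <- i:
		}
	}
	close(indexes)
	wg.Wait()
}

func main() {
	// Toy "scores" filled by index, mirroring how the benchmark writes
	// gotList[i] from inside the parallel closure without any locking.
	scores := make([]int64, 8)
	until(context.Background(), 4, len(scores), func(i int) {
		scores[i] = int64(i * 10)
	})
	fmt.Println(scores)
}

Because each goroutine writes only its own gotList[i] slot, pre-sizing the slice with make and indexing by i avoids the locking that a shared append would need; note that the closure also ignores the per-node status, unlike the old sequential loop.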

@@ -41,6 +41,7 @@ go_test(
     deps = [
         "//pkg/scheduler/framework/v1alpha1:go_default_library",
         "//pkg/scheduler/internal/cache:go_default_library",
+        "//pkg/scheduler/internal/parallelize:go_default_library",
        "//pkg/scheduler/nodeinfo:go_default_library",
         "//pkg/scheduler/testing:go_default_library",
         "//staging/src/k8s.io/api/apps/v1:go_default_library",

@@ -82,3 +82,17 @@ func filterTopologySpreadConstraints(constraints []v1.TopologySpreadConstraint,
 	}
 	return result, nil
 }
+
+func countPodsMatchSelector(pods []*v1.Pod, selector labels.Selector, ns string) int {
+	count := 0
+	for _, p := range pods {
+		// Bypass terminating Pod (see #87621).
+		if p.DeletionTimestamp != nil || p.Namespace != ns {
+			continue
+		}
+		if selector.Matches(labels.Set(p.Labels)) {
+			count++
+		}
+	}
+	return count
+}
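
For reference, a small usage sketch of the new helper. The helper body is copied from the hunk above; the sample pods, labels, and namespace are made up for illustration:

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
)

// countPodsMatchSelector is copied verbatim from the diff above.
func countPodsMatchSelector(pods []*v1.Pod, selector labels.Selector, ns string) int {
	count := 0
	for _, p := range pods {
		// Bypass terminating Pod (see #87621).
		if p.DeletionTimestamp != nil || p.Namespace != ns {
			continue
		}
		if selector.Matches(labels.Set(p.Labels)) {
			count++
		}
	}
	return count
}

func main() {
	// Made-up sample data: only the first pod should be counted.
	now := metav1.Now()
	pods := []*v1.Pod{
		{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Labels: map[string]string{"app": "web"}}},
		{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Labels: map[string]string{"app": "web"}, DeletionTimestamp: &now}}, // terminating: skipped
		{ObjectMeta: metav1.ObjectMeta{Namespace: "other", Labels: map[string]string{"app": "web"}}},                            // other namespace: skipped
	}
	selector := labels.SelectorFromSet(labels.Set{"app": "web"})
	fmt.Println(countPodsMatchSelector(pods, selector, "default")) // prints 1
}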

@@ -23,7 +23,6 @@ import (
 	"sync/atomic"
 	v1 "k8s.io/api/core/v1"
-	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/util/sets"
 	pluginhelper "k8s.io/kubernetes/pkg/scheduler/framework/plugins/helper"
 	framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
@@ -73,6 +72,10 @@ func (pl *PodTopologySpread) initPreScoreState(s *preScoreState, pod *v1.Pod, fi
 			continue
 		}
 		for _, constraint := range s.Constraints {
+			// per-node counts are calculated during Score.
+			if constraint.TopologyKey == v1.LabelHostname {
+				continue
+			}
 			pair := topologyPair{key: constraint.TopologyKey, value: node.Labels[constraint.TopologyKey]}
 			if s.TopologyPairToPodCounts[pair] == nil {
 				s.TopologyPairToPodCounts[pair] = new(int64)
@@ -103,7 +106,7 @@ func (pl *PodTopologySpread) PreScore(
 	}
 	state := &preScoreState{
-		NodeNameSet:             sets.String{},
+		NodeNameSet:             make(sets.String, len(filteredNodes)),
 		TopologyPairToPodCounts: make(map[topologyPair]*int64),
 	}
 	err = pl.initPreScoreState(state, pod, filteredNodes)
@@ -134,22 +137,13 @@ func (pl *PodTopologySpread) PreScore(
 			pair := topologyPair{key: c.TopologyKey, value: node.Labels[c.TopologyKey]}
 			// If current topology pair is not associated with any candidate node,
 			// continue to avoid unnecessary calculation.
-			if state.TopologyPairToPodCounts[pair] == nil {
+			// Per-node counts are also skipped, as they are done during Score.
+			tpCount := state.TopologyPairToPodCounts[pair]
+			if tpCount == nil {
 				continue
 			}
-			// <matchSum> indicates how many pods (on current node) match the <constraint>.
-			matchSum := int64(0)
-			for _, existingPod := range nodeInfo.Pods() {
-				// Bypass terminating Pod (see #87621).
-				if existingPod.DeletionTimestamp != nil || existingPod.Namespace != pod.Namespace {
-					continue
-				}
-				if c.Selector.Matches(labels.Set(existingPod.Labels)) {
-					matchSum++
-				}
-			}
-			atomic.AddInt64(state.TopologyPairToPodCounts[pair], matchSum)
+			count := countPodsMatchSelector(nodeInfo.Pods(), c.Selector, pod.Namespace)
+			atomic.AddInt64(tpCount, int64(count))
 		}
 	}
 	parallelize.Until(ctx, len(allNodes), processAllNode)
@@ -183,11 +177,16 @@ func (pl *PodTopologySpread) Score(ctx context.Context, cycleState *framework.Cy
 	var score int64
 	for _, c := range s.Constraints {
 		if tpVal, ok := node.Labels[c.TopologyKey]; ok {
-			pair := topologyPair{key: c.TopologyKey, value: tpVal}
-			matchSum := *s.TopologyPairToPodCounts[pair]
-			score += matchSum
+			if c.TopologyKey == v1.LabelHostname {
+				count := countPodsMatchSelector(nodeInfo.Pods(), c.Selector, pod.Namespace)
+				score += int64(count)
+			} else {
+				pair := topologyPair{key: c.TopologyKey, value: tpVal}
+				matchSum := *s.TopologyPairToPodCounts[pair]
+				score += matchSum
+			}
 		}
 	}
 	return score, nil
 }
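
The hostname shortcut works because, for topologyKey kubernetes.io/hostname, each node is its own topology domain: the aggregated count for the pair {hostname, <node name>} would equal the number of matching pods on that single node anyway, so Score can compute it locally and PreScore can skip those pairs (and the atomic adds) entirely. A toy sketch of that split, using made-up types rather than the scheduler framework API:

// Toy illustration (not the scheduler framework API): per-node counting at
// Score time for the hostname key vs. pre-aggregated counts for other keys.
package main

import "fmt"

const labelHostname = "kubernetes.io/hostname" // same value as v1.LabelHostname

type constraint struct{ topologyKey, matchLabel, matchValue string }

type pod struct{ labels map[string]string }

type node struct {
	labels map[string]string
	pods   []pod // pods already running on this node
}

// countMatching plays the role of countPodsMatchSelector in the diff.
func countMatching(pods []pod, c constraint) int64 {
	var n int64
	for _, p := range pods {
		if p.labels[c.matchLabel] == c.matchValue {
			n++
		}
	}
	return n
}

// score mirrors the shape of PodTopologySpread.Score after this change:
// hostname constraints are counted per node; everything else comes from
// counts aggregated ahead of time (the aggregated map stands in for
// TopologyPairToPodCounts filled during PreScore).
func score(n node, constraints []constraint, aggregated map[[2]string]int64) int64 {
	var s int64
	for _, c := range constraints {
		tpVal, ok := n.labels[c.topologyKey]
		if !ok {
			continue
		}
		if c.topologyKey == labelHostname {
			s += countMatching(n.pods, c) // per-node count, no shared map needed
		} else {
			s += aggregated[[2]string{c.topologyKey, tpVal}] // pre-aggregated in PreScore
		}
	}
	return s
}

func main() {
	host := constraint{topologyKey: labelHostname, matchLabel: "app", matchValue: "web"}
	zone := constraint{topologyKey: "zone", matchLabel: "app", matchValue: "web"}
	n := node{
		labels: map[string]string{labelHostname: "node-1", "zone": "a"},
		pods:   []pod{{labels: map[string]string{"app": "web"}}, {labels: map[string]string{"app": "db"}}},
	}
	agg := map[[2]string]int64{{"zone", "a"}: 3} // pretend PreScore counted 3 matches in zone "a"
	fmt.Println(score(n, []constraint{host, zone}, agg)) // 1 (hostname) + 3 (zone) = 4
}

In the real plugin the aggregated counts live in preScoreState.TopologyPairToPodCounts and are filled by the parallel processAllNode loop shown in the PreScore hunk above.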

@@ -29,6 +29,7 @@ import (
 	"k8s.io/client-go/kubernetes/fake"
 	framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
 	"k8s.io/kubernetes/pkg/scheduler/internal/cache"
+	"k8s.io/kubernetes/pkg/scheduler/internal/parallelize"
 	st "k8s.io/kubernetes/pkg/scheduler/testing"
 	"k8s.io/utils/pointer"
 )
@@ -746,19 +747,18 @@ func BenchmarkTestDefaultEvenPodsSpreadPriority(b *testing.B) {
 		b.ResetTimer()
 		for i := 0; i < b.N; i++ {
-			var gotList framework.NodeScoreList
 			status := p.PreScore(ctx, state, pod, filteredNodes)
 			if !status.IsSuccess() {
 				b.Fatalf("unexpected error: %v", status)
 			}
-			for _, n := range filteredNodes {
-				score, status := p.Score(context.Background(), state, pod, n.Name)
-				if !status.IsSuccess() {
-					b.Fatalf("unexpected error: %v", status)
-				}
-				gotList = append(gotList, framework.NodeScore{Name: n.Name, Score: score})
-			}
-			status = p.NormalizeScore(context.Background(), state, pod, gotList)
+			gotList := make(framework.NodeScoreList, len(filteredNodes))
+			scoreNode := func(i int) {
+				n := filteredNodes[i]
+				score, _ := p.Score(ctx, state, pod, n.Name)
+				gotList[i] = framework.NodeScore{Name: n.Name, Score: score}
+			}
+			parallelize.Until(ctx, len(filteredNodes), scoreNode)
+			status = p.NormalizeScore(ctx, state, pod, gotList)
 			if !status.IsSuccess() {
 				b.Fatal(status)
 			}