From 90603728fbff4db0a934bb5180f4873d96e1b48a Mon Sep 17 00:00:00 2001 From: Wei Huang Date: Wed, 30 Oct 2019 22:21:57 -0700 Subject: [PATCH] Migrate EvenPodsSpread Priority as Score plugin in map/reduce style --- .../algorithm/priorities/even_pods_spread.go | 210 ++++++++++-------- .../priorities/even_pods_spread_test.go | 84 ++++--- .../algorithm/priorities/metadata.go | 6 +- .../algorithmprovider/defaults/defaults.go | 7 +- .../api/compatibility/compatibility_test.go | 6 +- .../framework/plugins/default_registry.go | 9 +- .../framework/plugins/podtopologyspread/BUILD | 1 + .../podtopologyspread/pod_topology_spread.go | 38 +++- .../pod_topology_spread_test.go | 14 +- 9 files changed, 237 insertions(+), 138 deletions(-) diff --git a/pkg/scheduler/algorithm/priorities/even_pods_spread.go b/pkg/scheduler/algorithm/priorities/even_pods_spread.go index db3fe2e59f1..09d7391a298 100644 --- a/pkg/scheduler/algorithm/priorities/even_pods_spread.go +++ b/pkg/scheduler/algorithm/priorities/even_pods_spread.go @@ -18,6 +18,7 @@ package priorities import ( "context" + "fmt" "math" "sync/atomic" @@ -26,6 +27,7 @@ import ( "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers" + schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo" schedutil "k8s.io/kubernetes/pkg/scheduler/util" "k8s.io/klog" @@ -36,67 +38,32 @@ type topologyPair struct { value string } -type topologySpreadConstraintsMap struct { - // nodeNameToPodCounts is keyed with node name, and valued with the number of matching pods. - nodeNameToPodCounts map[string]int32 +type podTopologySpreadMap struct { + // nodeNameSet is a string set holding all node names which have all constraints[*].topologyKey present. + nodeNameSet map[string]struct{} // topologyPairToPodCounts is keyed with topologyPair, and valued with the number of matching pods. - topologyPairToPodCounts map[topologyPair]*int32 + topologyPairToPodCounts map[topologyPair]*int64 } -func newTopologySpreadConstraintsMap() *topologySpreadConstraintsMap { - return &topologySpreadConstraintsMap{ - nodeNameToPodCounts: make(map[string]int32), - topologyPairToPodCounts: make(map[topologyPair]*int32), +func newTopologySpreadConstraintsMap() *podTopologySpreadMap { + return &podTopologySpreadMap{ + nodeNameSet: make(map[string]struct{}), + topologyPairToPodCounts: make(map[topologyPair]*int64), } } -// Note: the passed in are the "filtered" nodes which have passed Predicates. -// This function iterates to filter out the nodes which don't have required topologyKey(s), -// and initialize two maps: -// 1) t.topologyPairToPodCounts: keyed with both eligible topology pair and node names. -// 2) t.nodeNameToPodCounts: keyed with node name, and valued with a *int32 pointer for eligible node only. 
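// The per-node count map (map[string]int32) above gives way to a plain string set:
// per-node sums are now produced by the Map function and normalized by the Reduce
// function, and the counters widen to *int64 so they can be accumulated with
// atomic.AddInt64 and fed straight into framework.NodeScore.Score.
//
// A rough sketch of how the pieces compose, mirroring what the updated unit test
// below does (variable names here are illustrative only):
//
//	meta := &priorityMetadata{
//		podTopologySpreadMap: buildPodTopologySpreadMap(pod, filteredNodes, snapshot.NodeInfoList),
//	}
//	var scores framework.NodeScoreList
//	for _, n := range filteredNodes {
//		s, _ := CalculateEvenPodsSpreadPriorityMap(pod, meta, snapshot.NodeInfoMap[n.Name])
//		scores = append(scores, s)
//	}
//	_ = CalculateEvenPodsSpreadPriorityReduce(pod, meta, snapshot, scores)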
-func (t *topologySpreadConstraintsMap) initialize(pod *v1.Pod, nodes []*v1.Node) { - constraints := getSoftTopologySpreadConstraints(pod) - for _, node := range nodes { - if !predicates.NodeLabelsMatchSpreadConstraints(node.Labels, constraints) { - continue - } - for _, constraint := range constraints { - pair := topologyPair{key: constraint.TopologyKey, value: node.Labels[constraint.TopologyKey]} - if t.topologyPairToPodCounts[pair] == nil { - t.topologyPairToPodCounts[pair] = new(int32) - } - } - t.nodeNameToPodCounts[node.Name] = 0 - // For those nodes which don't have all required topologyKeys present, it's intentional to keep - // those entries absent in nodeNameToPodCounts, so that we're able to score them to 0 afterwards. - } -} - -// CalculateEvenPodsSpreadPriority computes a score by checking through the topologySpreadConstraints -// that are with WhenUnsatisfiable=ScheduleAnyway (a.k.a soft constraint). -// The function works as below: -// 1) In all nodes, calculate the number of pods which match 's soft topology spread constraints. -// 2) Group the number calculated in 1) by topologyPair, and sum up to corresponding candidate nodes. -// 3) Finally normalize the number to 0~10. The node with the highest score is the most preferred. -// Note: Symmetry is not applicable. We only weigh how incomingPod matches existingPod. -// Whether existingPod matches incomingPod doesn't contribute to the final score. -// This is different from the Affinity API. -func CalculateEvenPodsSpreadPriority(pod *v1.Pod, sharedLister schedulerlisters.SharedLister, nodes []*v1.Node) (framework.NodeScoreList, error) { - result := make(framework.NodeScoreList, len(nodes)) +// buildPodTopologySpreadMap prepares necessary data (podTopologySpreadMap) for incoming pod on the filteredNodes. +// Later Priority function will use 'podTopologySpreadMap' to perform the Scoring calculations. +func buildPodTopologySpreadMap(pod *v1.Pod, filteredNodes []*v1.Node, allNodes []*schedulernodeinfo.NodeInfo) *podTopologySpreadMap { // return if incoming pod doesn't have soft topology spread constraints. constraints := getSoftTopologySpreadConstraints(pod) - if len(constraints) == 0 { - return result, nil + if len(constraints) == 0 || len(filteredNodes) == 0 || len(allNodes) == 0 { + return nil } - allNodes, err := sharedLister.NodeInfos().List() - if err != nil { - return nil, err - } - - t := newTopologySpreadConstraintsMap() - t.initialize(pod, nodes) + // initialize podTopologySpreadMap which will be used in Score plugin. + m := newTopologySpreadConstraintsMap() + m.initialize(pod, filteredNodes) errCh := schedutil.NewErrorChannel() ctx, cancel := context.WithCancel(context.Background()) @@ -117,12 +84,12 @@ func CalculateEvenPodsSpreadPriority(pod *v1.Pod, sharedLister schedulerlisters. pair := topologyPair{key: constraint.TopologyKey, value: node.Labels[constraint.TopologyKey]} // If current topology pair is not associated with any candidate node, // continue to avoid unnecessary calculation. - if t.topologyPairToPodCounts[pair] == nil { + if m.topologyPairToPodCounts[pair] == nil { continue } // indicates how many pods (on current node) match the . - matchSum := int32(0) + matchSum := int64(0) for _, existingPod := range nodeInfo.Pods() { match, err := predicates.PodMatchesSpreadConstraint(existingPod.Labels, constraint) if err != nil { @@ -133,67 +100,132 @@ func CalculateEvenPodsSpreadPriority(pod *v1.Pod, sharedLister schedulerlisters. 
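// Every existing pod on this node whose labels match the constraint's selector bumps
// matchSum; the per-pair totals are then merged with atomic.AddInt64 below, since
// processAllNode runs on 16 parallel workers via workqueue.ParallelizeUntil.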
matchSum++ } } - atomic.AddInt32(t.topologyPairToPodCounts[pair], matchSum) + atomic.AddInt64(m.topologyPairToPodCounts[pair], matchSum) } } workqueue.ParallelizeUntil(ctx, 16, len(allNodes), processAllNode) if err := errCh.ReceiveError(); err != nil { - return nil, err + klog.Error(err) + return nil } - var minCount int32 = math.MaxInt32 - // sums up the number of matching pods on each qualified topology pair - var total int32 - for _, node := range nodes { - if _, ok := t.nodeNameToPodCounts[node.Name]; !ok { + return m +} + +// initialize iterates "filteredNodes" to filter out the nodes which don't have required topologyKey(s), +// and initialize two maps: +// 1) m.topologyPairToPodCounts: keyed with both eligible topology pair and node names. +// 2) m.nodeNameSet: keyed with node name, and valued with a *int64 pointer for eligible node only. +func (m *podTopologySpreadMap) initialize(pod *v1.Pod, filteredNodes []*v1.Node) { + constraints := getSoftTopologySpreadConstraints(pod) + for _, node := range filteredNodes { + if !predicates.NodeLabelsMatchSpreadConstraints(node.Labels, constraints) { continue } - - // For each present , current node gets a credit of . - // And we add to to reverse the final score later. for _, constraint := range constraints { - if tpVal, ok := node.Labels[constraint.TopologyKey]; ok { - pair := topologyPair{key: constraint.TopologyKey, value: tpVal} - matchSum := *t.topologyPairToPodCounts[pair] - t.nodeNameToPodCounts[node.Name] += matchSum - total += matchSum + pair := topologyPair{key: constraint.TopologyKey, value: node.Labels[constraint.TopologyKey]} + if m.topologyPairToPodCounts[pair] == nil { + m.topologyPairToPodCounts[pair] = new(int64) } } - if t.nodeNameToPodCounts[node.Name] < minCount { - minCount = t.nodeNameToPodCounts[node.Name] + m.nodeNameSet[node.Name] = struct{}{} + // For those nodes which don't have all required topologyKeys present, it's intentional to leave + // their entries absent in nodeNameSet, so that we're able to score them to 0 afterwards. + } +} + +// CalculateEvenPodsSpreadPriorityMap calculate the number of matching pods on the passed-in "node", +// and return the number as Score. +func CalculateEvenPodsSpreadPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulernodeinfo.NodeInfo) (framework.NodeScore, error) { + node := nodeInfo.Node() + if node == nil { + return framework.NodeScore{}, fmt.Errorf("node not found") + } + + var m *podTopologySpreadMap + if priorityMeta, ok := meta.(*priorityMetadata); ok { + m = priorityMeta.podTopologySpreadMap + } + if m == nil { + return framework.NodeScore{}, nil + } + + // no need to continue if the node is not qualified. + if _, ok := m.nodeNameSet[node.Name]; !ok { + return framework.NodeScore{Name: node.Name, Score: 0}, nil + } + + constraints := getSoftTopologySpreadConstraints(pod) + // For each present , current node gets a credit of . + // And we sum up and return it as this node's score. + var score int64 + for _, constraint := range constraints { + if tpVal, ok := node.Labels[constraint.TopologyKey]; ok { + pair := topologyPair{key: constraint.TopologyKey, value: tpVal} + matchSum := *m.topologyPairToPodCounts[pair] + score += matchSum + } + } + return framework.NodeScore{Name: node.Name, Score: score}, nil +} + +// CalculateEvenPodsSpreadPriorityReduce normalizes the score for each filteredNode, +// The basic rule is: the bigger the score(matching number of pods) is, the smaller the +// final normalized score will be. 
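// A hypothetical example, assuming a single soft constraint on "zone": node-a and
// node-b sit in zone1 with 4 matching pods, node-x sits in zone2 with 1, so the Map
// scores are 4, 4 and 1. Then total=9, minScore=1 and maxMinDiff=8, which normalizes
// node-x to the full framework.MaxNodeScore and node-a/node-b to MaxNodeScore*5/8.
// Nodes missing a required topologyKey are excluded from these sums and handled
// separately below.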
+func CalculateEvenPodsSpreadPriorityReduce(pod *v1.Pod, meta interface{}, sharedLister schedulerlisters.SharedLister, + result framework.NodeScoreList) error { + var m *podTopologySpreadMap + if priorityMeta, ok := meta.(*priorityMetadata); ok { + m = priorityMeta.podTopologySpreadMap + } + if m == nil { + return nil + } + + // Calculate the summed score and . + var minScore int64 = math.MaxInt64 + var total int64 + for _, score := range result { + // it's mandatory to check if is present in m.nodeNameSet + if _, ok := m.nodeNameSet[score.Name]; !ok { + continue + } + total += score.Score + if score.Score < minScore { + minScore = score.Score } } - // calculate final priority score for each node - // TODO(Huang-Wei): in alpha version, we keep the formula as simple as possible. - // current version ranks the nodes properly, but it doesn't take MaxSkew into - // consideration, we may come up with a better formula in the future. - maxMinDiff := total - minCount - for i := range nodes { - node := nodes[i] - result[i].Name = node.Name - - // debugging purpose: print the value for each node - // score must be pointer here, otherwise it's always 0 + maxMinDiff := total - minScore + for i := range result { + nodeInfo, err := sharedLister.NodeInfos().Get(result[i].Name) + if err != nil { + return err + } + node := nodeInfo.Node() + // Debugging purpose: print the score for each node. + // Score must be a pointer here, otherwise it's always 0. if klog.V(10) { defer func(score *int64, nodeName string) { - klog.Infof("%v -> %v: EvenPodsSpreadPriority, Score: (%d)", pod.Name, nodeName, *score) + klog.Infof("%v -> %v: PodTopologySpread NormalizeScore, Score: (%d)", pod.Name, nodeName, *score) }(&result[i].Score, node.Name) } - if _, ok := t.nodeNameToPodCounts[node.Name]; !ok { - result[i].Score = 0 - continue - } if maxMinDiff == 0 { result[i].Score = framework.MaxNodeScore continue } - fScore := float64(framework.MaxNodeScore) * (float64(total-t.nodeNameToPodCounts[node.Name]) / float64(maxMinDiff)) + + if _, ok := m.nodeNameSet[node.Name]; !ok { + result[i].Score = 0 + continue + } + + flippedScore := total - result[i].Score + fScore := float64(framework.MaxNodeScore) * (float64(flippedScore) / float64(maxMinDiff)) result[i].Score = int64(fScore) } - - return result, nil + return nil } // TODO(Huang-Wei): combine this with getHardTopologySpreadConstraints() in predicates package diff --git a/pkg/scheduler/algorithm/priorities/even_pods_spread_test.go b/pkg/scheduler/algorithm/priorities/even_pods_spread_test.go index 982ed407f27..24b3a2f1a70 100644 --- a/pkg/scheduler/algorithm/priorities/even_pods_spread_test.go +++ b/pkg/scheduler/algorithm/priorities/even_pods_spread_test.go @@ -26,13 +26,13 @@ import ( st "k8s.io/kubernetes/pkg/scheduler/testing" ) -func Test_topologySpreadConstraintsMap_initialize(t *testing.T) { +func Test_podTopologySpreadMap_initialize(t *testing.T) { tests := []struct { name string pod *v1.Pod nodes []*v1.Node - wantNodeNameMap map[string]int32 - wantTopologyPairMap map[topologyPair]*int32 + wantNodeNameSet map[string]struct{} + wantTopologyPairMap map[topologyPair]*int64 }{ { name: "normal case", @@ -45,17 +45,17 @@ func Test_topologySpreadConstraintsMap_initialize(t *testing.T) { st.MakeNode().Name("node-b").Label("zone", "zone1").Label("node", "node-b").Obj(), st.MakeNode().Name("node-x").Label("zone", "zone2").Label("node", "node-x").Obj(), }, - wantNodeNameMap: map[string]int32{ - "node-a": 0, - "node-b": 0, - "node-x": 0, + wantNodeNameSet: map[string]struct{}{ + 
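// all three nodes above carry both the "zone" and the "node" label, so each of
// them is expected to show up in nodeNameSet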
"node-a": {}, + "node-b": {}, + "node-x": {}, }, - wantTopologyPairMap: map[topologyPair]*int32{ - {key: "zone", value: "zone1"}: new(int32), - {key: "zone", value: "zone2"}: new(int32), - {key: "node", value: "node-a"}: new(int32), - {key: "node", value: "node-b"}: new(int32), - {key: "node", value: "node-x"}: new(int32), + wantTopologyPairMap: map[topologyPair]*int64{ + {key: "zone", value: "zone1"}: new(int64), + {key: "zone", value: "zone2"}: new(int64), + {key: "node", value: "node-a"}: new(int64), + {key: "node", value: "node-b"}: new(int64), + {key: "node", value: "node-x"}: new(int64), }, }, { @@ -69,26 +69,26 @@ func Test_topologySpreadConstraintsMap_initialize(t *testing.T) { st.MakeNode().Name("node-b").Label("zone", "zone1").Label("node", "node-b").Obj(), st.MakeNode().Name("node-x").Label("node", "node-x").Obj(), }, - wantNodeNameMap: map[string]int32{ - "node-a": 0, - "node-b": 0, + wantNodeNameSet: map[string]struct{}{ + "node-a": {}, + "node-b": {}, }, - wantTopologyPairMap: map[topologyPair]*int32{ - {key: "zone", value: "zone1"}: new(int32), - {key: "node", value: "node-a"}: new(int32), - {key: "node", value: "node-b"}: new(int32), + wantTopologyPairMap: map[topologyPair]*int64{ + {key: "zone", value: "zone1"}: new(int64), + {key: "node", value: "node-a"}: new(int64), + {key: "node", value: "node-b"}: new(int64), }, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - tMap := newTopologySpreadConstraintsMap() - tMap.initialize(tt.pod, tt.nodes) - if !reflect.DeepEqual(tMap.nodeNameToPodCounts, tt.wantNodeNameMap) { - t.Errorf("initilize().nodeNameToPodCounts = %#v, want %#v", tMap.nodeNameToPodCounts, tt.wantNodeNameMap) + m := newTopologySpreadConstraintsMap() + m.initialize(tt.pod, tt.nodes) + if !reflect.DeepEqual(m.nodeNameSet, tt.wantNodeNameSet) { + t.Errorf("initilize().nodeNameSet = %#v, want %#v", m.nodeNameSet, tt.wantNodeNameSet) } - if !reflect.DeepEqual(tMap.topologyPairToPodCounts, tt.wantTopologyPairMap) { - t.Errorf("initilize().topologyPairToPodCounts = %#v, want %#v", tMap.topologyPairToPodCounts, tt.wantTopologyPairMap) + if !reflect.DeepEqual(m.topologyPairToPodCounts, tt.wantTopologyPairMap) { + t.Errorf("initilize().topologyPairToPodCounts = %#v, want %#v", m.topologyPairToPodCounts, tt.wantTopologyPairMap) } }) } @@ -435,9 +435,23 @@ func TestCalculateEvenPodsSpreadPriority(t *testing.T) { allNodes := append([]*v1.Node{}, tt.nodes...) allNodes = append(allNodes, tt.failedNodes...) 
snapshot := nodeinfosnapshot.NewSnapshot(tt.existingPods, allNodes) - got, _ := CalculateEvenPodsSpreadPriority(tt.pod, snapshot, tt.nodes) - if !reflect.DeepEqual(got, tt.want) { - t.Errorf("CalculateEvenPodsSpreadPriority() = %#v, want %#v", got, tt.want) + + meta := &priorityMetadata{ + podTopologySpreadMap: buildPodTopologySpreadMap(tt.pod, tt.nodes, snapshot.NodeInfoList), + } + var gotList framework.NodeScoreList + for _, n := range tt.nodes { + nodeName := n.Name + nodeScore, err := CalculateEvenPodsSpreadPriorityMap(tt.pod, meta, snapshot.NodeInfoMap[nodeName]) + if err != nil { + t.Error(err) + } + gotList = append(gotList, nodeScore) + } + + CalculateEvenPodsSpreadPriorityReduce(tt.pod, meta, snapshot, gotList) + if !reflect.DeepEqual(gotList, tt.want) { + t.Errorf("CalculateEvenPodsSpreadPriorityReduce() = %#v, want %#v", gotList, tt.want) } }) } @@ -484,9 +498,19 @@ func BenchmarkTestCalculateEvenPodsSpreadPriority(b *testing.B) { b.Run(tt.name, func(b *testing.B) { existingPods, allNodes, filteredNodes := st.MakeNodesAndPodsForEvenPodsSpread(tt.pod, tt.existingPodsNum, tt.allNodesNum, tt.filteredNodesNum) snapshot := nodeinfosnapshot.NewSnapshot(existingPods, allNodes) + meta := &priorityMetadata{ + podTopologySpreadMap: buildPodTopologySpreadMap(tt.pod, filteredNodes, snapshot.NodeInfoList), + } b.ResetTimer() + for i := 0; i < b.N; i++ { - CalculateEvenPodsSpreadPriority(tt.pod, snapshot, filteredNodes) + var gotList framework.NodeScoreList + for _, n := range filteredNodes { + nodeName := n.Name + nodeScore, _ := CalculateEvenPodsSpreadPriorityMap(tt.pod, meta, snapshot.NodeInfoMap[nodeName]) + gotList = append(gotList, nodeScore) + } + CalculateEvenPodsSpreadPriorityReduce(tt.pod, meta, snapshot, gotList) } }) } diff --git a/pkg/scheduler/algorithm/priorities/metadata.go b/pkg/scheduler/algorithm/priorities/metadata.go index 19351ea31e6..d33bf46db7a 100644 --- a/pkg/scheduler/algorithm/priorities/metadata.go +++ b/pkg/scheduler/algorithm/priorities/metadata.go @@ -59,18 +59,21 @@ type priorityMetadata struct { controllerRef *metav1.OwnerReference podFirstServiceSelector labels.Selector totalNumNodes int + podTopologySpreadMap *podTopologySpreadMap } // PriorityMetadata is a PriorityMetadataProducer. Node info can be nil. 
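// The podTopologySpreadMap field is built once per scheduling cycle from the filtered
// node list plus the full node snapshot; it stays nil when the incoming pod carries no
// soft topology spread constraints, and both the Map and the Reduce function treat a
// nil map as a no-op.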
-func (pmf *PriorityMetadataFactory) PriorityMetadata(pod *v1.Pod, _ []*v1.Node, sharedLister schedulerlisters.SharedLister) interface{} { +func (pmf *PriorityMetadataFactory) PriorityMetadata(pod *v1.Pod, filteredNodes []*v1.Node, sharedLister schedulerlisters.SharedLister) interface{} { // If we cannot compute metadata, just return nil if pod == nil { return nil } totalNumNodes := 0 + var allNodes []*schedulernodeinfo.NodeInfo if sharedLister != nil { if l, err := sharedLister.NodeInfos().List(); err == nil { totalNumNodes = len(l) + allNodes = l } } return &priorityMetadata{ @@ -81,6 +84,7 @@ func (pmf *PriorityMetadataFactory) PriorityMetadata(pod *v1.Pod, _ []*v1.Node, controllerRef: metav1.GetControllerOf(pod), podFirstServiceSelector: getFirstServiceSelector(pod, pmf.serviceLister), totalNumNodes: totalNumNodes, + podTopologySpreadMap: buildPodTopologySpreadMap(pod, filteredNodes, allNodes), } } diff --git a/pkg/scheduler/algorithmprovider/defaults/defaults.go b/pkg/scheduler/algorithmprovider/defaults/defaults.go index a4c343abf65..dd189dd18b3 100644 --- a/pkg/scheduler/algorithmprovider/defaults/defaults.go +++ b/pkg/scheduler/algorithmprovider/defaults/defaults.go @@ -68,7 +68,12 @@ func ApplyFeatureGates() (restore func()) { scheduler.RegisterFitPredicate(predicates.EvenPodsSpreadPred, predicates.EvenPodsSpreadPredicate) // register priority scheduler.InsertPriorityKeyToAlgorithmProviderMap(priorities.EvenPodsSpreadPriority) - scheduler.RegisterPriorityFunction(priorities.EvenPodsSpreadPriority, priorities.CalculateEvenPodsSpreadPriority, 1) + scheduler.RegisterPriorityMapReduceFunction( + priorities.EvenPodsSpreadPriority, + priorities.CalculateEvenPodsSpreadPriorityMap, + priorities.CalculateEvenPodsSpreadPriorityReduce, + 1, + ) } // Prioritizes nodes that satisfy pod's resource limits diff --git a/pkg/scheduler/api/compatibility/compatibility_test.go b/pkg/scheduler/api/compatibility/compatibility_test.go index f699de96390..f3ebc2a1e6b 100644 --- a/pkg/scheduler/api/compatibility/compatibility_test.go +++ b/pkg/scheduler/api/compatibility/compatibility_test.go @@ -1196,15 +1196,15 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { featureGates: map[featuregate.Feature]bool{ features.EvenPodsSpread: true, }, - wantPrioritizers: sets.NewString( - "EvenPodsSpreadPriority", - ), wantPlugins: map[string][]kubeschedulerconfig.Plugin{ "FilterPlugin": { {Name: "NodeUnschedulable"}, {Name: "TaintToleration"}, {Name: "PodTopologySpread"}, }, + "ScorePlugin": { + {Name: "PodTopologySpread", Weight: 2}, + }, }, }, { diff --git a/pkg/scheduler/framework/plugins/default_registry.go b/pkg/scheduler/framework/plugins/default_registry.go index ec2e87ef34a..337a9fc80af 100644 --- a/pkg/scheduler/framework/plugins/default_registry.go +++ b/pkg/scheduler/framework/plugins/default_registry.go @@ -229,25 +229,26 @@ func NewDefaultConfigProducerRegistry() *ConfigProducerRegistry { plugins.Score = appendToPluginSet(plugins.Score, nodepreferavoidpods.Name, &args.Weight) return }) - registry.RegisterPriority(priorities.MostRequestedPriority, func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) { plugins.Score = appendToPluginSet(plugins.Score, noderesources.MostAllocatedName, &args.Weight) return }) - registry.RegisterPriority(priorities.BalancedResourceAllocation, func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) { plugins.Score = appendToPluginSet(plugins.Score, noderesources.BalancedAllocationName, &args.Weight) 
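// Each producer registered here translates a legacy priority name into its score-plugin
// counterpart, carrying the configured weight over; EvenPodsSpreadPriority maps to the
// PodTopologySpread plugin registered just below.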
return }) - registry.RegisterPriority(priorities.LeastRequestedPriority, func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) { plugins.Score = appendToPluginSet(plugins.Score, noderesources.LeastAllocatedName, &args.Weight) return }) - + registry.RegisterPriority(priorities.EvenPodsSpreadPriority, + func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) { + plugins.Score = appendToPluginSet(plugins.Score, podtopologyspread.Name, &args.Weight) + return + }) registry.RegisterPriority(requestedtocapacityratio.Name, func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) { plugins.Score = appendToPluginSet(plugins.Score, requestedtocapacityratio.Name, &args.Weight) diff --git a/pkg/scheduler/framework/plugins/podtopologyspread/BUILD b/pkg/scheduler/framework/plugins/podtopologyspread/BUILD index 8a4edce1609..9dd7642c2ee 100644 --- a/pkg/scheduler/framework/plugins/podtopologyspread/BUILD +++ b/pkg/scheduler/framework/plugins/podtopologyspread/BUILD @@ -7,6 +7,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/scheduler/algorithm/predicates:go_default_library", + "//pkg/scheduler/algorithm/priorities:go_default_library", "//pkg/scheduler/framework/plugins/migration:go_default_library", "//pkg/scheduler/framework/v1alpha1:go_default_library", "//pkg/scheduler/nodeinfo:go_default_library", diff --git a/pkg/scheduler/framework/plugins/podtopologyspread/pod_topology_spread.go b/pkg/scheduler/framework/plugins/podtopologyspread/pod_topology_spread.go index 9a76bbe1732..6a156b5b67d 100644 --- a/pkg/scheduler/framework/plugins/podtopologyspread/pod_topology_spread.go +++ b/pkg/scheduler/framework/plugins/podtopologyspread/pod_topology_spread.go @@ -23,15 +23,19 @@ import ( "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" + "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/migration" framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" "k8s.io/kubernetes/pkg/scheduler/nodeinfo" ) // PodTopologySpread is a plugin that ensures pod's topologySpreadConstraints is satisfied. -type PodTopologySpread struct{} +type PodTopologySpread struct { + handle framework.FrameworkHandle +} var _ framework.FilterPlugin = &PodTopologySpread{} +var _ framework.ScorePlugin = &PodTopologySpread{} // Name is the name of the plugin used in the plugin registry and configurations. const Name = "PodTopologySpread" @@ -51,7 +55,33 @@ func (pl *PodTopologySpread) Filter(ctx context.Context, cycleState *framework.C return migration.PredicateResultToFrameworkStatus(reasons, err) } -// New initializes a new plugin and returns it. -func New(_ *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) { - return &PodTopologySpread{}, nil +// Score invoked at the Score extension point. +// The "score" returned in this function is the matching number of pods on the `nodeName`, +// it is normalized later. 
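// A node that is missing one of the required topologyKeys is absent from nodeNameSet and
// simply gets a raw score of 0 here. The raw score is deliberately "more matching pods,
// higher score"; NormalizeScore then flips it so that less-crowded topologies end up
// preferred.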
+func (pl *PodTopologySpread) Score(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) { + nodeInfo, err := pl.handle.SnapshotSharedLister().NodeInfos().Get(nodeName) + if err != nil { + return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v", nodeName, err)) + } + + meta := migration.PriorityMetadata(state) + s, err := priorities.CalculateEvenPodsSpreadPriorityMap(pod, meta, nodeInfo) + return s.Score, migration.ErrorToFrameworkStatus(err) +} + +// NormalizeScore invoked after scoring all nodes. +func (pl *PodTopologySpread) NormalizeScore(ctx context.Context, state *framework.CycleState, pod *v1.Pod, scores framework.NodeScoreList) *framework.Status { + meta := migration.PriorityMetadata(state) + err := priorities.CalculateEvenPodsSpreadPriorityReduce(pod, meta, pl.handle.SnapshotSharedLister(), scores) + return migration.ErrorToFrameworkStatus(err) +} + +// ScoreExtensions of the Score plugin. +func (pl *PodTopologySpread) ScoreExtensions() framework.ScoreExtensions { + return pl +} + +// New initializes a new plugin and returns it. +func New(_ *runtime.Unknown, h framework.FrameworkHandle) (framework.Plugin, error) { + return &PodTopologySpread{handle: h}, nil } diff --git a/pkg/scheduler/framework/plugins/podtopologyspread/pod_topology_spread_test.go b/pkg/scheduler/framework/plugins/podtopologyspread/pod_topology_spread_test.go index 6c4663d8f98..282dbb9cea0 100644 --- a/pkg/scheduler/framework/plugins/podtopologyspread/pod_topology_spread_test.go +++ b/pkg/scheduler/framework/plugins/podtopologyspread/pod_topology_spread_test.go @@ -23,14 +23,16 @@ import ( "k8s.io/api/core/v1" "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/migration" - "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" + framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot" st "k8s.io/kubernetes/pkg/scheduler/testing" ) -var hardSpread = v1.DoNotSchedule +var ( + hardSpread = v1.DoNotSchedule +) -func TestPodTopologySpreadFilter_SingleConstraint(t *testing.T) { +func TestPodTopologySpread_Filter_SingleConstraint(t *testing.T) { tests := []struct { name string pod *v1.Pod @@ -269,7 +271,7 @@ func TestPodTopologySpreadFilter_SingleConstraint(t *testing.T) { t.Run(tt.name, func(t *testing.T) { snapshot := nodeinfosnapshot.NewSnapshot(tt.existingPods, tt.nodes) meta := predicates.GetPredicateMetadata(tt.pod, snapshot) - state := v1alpha1.NewCycleState() + state := framework.NewCycleState() state.Write(migration.PredicatesStateKey, &migration.PredicatesStateData{Reference: meta}) plugin, _ := New(nil, nil) for _, node := range tt.nodes { @@ -283,7 +285,7 @@ func TestPodTopologySpreadFilter_SingleConstraint(t *testing.T) { } } -func TestPodTopologySpreadFilter_MultipleConstraints(t *testing.T) { +func TestPodTopologySpread_Filter_MultipleConstraints(t *testing.T) { tests := []struct { name string pod *v1.Pod @@ -466,7 +468,7 @@ func TestPodTopologySpreadFilter_MultipleConstraints(t *testing.T) { t.Run(tt.name, func(t *testing.T) { snapshot := nodeinfosnapshot.NewSnapshot(tt.existingPods, tt.nodes) meta := predicates.GetPredicateMetadata(tt.pod, snapshot) - state := v1alpha1.NewCycleState() + state := framework.NewCycleState() state.Write(migration.PredicatesStateKey, &migration.PredicatesStateData{Reference: meta}) plugin, _ := New(nil, nil) for _, node := range tt.nodes {