Move podtopologyspread priority logic to its Score plugin

Wei Huang 2019-12-27 20:20:35 -08:00
parent 33bba19372
commit 78352240f6
No known key found for this signature in database
GPG Key ID: BE5E9752F8B6E005
12 changed files with 446 additions and 451 deletions


@ -10,7 +10,6 @@ go_library(
name = "go_default_library",
srcs = [
"balanced_resource_allocation.go",
"even_pods_spread.go",
"least_requested.go",
"metadata.go",
"most_requested.go",
@ -26,7 +25,6 @@ go_library(
deps = [
"//pkg/apis/core/v1/helper:go_default_library",
"//pkg/features:go_default_library",
"//pkg/scheduler/algorithm/predicates:go_default_library",
"//pkg/scheduler/algorithm/priorities/util:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library",
"//pkg/scheduler/listers:go_default_library",
@ -39,7 +37,6 @@ go_library(
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/listers/apps/v1:go_default_library",
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
)
@ -48,7 +45,6 @@ go_test(
name = "go_default_test",
srcs = [
"balanced_resource_allocation_test.go",
"even_pods_spread_test.go",
"least_requested_test.go",
"metadata_test.go",
"most_requested_test.go",


@ -1,267 +0,0 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package priorities
import (
"context"
"fmt"
"math"
"sync/atomic"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/util/workqueue"
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
"k8s.io/klog"
)
type topologyPair struct {
key string
value string
}
type podTopologySpreadMap struct {
constraints []topologySpreadConstraint
// nodeNameSet is a string set holding all node names which have all constraints[*].topologyKey present.
nodeNameSet map[string]struct{}
// topologyPairToPodCounts is keyed with topologyPair, and valued with the number of matching pods.
topologyPairToPodCounts map[topologyPair]*int64
}
// topologySpreadConstraint is an internal version for a soft (ScheduleAnyway
// unsatisfiable constraint action) v1.TopologySpreadConstraint and where the
// selector is parsed.
type topologySpreadConstraint struct {
topologyKey string
selector labels.Selector
}
func newTopologySpreadConstraintsMap() *podTopologySpreadMap {
return &podTopologySpreadMap{
nodeNameSet: make(map[string]struct{}),
topologyPairToPodCounts: make(map[topologyPair]*int64),
}
}
// buildPodTopologySpreadMap prepares necessary data (podTopologySpreadMap) for incoming pod on the filteredNodes.
// Later Priority function will use 'podTopologySpreadMap' to perform the Scoring calculations.
func buildPodTopologySpreadMap(pod *v1.Pod, filteredNodes []*v1.Node, allNodes []*schedulernodeinfo.NodeInfo) (*podTopologySpreadMap, error) {
if len(filteredNodes) == 0 || len(allNodes) == 0 {
return nil, nil
}
// initialize podTopologySpreadMap which will be used in Score plugin.
m := newTopologySpreadConstraintsMap()
err := m.initialize(pod, filteredNodes)
if err != nil {
return nil, err
}
// return if incoming pod doesn't have soft topology spread constraints.
if m.constraints == nil {
return nil, nil
}
processAllNode := func(i int) {
nodeInfo := allNodes[i]
node := nodeInfo.Node()
if node == nil {
return
}
// (1) `node` should satisfy incoming pod's NodeSelector/NodeAffinity
// (2) All topologyKeys need to be present in `node`
if !predicates.PodMatchesNodeSelectorAndAffinityTerms(pod, node) ||
!nodeLabelsMatchSpreadConstraints(node.Labels, m.constraints) {
return
}
for _, c := range m.constraints {
pair := topologyPair{key: c.topologyKey, value: node.Labels[c.topologyKey]}
// If current topology pair is not associated with any candidate node,
// continue to avoid unnecessary calculation.
if m.topologyPairToPodCounts[pair] == nil {
continue
}
// <matchSum> indicates how many pods (on current node) match the <constraint>.
matchSum := int64(0)
for _, existingPod := range nodeInfo.Pods() {
if c.selector.Matches(labels.Set(existingPod.Labels)) {
matchSum++
}
}
atomic.AddInt64(m.topologyPairToPodCounts[pair], matchSum)
}
}
workqueue.ParallelizeUntil(context.Background(), 16, len(allNodes), processAllNode)
return m, nil
}
// initialize iterates "filteredNodes" to filter out the nodes which don't have the required topologyKey(s),
// and initializes two maps:
// 1) m.topologyPairToPodCounts: keyed with each eligible topology pair, and valued with a *int64 counter of matching pods.
// 2) m.nodeNameSet: a set holding the names of the eligible nodes only.
func (m *podTopologySpreadMap) initialize(pod *v1.Pod, filteredNodes []*v1.Node) error {
constraints, err := filterSoftTopologySpreadConstraints(pod.Spec.TopologySpreadConstraints)
if err != nil {
return err
}
if constraints == nil {
return nil
}
m.constraints = constraints
for _, node := range filteredNodes {
if !nodeLabelsMatchSpreadConstraints(node.Labels, m.constraints) {
continue
}
for _, constraint := range m.constraints {
pair := topologyPair{key: constraint.topologyKey, value: node.Labels[constraint.topologyKey]}
if m.topologyPairToPodCounts[pair] == nil {
m.topologyPairToPodCounts[pair] = new(int64)
}
}
m.nodeNameSet[node.Name] = struct{}{}
// For those nodes which don't have all required topologyKeys present, it's intentional to leave
// their entries absent in nodeNameSet, so that we're able to score them to 0 afterwards.
}
return nil
}
// CalculateEvenPodsSpreadPriorityMap calculates the number of matching pods on the passed-in "node",
// and returns that number as the Score.
func CalculateEvenPodsSpreadPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulernodeinfo.NodeInfo) (framework.NodeScore, error) {
node := nodeInfo.Node()
if node == nil {
return framework.NodeScore{}, fmt.Errorf("node not found")
}
var m *podTopologySpreadMap
if priorityMeta, ok := meta.(*priorityMetadata); ok {
m = priorityMeta.podTopologySpreadMap
}
if m == nil {
return framework.NodeScore{Name: node.Name, Score: 0}, nil
}
// no need to continue if the node is not qualified.
if _, ok := m.nodeNameSet[node.Name]; !ok {
return framework.NodeScore{Name: node.Name, Score: 0}, nil
}
// For each present <pair>, current node gets a credit of <matchSum>.
// And we sum up <matchSum> and return it as this node's score.
var score int64
for _, c := range m.constraints {
if tpVal, ok := node.Labels[c.topologyKey]; ok {
pair := topologyPair{key: c.topologyKey, value: tpVal}
matchSum := *m.topologyPairToPodCounts[pair]
score += matchSum
}
}
return framework.NodeScore{Name: node.Name, Score: score}, nil
}
// CalculateEvenPodsSpreadPriorityReduce normalizes the score for each filtered node.
// The basic rule is: the bigger the score (the number of matching pods), the smaller the
// final normalized score will be.
func CalculateEvenPodsSpreadPriorityReduce(pod *v1.Pod, meta interface{}, sharedLister schedulerlisters.SharedLister,
result framework.NodeScoreList) error {
var m *podTopologySpreadMap
if priorityMeta, ok := meta.(*priorityMetadata); ok {
m = priorityMeta.podTopologySpreadMap
}
if m == nil {
return nil
}
// Calculate the summed <total> score and <minScore>.
var minScore int64 = math.MaxInt64
var total int64
for _, score := range result {
// it's mandatory to check if <score.Name> is present in m.nodeNameSet
if _, ok := m.nodeNameSet[score.Name]; !ok {
continue
}
total += score.Score
if score.Score < minScore {
minScore = score.Score
}
}
maxMinDiff := total - minScore
for i := range result {
nodeInfo, err := sharedLister.NodeInfos().Get(result[i].Name)
if err != nil {
return err
}
node := nodeInfo.Node()
// Debugging purpose: print the score for each node.
// Score must be a pointer here, otherwise it's always 0.
if klog.V(10) {
defer func(score *int64, nodeName string) {
klog.Infof("%v -> %v: PodTopologySpread NormalizeScore, Score: (%d)", pod.Name, nodeName, *score)
}(&result[i].Score, node.Name)
}
if maxMinDiff == 0 {
result[i].Score = framework.MaxNodeScore
continue
}
if _, ok := m.nodeNameSet[node.Name]; !ok {
result[i].Score = 0
continue
}
flippedScore := total - result[i].Score
fScore := float64(framework.MaxNodeScore) * (float64(flippedScore) / float64(maxMinDiff))
result[i].Score = int64(fScore)
}
return nil
}
func filterSoftTopologySpreadConstraints(constraints []v1.TopologySpreadConstraint) ([]topologySpreadConstraint, error) {
var r []topologySpreadConstraint
for _, c := range constraints {
if c.WhenUnsatisfiable == v1.ScheduleAnyway {
selector, err := metav1.LabelSelectorAsSelector(c.LabelSelector)
if err != nil {
return nil, err
}
r = append(r, topologySpreadConstraint{
topologyKey: c.TopologyKey,
selector: selector,
})
}
}
return r, nil
}
// nodeLabelsMatchSpreadConstraints checks if ALL topology keys in spread constraints are present in node labels.
func nodeLabelsMatchSpreadConstraints(nodeLabels map[string]string, constraints []topologySpreadConstraint) bool {
for _, c := range constraints {
if _, ok := nodeLabels[c.topologyKey]; !ok {
return false
}
}
return true
}


@ -22,9 +22,7 @@ import (
"k8s.io/apimachinery/pkg/labels"
appslisters "k8s.io/client-go/listers/apps/v1"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/klog"
schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
)
// MetadataFactory is a factory to produce PriorityMetadata.
@ -57,7 +55,6 @@ func NewMetadataFactory(
// priorityMetadata is a type that is passed as metadata for priority functions
type priorityMetadata struct {
podSelector labels.Selector
podTopologySpreadMap *podTopologySpreadMap
}
// PriorityMetadata is a MetadataProducer. Node info can be nil.
@ -70,20 +67,8 @@ func (pmf *MetadataFactory) PriorityMetadata(
if pod == nil {
return nil
}
var allNodes []*schedulernodeinfo.NodeInfo
if sharedLister != nil {
if l, err := sharedLister.NodeInfos().List(); err == nil {
allNodes = l
}
}
tpSpreadMap, err := buildPodTopologySpreadMap(pod, filteredNodes, allNodes)
if err != nil {
klog.Errorf("Error building podTopologySpreadMap: %v", err)
return nil
}
return &priorityMetadata{
podSelector: getSelector(pod, pmf.serviceLister, pmf.controllerLister, pmf.replicaSetLister, pmf.statefulSetLister),
podTopologySpreadMap: tpSpreadMap,
}
}


@ -20,7 +20,6 @@ import (
"testing"
v1 "k8s.io/api/core/v1"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
"k8s.io/kubernetes/pkg/scheduler/listers/fake"
nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot"
st "k8s.io/kubernetes/pkg/scheduler/testing"
@ -48,41 +47,6 @@ var (
}
)
func BenchmarkTestDefaultEvenPodsSpreadPriority(b *testing.B) {
for _, tt := range tests {
b.Run(tt.name, func(b *testing.B) {
pod := st.MakePod().Name("p").Label("foo", "").
SpreadConstraint(1, v1.LabelHostname, softSpread, st.MakeLabelSelector().Exists("foo").Obj()).
SpreadConstraint(1, v1.LabelZoneFailureDomain, softSpread, st.MakeLabelSelector().Exists("foo").Obj()).Obj()
existingPods, allNodes, filteredNodes := st.MakeNodesAndPodsForEvenPodsSpread(pod.Labels, tt.existingPodsNum, tt.allNodesNum, tt.allNodesNum)
snapshot := nodeinfosnapshot.NewSnapshot(nodeinfosnapshot.CreateNodeInfoMap(existingPods, allNodes))
b.ResetTimer()
for i := 0; i < b.N; i++ {
tpSpreadMap, err := buildPodTopologySpreadMap(pod, filteredNodes, snapshot.NodeInfoList)
if err != nil {
b.Fatal(err)
}
meta := &priorityMetadata{
podTopologySpreadMap: tpSpreadMap,
}
var gotList framework.NodeScoreList
for _, n := range filteredNodes {
score, err := CalculateEvenPodsSpreadPriorityMap(pod, meta, snapshot.NodeInfoMap[n.Name])
if err != nil {
b.Fatal(err)
}
gotList = append(gotList, score)
}
err = CalculateEvenPodsSpreadPriorityReduce(pod, meta, snapshot, gotList)
if err != nil {
b.Fatal(err)
}
}
})
}
}
func BenchmarkTestSelectorSpreadPriority(b *testing.B) {
for _, tt := range tests {
b.Run(tt.name, func(b *testing.B) {


@ -1272,6 +1272,9 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
{Name: "TaintToleration"},
{Name: "PodTopologySpread"},
},
"PostFilterPlugin": {
{Name: "PodTopologySpread"},
},
"ScorePlugin": {
{Name: "PodTopologySpread", Weight: 2},
},


@ -3,14 +3,15 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = [
"common.go",
"filtering.go",
"plugin.go",
"scoring.go",
],
importpath = "k8s.io/kubernetes/pkg/scheduler/framework/plugins/podtopologyspread",
visibility = ["//visibility:public"],
deps = [
"//pkg/scheduler/algorithm/predicates:go_default_library",
"//pkg/scheduler/algorithm/priorities:go_default_library",
"//pkg/scheduler/framework/plugins/migration:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library",
"//pkg/scheduler/listers:go_default_library",
@ -19,6 +20,7 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
@ -26,7 +28,10 @@ go_library(
go_test(
name = "go_default_test",
srcs = ["filtering_test.go"],
srcs = [
"filtering_test.go",
"scoring_test.go",
],
embed = [":go_default_library"],
deps = [
"//pkg/scheduler/framework/v1alpha1:go_default_library",
@ -35,6 +40,7 @@ go_test(
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
],
)


@ -0,0 +1,64 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package podtopologyspread
import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
)
type topologyPair struct {
key string
value string
}
// topologySpreadConstraint is an internal representation of a v1.TopologySpreadConstraint,
// with its label selector already parsed.
type topologySpreadConstraint struct {
maxSkew int32
topologyKey string
selector labels.Selector
}
// nodeLabelsMatchSpreadConstraints checks if ALL topology keys in spread constraints are present in node labels.
func nodeLabelsMatchSpreadConstraints(nodeLabels map[string]string, constraints []topologySpreadConstraint) bool {
for _, c := range constraints {
if _, ok := nodeLabels[c.topologyKey]; !ok {
return false
}
}
return true
}
func filterTopologySpreadConstraints(constraints []v1.TopologySpreadConstraint, action v1.UnsatisfiableConstraintAction) ([]topologySpreadConstraint, error) {
var result []topologySpreadConstraint
for _, c := range constraints {
if c.WhenUnsatisfiable == action {
selector, err := metav1.LabelSelectorAsSelector(c.LabelSelector)
if err != nil {
return nil, err
}
result = append(result, topologySpreadConstraint{
maxSkew: c.MaxSkew,
topologyKey: c.TopologyKey,
selector: selector,
})
}
}
return result, nil
}
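
filterTopologySpreadConstraints above is shared by both the filtering path (v1.DoNotSchedule) and the scoring path (v1.ScheduleAnyway). As a rough, package-local sketch of how it behaves (the test name and constraint values below are made up for illustration and are not part of this commit), only constraints whose WhenUnsatisfiable matches the requested action are kept, with their label selectors parsed:

package podtopologyspread

import (
	"testing"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// TestFilterTopologySpreadConstraintsSketch is a hypothetical example, not part of
// this commit: it checks that only constraints whose WhenUnsatisfiable matches the
// requested action are returned, with their label selectors parsed.
func TestFilterTopologySpreadConstraintsSketch(t *testing.T) {
	selector := &metav1.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}}
	constraints := []v1.TopologySpreadConstraint{
		{MaxSkew: 1, TopologyKey: "zone", WhenUnsatisfiable: v1.DoNotSchedule, LabelSelector: selector},
		{MaxSkew: 2, TopologyKey: "node", WhenUnsatisfiable: v1.ScheduleAnyway, LabelSelector: selector},
	}

	hard, err := filterTopologySpreadConstraints(constraints, v1.DoNotSchedule)
	if err != nil {
		t.Fatal(err)
	}
	if len(hard) != 1 || hard[0].topologyKey != "zone" || hard[0].maxSkew != 1 {
		t.Errorf("expected only the DoNotSchedule constraint, got %+v", hard)
	}

	soft, err := filterTopologySpreadConstraints(constraints, v1.ScheduleAnyway)
	if err != nil {
		t.Fatal(err)
	}
	if len(soft) != 1 || soft[0].topologyKey != "node" || soft[0].maxSkew != 2 {
		t.Errorf("expected only the ScheduleAnyway constraint, got %+v", soft)
	}
}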


@ -23,7 +23,6 @@ import (
"sync"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
@ -124,20 +123,6 @@ func (paths *criticalPaths) update(tpVal string, num int32) {
}
}
type topologyPair struct {
key string
value string
}
// topologySpreadConstraint is an internal version for a hard (DoNotSchedule
// unsatisfiable constraint action) v1.TopologySpreadConstraint and where the
// selector is parsed.
type topologySpreadConstraint struct {
maxSkew int32
topologyKey string
selector labels.Selector
}
func (s *preFilterState) updateWithPod(updatedPod, preemptorPod *v1.Pod, node *v1.Node, delta int32) {
if s == nil || updatedPod.Namespace != preemptorPod.Namespace || node == nil {
return
@ -160,16 +145,6 @@ func (s *preFilterState) updateWithPod(updatedPod, preemptorPod *v1.Pod, node *v
}
}
// nodeLabelsMatchSpreadConstraints checks if ALL topology keys in spread constraints are present in node labels.
func nodeLabelsMatchSpreadConstraints(nodeLabels map[string]string, constraints []topologySpreadConstraint) bool {
for _, c := range constraints {
if _, ok := nodeLabels[c.topologyKey]; !ok {
return false
}
}
return true
}
// PreFilter invoked at the prefilter extension point.
func (pl *PodTopologySpread) PreFilter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod) *framework.Status {
var s *preFilterState
@ -216,11 +191,11 @@ func (pl *PodTopologySpread) RemovePod(ctx context.Context, cycleState *framewor
return nil
}
// getPreFilterState fetches a pre-computed preFilterState
// getPreFilterState fetches a pre-computed preFilterState.
func getPreFilterState(cycleState *framework.CycleState) (*preFilterState, error) {
c, err := cycleState.Read(preFilterStateKey)
if err != nil {
// The metadata wasn't pre-computed in prefilter. We ignore the error for now since
// The preFilterState wasn't pre-computed in prefilter. We ignore the error for now since
// we are able to handle that by computing it again (e.g. in Filter()).
klog.V(5).Infof("Error reading %q from cycleState: %v", preFilterStateKey, err)
return nil, nil
@ -228,7 +203,7 @@ func getPreFilterState(cycleState *framework.CycleState) (*preFilterState, error
s, ok := c.(*preFilterState)
if !ok {
return nil, fmt.Errorf("%+v convert to podtopologyspread.state error", c)
return nil, fmt.Errorf("%+v convert to podtopologyspread.preFilterState error", c)
}
return s, nil
}
@ -237,7 +212,7 @@ func getPreFilterState(cycleState *framework.CycleState) (*preFilterState, error
func calPreFilterState(pod *v1.Pod, allNodes []*schedulernodeinfo.NodeInfo) (*preFilterState, error) {
// We have feature gating in APIServer to strip the spec
// so don't need to re-check feature gate, just check length of constraints.
constraints, err := filterHardTopologySpreadConstraints(pod.Spec.TopologySpreadConstraints)
constraints, err := filterTopologySpreadConstraints(pod.Spec.TopologySpreadConstraints, v1.DoNotSchedule)
if err != nil {
return nil, err
}
@ -306,24 +281,6 @@ func calPreFilterState(pod *v1.Pod, allNodes []*schedulernodeinfo.NodeInfo) (*pr
return &s, nil
}
func filterHardTopologySpreadConstraints(constraints []v1.TopologySpreadConstraint) ([]topologySpreadConstraint, error) {
var result []topologySpreadConstraint
for _, c := range constraints {
if c.WhenUnsatisfiable == v1.DoNotSchedule {
selector, err := metav1.LabelSelectorAsSelector(c.LabelSelector)
if err != nil {
return nil, err
}
result = append(result, topologySpreadConstraint{
maxSkew: c.MaxSkew,
topologyKey: c.TopologyKey,
selector: selector,
})
}
}
return result, nil
}
// Filter invoked at the filter extension point.
func (pl *PodTopologySpread) Filter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeInfo *nodeinfo.NodeInfo) *framework.Status {
node := nodeInfo.Node()
@ -341,7 +298,7 @@ func (pl *PodTopologySpread) Filter(ctx context.Context, cycleState *framework.C
return framework.NewStatus(framework.Error, "preFilterState not pre-computed for PodTopologySpread Plugin")
}
// However, "empty" meta is legit which tolerates every toSchedule Pod.
// However, "empty" preFilterState is legit which tolerates every toSchedule Pod.
if len(s.tpPairToMatchNum) == 0 || len(s.constraints) == 0 {
return nil
}


@ -17,13 +17,9 @@ limitations under the License.
package podtopologyspread
import (
"context"
"fmt"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/kubernetes/pkg/scheduler/algorithm/priorities"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/migration"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers"
)
@ -35,6 +31,7 @@ type PodTopologySpread struct {
var _ framework.PreFilterPlugin = &PodTopologySpread{}
var _ framework.FilterPlugin = &PodTopologySpread{}
var _ framework.PostFilterPlugin = &PodTopologySpread{}
var _ framework.ScorePlugin = &PodTopologySpread{}
const (
@ -47,32 +44,6 @@ func (pl *PodTopologySpread) Name() string {
return Name
}
// Score invoked at the Score extension point.
// The "score" returned in this function is the matching number of pods on the `nodeName`,
// it is normalized later.
func (pl *PodTopologySpread) Score(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
nodeInfo, err := pl.sharedLister.NodeInfos().Get(nodeName)
if err != nil {
return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v", nodeName, err))
}
meta := migration.PriorityMetadata(state)
s, err := priorities.CalculateEvenPodsSpreadPriorityMap(pod, meta, nodeInfo)
return s.Score, migration.ErrorToFrameworkStatus(err)
}
// NormalizeScore invoked after scoring all nodes.
func (pl *PodTopologySpread) NormalizeScore(ctx context.Context, state *framework.CycleState, pod *v1.Pod, scores framework.NodeScoreList) *framework.Status {
meta := migration.PriorityMetadata(state)
err := priorities.CalculateEvenPodsSpreadPriorityReduce(pod, meta, pl.sharedLister, scores)
return migration.ErrorToFrameworkStatus(err)
}
// ScoreExtensions of the Score plugin.
func (pl *PodTopologySpread) ScoreExtensions() framework.ScoreExtensions {
return pl
}
// New initializes a new plugin and returns it.
func New(_ *runtime.Unknown, h framework.FrameworkHandle) (framework.Plugin, error) {
if h.SnapshotSharedLister() == nil {


@ -0,0 +1,256 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package podtopologyspread
import (
"context"
"fmt"
"math"
"sync/atomic"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
)
const postFilterStateKey = "PostFilter" + Name
// postFilterState computed at PostFilter and used at Score.
type postFilterState struct {
constraints []topologySpreadConstraint
// nodeNameSet is a string set holding all node names which have all constraints[*].topologyKey present.
nodeNameSet sets.String
// topologyPairToPodCounts is keyed with topologyPair, and valued with the number of matching pods.
topologyPairToPodCounts map[topologyPair]*int64
}
// Clone implements the mandatory Clone interface. We don't really copy the data since
// there is no need for that.
func (s *postFilterState) Clone() framework.StateData {
return s
}
// initialize iterates "filteredNodes" to filter out the nodes which don't have the required topologyKey(s),
// and initializes two fields:
// 1) s.topologyPairToPodCounts: keyed with each eligible topology pair, and valued with a *int64 counter of matching pods.
// 2) s.nodeNameSet: a set holding the names of the eligible nodes only.
func (s *postFilterState) initialize(pod *v1.Pod, filteredNodes []*v1.Node) error {
constraints, err := filterTopologySpreadConstraints(pod.Spec.TopologySpreadConstraints, v1.ScheduleAnyway)
if err != nil {
return err
}
if constraints == nil {
return nil
}
s.constraints = constraints
for _, node := range filteredNodes {
if !nodeLabelsMatchSpreadConstraints(node.Labels, s.constraints) {
continue
}
for _, constraint := range s.constraints {
pair := topologyPair{key: constraint.topologyKey, value: node.Labels[constraint.topologyKey]}
if s.topologyPairToPodCounts[pair] == nil {
s.topologyPairToPodCounts[pair] = new(int64)
}
}
s.nodeNameSet.Insert(node.Name)
// Nodes which don't have all the required topologyKeys present are intentionally left
// out of nodeNameSet, so that they can be scored 0 afterwards.
}
return nil
}
// PostFilter builds and writes cycle state used by Score and NormalizeScore.
func (pl *PodTopologySpread) PostFilter(
ctx context.Context,
cycleState *framework.CycleState,
pod *v1.Pod,
filteredNodes []*v1.Node,
_ framework.NodeToStatusMap,
) *framework.Status {
allNodes, err := pl.sharedLister.NodeInfos().List()
if err != nil {
return framework.NewStatus(framework.Error, fmt.Sprintf("error when getting all nodes: %v", err))
}
if len(filteredNodes) == 0 || len(allNodes) == 0 {
// No nodes to score.
return nil
}
state := &postFilterState{
nodeNameSet: sets.String{},
topologyPairToPodCounts: make(map[topologyPair]*int64),
}
err = state.initialize(pod, filteredNodes)
if err != nil {
return framework.NewStatus(framework.Error, fmt.Sprintf("error when calculating postFilterState: %v", err))
}
// Return early if the incoming pod doesn't have any soft topology spread constraints.
if state.constraints == nil {
cycleState.Write(postFilterStateKey, state)
return nil
}
processAllNode := func(i int) {
nodeInfo := allNodes[i]
node := nodeInfo.Node()
if node == nil {
return
}
// (1) `node` should satisfy incoming pod's NodeSelector/NodeAffinity
// (2) All topologyKeys need to be present in `node`
if !predicates.PodMatchesNodeSelectorAndAffinityTerms(pod, node) ||
!nodeLabelsMatchSpreadConstraints(node.Labels, state.constraints) {
return
}
for _, c := range state.constraints {
pair := topologyPair{key: c.topologyKey, value: node.Labels[c.topologyKey]}
// If current topology pair is not associated with any candidate node,
// continue to avoid unnecessary calculation.
if state.topologyPairToPodCounts[pair] == nil {
continue
}
// <matchSum> indicates how many pods (on current node) match the <constraint>.
matchSum := int64(0)
for _, existingPod := range nodeInfo.Pods() {
if c.selector.Matches(labels.Set(existingPod.Labels)) {
matchSum++
}
}
atomic.AddInt64(state.topologyPairToPodCounts[pair], matchSum)
}
}
workqueue.ParallelizeUntil(ctx, 16, len(allNodes), processAllNode)
cycleState.Write(postFilterStateKey, state)
return nil
}
// Score invoked at the Score extension point.
// The "score" returned in this function is the matching number of pods on the `nodeName`,
// it is normalized later.
func (pl *PodTopologySpread) Score(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
nodeInfo, err := pl.sharedLister.NodeInfos().Get(nodeName)
if err != nil || nodeInfo.Node() == nil {
return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v, node is nil: %v", nodeName, err, nodeInfo.Node() == nil))
}
node := nodeInfo.Node()
s, err := getPostFilterState(cycleState)
if err != nil {
return 0, framework.NewStatus(framework.Error, err.Error())
}
// Return if the node is not qualified.
if _, ok := s.nodeNameSet[node.Name]; !ok {
return 0, nil
}
// For each present <pair>, the current node gets a credit of <matchSum>.
// We sum up the credits and return the total as this node's score.
var score int64
for _, c := range s.constraints {
if tpVal, ok := node.Labels[c.topologyKey]; ok {
pair := topologyPair{key: c.topologyKey, value: tpVal}
matchSum := *s.topologyPairToPodCounts[pair]
score += matchSum
}
}
return score, nil
}
// NormalizeScore invoked after scoring all nodes.
func (pl *PodTopologySpread) NormalizeScore(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, scores framework.NodeScoreList) *framework.Status {
s, err := getPostFilterState(cycleState)
if err != nil {
return framework.NewStatus(framework.Error, err.Error())
}
if s == nil {
return nil
}
// Calculate the summed <total> score and <minScore>.
var minScore int64 = math.MaxInt64
var total int64
for _, score := range scores {
// It's mandatory to check if <score.Name> is present in s.nodeNameSet.
if _, ok := s.nodeNameSet[score.Name]; !ok {
continue
}
total += score.Score
if score.Score < minScore {
minScore = score.Score
}
}
maxMinDiff := total - minScore
for i := range scores {
nodeInfo, err := pl.sharedLister.NodeInfos().Get(scores[i].Name)
if err != nil {
return framework.NewStatus(framework.Error, err.Error())
}
node := nodeInfo.Node()
// For debugging: print the final score for each node.
// The score must be captured as a pointer here; otherwise the deferred log always prints 0.
if klog.V(10) {
defer func(score *int64, nodeName string) {
klog.Infof("%v -> %v: PodTopologySpread NormalizeScore, Score: (%d)", pod.Name, nodeName, *score)
}(&scores[i].Score, node.Name)
}
if maxMinDiff == 0 {
scores[i].Score = framework.MaxNodeScore
continue
}
if _, ok := s.nodeNameSet[node.Name]; !ok {
scores[i].Score = 0
continue
}
flippedScore := total - scores[i].Score
fScore := float64(framework.MaxNodeScore) * (float64(flippedScore) / float64(maxMinDiff))
scores[i].Score = int64(fScore)
}
return nil
}
// ScoreExtensions of the Score plugin.
func (pl *PodTopologySpread) ScoreExtensions() framework.ScoreExtensions {
return pl
}
func getPostFilterState(cycleState *framework.CycleState) (*postFilterState, error) {
c, err := cycleState.Read(postFilterStateKey)
if err != nil {
return nil, fmt.Errorf("error reading %q from cycleState: %v", postFilterStateKey, err)
}
s, ok := c.(*postFilterState)
if !ok {
return nil, fmt.Errorf("%+v convert to podtopologyspread.postFilterState error", c)
}
return s, nil
}
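
To make the NormalizeScore arithmetic above concrete: a node's raw score is its summed matching-pod count, and the final score is MaxNodeScore * (total - raw) / (total - min), so the most crowded placement ends up with the lowest normalized score. A minimal standalone sketch, with made-up node names and counts that are not part of this commit:

package main

import (
	"fmt"
	"math"
)

func main() {
	const maxNodeScore int64 = 100 // framework.MaxNodeScore in this release

	// Hypothetical raw scores: the number of existing pods matching the soft
	// constraints in each candidate node's topology domains.
	raw := map[string]int64{"node-a": 4, "node-b": 2, "node-x": 0}

	// Mirror NormalizeScore: compute the total and the minimum raw score.
	var total int64
	min := int64(math.MaxInt64)
	for _, s := range raw {
		total += s
		if s < min {
			min = s
		}
	}
	maxMinDiff := total - min // 6 - 0 = 6 here

	// Flip and scale: crowded placements get low scores, empty ones get MaxNodeScore.
	for name, s := range raw {
		flipped := total - s
		normalized := int64(float64(maxNodeScore) * float64(flipped) / float64(maxMinDiff))
		fmt.Printf("%s: raw=%d normalized=%d\n", name, s, normalized)
	}
	// Output (map order may vary): node-a raw=4 normalized=33,
	// node-b raw=2 normalized=66, node-x raw=0 normalized=100.
}

With raw counts of 4, 2 and 0 this yields normalized scores of 33, 66 and 100; the real plugin additionally short-circuits to MaxNodeScore when total equals min (maxMinDiff == 0).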


@ -14,24 +14,26 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package priorities
package podtopologyspread
import (
"context"
"reflect"
"testing"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot"
st "k8s.io/kubernetes/pkg/scheduler/testing"
)
func Test_podTopologySpreadMap_initialize(t *testing.T) {
func TestPostFilterStateInitialize(t *testing.T) {
tests := []struct {
name string
pod *v1.Pod
nodes []*v1.Node
wantNodeNameSet map[string]struct{}
wantNodeNameSet sets.String
wantTopologyPairMap map[topologyPair]*int64
}{
{
@ -45,11 +47,7 @@ func Test_podTopologySpreadMap_initialize(t *testing.T) {
st.MakeNode().Name("node-b").Label("zone", "zone1").Label("node", "node-b").Obj(),
st.MakeNode().Name("node-x").Label("zone", "zone2").Label("node", "node-x").Obj(),
},
wantNodeNameSet: map[string]struct{}{
"node-a": {},
"node-b": {},
"node-x": {},
},
wantNodeNameSet: sets.NewString("node-a", "node-b", "node-x"),
wantTopologyPairMap: map[topologyPair]*int64{
{key: "zone", value: "zone1"}: new(int64),
{key: "zone", value: "zone2"}: new(int64),
@ -69,10 +67,7 @@ func Test_podTopologySpreadMap_initialize(t *testing.T) {
st.MakeNode().Name("node-b").Label("zone", "zone1").Label("node", "node-b").Obj(),
st.MakeNode().Name("node-x").Label("node", "node-x").Obj(),
},
wantNodeNameSet: map[string]struct{}{
"node-a": {},
"node-b": {},
},
wantNodeNameSet: sets.NewString("node-a", "node-b"),
wantTopologyPairMap: map[topologyPair]*int64{
{key: "zone", value: "zone1"}: new(int64),
{key: "node", value: "node-a"}: new(int64),
@ -82,21 +77,24 @@ func Test_podTopologySpreadMap_initialize(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
m := newTopologySpreadConstraintsMap()
if err := m.initialize(tt.pod, tt.nodes); err != nil {
s := &postFilterState{
nodeNameSet: sets.String{},
topologyPairToPodCounts: make(map[topologyPair]*int64),
}
if err := s.initialize(tt.pod, tt.nodes); err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(m.nodeNameSet, tt.wantNodeNameSet) {
t.Errorf("initilize().nodeNameSet = %#v, want %#v", m.nodeNameSet, tt.wantNodeNameSet)
if !reflect.DeepEqual(s.nodeNameSet, tt.wantNodeNameSet) {
t.Errorf("initialize().nodeNameSet = %#v, want %#v", s.nodeNameSet, tt.wantNodeNameSet)
}
if !reflect.DeepEqual(m.topologyPairToPodCounts, tt.wantTopologyPairMap) {
t.Errorf("initilize().topologyPairToPodCounts = %#v, want %#v", m.topologyPairToPodCounts, tt.wantTopologyPairMap)
if !reflect.DeepEqual(s.topologyPairToPodCounts, tt.wantTopologyPairMap) {
t.Errorf("initialize().topologyPairToPodCounts = %#v, want %#v", s.topologyPairToPodCounts, tt.wantTopologyPairMap)
}
})
}
}
func TestCalculateEvenPodsSpreadPriority(t *testing.T) {
func TestPodTopologySpreadScore(t *testing.T) {
tests := []struct {
name string
pod *v1.Pod
@ -436,37 +434,37 @@ func TestCalculateEvenPodsSpreadPriority(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
allNodes := append([]*v1.Node{}, tt.nodes...)
allNodes = append(allNodes, tt.failedNodes...)
state := framework.NewCycleState()
snapshot := nodeinfosnapshot.NewSnapshot(nodeinfosnapshot.CreateNodeInfoMap(tt.existingPods, allNodes))
p := &PodTopologySpread{sharedLister: snapshot}
tpSpreadMap, err := buildPodTopologySpreadMap(tt.pod, tt.nodes, snapshot.NodeInfoList)
if err != nil {
t.Fatal(err)
}
meta := &priorityMetadata{
podTopologySpreadMap: tpSpreadMap,
status := p.PostFilter(context.Background(), state, tt.pod, tt.nodes, nil)
if !status.IsSuccess() {
t.Errorf("unexpected error: %v", status)
}
var gotList framework.NodeScoreList
for _, n := range tt.nodes {
nodeName := n.Name
nodeScore, err := CalculateEvenPodsSpreadPriorityMap(tt.pod, meta, snapshot.NodeInfoMap[nodeName])
if err != nil {
t.Error(err)
score, status := p.Score(context.Background(), state, tt.pod, nodeName)
if !status.IsSuccess() {
t.Errorf("unexpected error: %v", status)
}
gotList = append(gotList, nodeScore)
gotList = append(gotList, framework.NodeScore{Name: nodeName, Score: score})
}
err = CalculateEvenPodsSpreadPriorityReduce(tt.pod, meta, snapshot, gotList)
if err != nil {
t.Fatal(err)
status = p.NormalizeScore(context.Background(), state, tt.pod, gotList)
if !status.IsSuccess() {
t.Errorf("unexpected error: %v", status)
}
if !reflect.DeepEqual(gotList, tt.want) {
t.Errorf("CalculateEvenPodsSpreadPriorityReduce() = %#v, want %#v", gotList, tt.want)
if !reflect.DeepEqual(tt.want, gotList) {
t.Errorf("expected:\n\t%+v,\ngot:\n\t%+v", tt.want, gotList)
}
})
}
}
func BenchmarkTestCalculateEvenPodsSpreadPriority(b *testing.B) {
func BenchmarkTestPodTopologySpreadScore(b *testing.B) {
tests := []struct {
name string
pod *v1.Pod
@ -506,13 +504,13 @@ func BenchmarkTestCalculateEvenPodsSpreadPriority(b *testing.B) {
for _, tt := range tests {
b.Run(tt.name, func(b *testing.B) {
existingPods, allNodes, filteredNodes := st.MakeNodesAndPodsForEvenPodsSpread(tt.pod.Labels, tt.existingPodsNum, tt.allNodesNum, tt.filteredNodesNum)
state := framework.NewCycleState()
snapshot := nodeinfosnapshot.NewSnapshot(nodeinfosnapshot.CreateNodeInfoMap(existingPods, allNodes))
tpSpreadMap, err := buildPodTopologySpreadMap(tt.pod, filteredNodes, snapshot.NodeInfoList)
if err != nil {
b.Fatal(err)
}
meta := &priorityMetadata{
podTopologySpreadMap: tpSpreadMap,
p := &PodTopologySpread{sharedLister: snapshot}
status := p.PostFilter(context.Background(), state, tt.pod, filteredNodes, nil)
if !status.IsSuccess() {
b.Fatalf("unexpected error: %v", status)
}
b.ResetTimer()
@ -520,16 +518,77 @@ func BenchmarkTestCalculateEvenPodsSpreadPriority(b *testing.B) {
var gotList framework.NodeScoreList
for _, n := range filteredNodes {
nodeName := n.Name
nodeScore, _ := CalculateEvenPodsSpreadPriorityMap(tt.pod, meta, snapshot.NodeInfoMap[nodeName])
gotList = append(gotList, nodeScore)
score, status := p.Score(context.Background(), state, tt.pod, nodeName)
if !status.IsSuccess() {
b.Fatalf("unexpected error: %v", status)
}
err := CalculateEvenPodsSpreadPriorityReduce(tt.pod, meta, snapshot, gotList)
if err != nil {
b.Fatal(err)
gotList = append(gotList, framework.NodeScore{Name: nodeName, Score: score})
}
status = p.NormalizeScore(context.Background(), state, tt.pod, gotList)
if !status.IsSuccess() {
b.Fatal(status)
}
}
})
}
}
var softSpread = v1.ScheduleAnyway
// The tests in this file compare the performance of SelectorSpreadPriority
// against EvenPodsSpreadPriority with a similar rule.
var (
tests = []struct {
name string
existingPodsNum int
allNodesNum int
}{
{
name: "100nodes",
existingPodsNum: 1000,
allNodesNum: 100,
},
{
name: "1000nodes",
existingPodsNum: 10000,
allNodesNum: 1000,
},
}
)
func BenchmarkTestDefaultEvenPodsSpreadPriority(b *testing.B) {
for _, tt := range tests {
b.Run(tt.name, func(b *testing.B) {
pod := st.MakePod().Name("p").Label("foo", "").
SpreadConstraint(1, v1.LabelHostname, softSpread, st.MakeLabelSelector().Exists("foo").Obj()).
SpreadConstraint(1, v1.LabelZoneFailureDomain, softSpread, st.MakeLabelSelector().Exists("foo").Obj()).Obj()
existingPods, allNodes, filteredNodes := st.MakeNodesAndPodsForEvenPodsSpread(pod.Labels, tt.existingPodsNum, tt.allNodesNum, tt.allNodesNum)
state := framework.NewCycleState()
snapshot := nodeinfosnapshot.NewSnapshot(nodeinfosnapshot.CreateNodeInfoMap(existingPods, allNodes))
p := &PodTopologySpread{sharedLister: snapshot}
status := p.PostFilter(context.Background(), state, pod, filteredNodes, nil)
if !status.IsSuccess() {
b.Fatalf("unexpected error: %v", status)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
var gotList framework.NodeScoreList
for _, n := range filteredNodes {
nodeName := n.Name
score, status := p.Score(context.Background(), state, pod, nodeName)
if !status.IsSuccess() {
b.Fatalf("unexpected error: %v", status)
}
gotList = append(gotList, framework.NodeScore{Name: nodeName, Score: score})
}
status = p.NormalizeScore(context.Background(), state, pod, gotList)
if !status.IsSuccess() {
b.Fatal(status)
}
}
})
}
}


@ -284,6 +284,7 @@ func NewConfigProducerRegistry() *ConfigProducerRegistry {
})
registry.RegisterPriority(priorities.EvenPodsSpreadPriority,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.PostFilter = appendToPluginSet(plugins.PostFilter, podtopologyspread.Name, nil)
plugins.Score = appendToPluginSet(plugins.Score, podtopologyspread.Name, &args.Weight)
return
})
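
The producer registered above enables the PodTopologySpread plugin at both the PostFilter and Score extension points, carrying the legacy EvenPodsSpreadPriority weight over to Score, which is what the compatibility test earlier in this commit expects (PostFilter with no weight, Score with weight 2). A toy sketch of the resulting shape, using simplified stand-in types rather than the real kube-scheduler config API:

package main

import "fmt"

// plugin and pluginSet are simplified stand-ins for the kube-scheduler plugin
// configuration types; they are for illustration only.
type plugin struct {
	Name   string
	Weight int32
}

type pluginSet struct {
	Enabled []plugin
}

type plugins struct {
	PostFilter pluginSet
	Score      pluginSet
}

func main() {
	weight := int32(2) // the weight configured for the legacy EvenPodsSpreadPriority

	var p plugins
	// Corresponds to appendToPluginSet(plugins.PostFilter, podtopologyspread.Name, nil):
	// the PostFilter entry carries no weight.
	p.PostFilter.Enabled = append(p.PostFilter.Enabled, plugin{Name: "PodTopologySpread"})
	// Corresponds to appendToPluginSet(plugins.Score, podtopologyspread.Name, &args.Weight):
	// the Score entry keeps the priority's weight.
	p.Score.Enabled = append(p.Score.Enabled, plugin{Name: "PodTopologySpread", Weight: weight})

	fmt.Printf("PostFilter: %+v\nScore: %+v\n", p.PostFilter.Enabled, p.Score.Enabled)
}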