diff --git a/pkg/scheduler/algorithm/priorities/BUILD b/pkg/scheduler/algorithm/priorities/BUILD
index 98ef0d0064b..6111defbafb 100644
--- a/pkg/scheduler/algorithm/priorities/BUILD
+++ b/pkg/scheduler/algorithm/priorities/BUILD
@@ -10,7 +10,6 @@ go_library(
     name = "go_default_library",
     srcs = [
         "balanced_resource_allocation.go",
-        "even_pods_spread.go",
         "least_requested.go",
         "metadata.go",
         "most_requested.go",
@@ -26,7 +25,6 @@ go_library(
     deps = [
         "//pkg/apis/core/v1/helper:go_default_library",
         "//pkg/features:go_default_library",
-        "//pkg/scheduler/algorithm/predicates:go_default_library",
         "//pkg/scheduler/algorithm/priorities/util:go_default_library",
         "//pkg/scheduler/framework/v1alpha1:go_default_library",
         "//pkg/scheduler/listers:go_default_library",
@@ -39,7 +37,6 @@ go_library(
         "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
         "//staging/src/k8s.io/client-go/listers/apps/v1:go_default_library",
         "//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
-        "//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
        "//vendor/k8s.io/klog:go_default_library",
    ],
)
@@ -48,7 +45,6 @@ go_test(
     name = "go_default_test",
     srcs = [
         "balanced_resource_allocation_test.go",
-        "even_pods_spread_test.go",
         "least_requested_test.go",
         "metadata_test.go",
         "most_requested_test.go",
diff --git a/pkg/scheduler/algorithm/priorities/even_pods_spread.go b/pkg/scheduler/algorithm/priorities/even_pods_spread.go
deleted file mode 100644
index 5ef8e6d4bfc..00000000000
--- a/pkg/scheduler/algorithm/priorities/even_pods_spread.go
+++ /dev/null
@@ -1,267 +0,0 @@
-/*
-Copyright 2019 The Kubernetes Authors.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package priorities
-
-import (
-	"context"
-	"fmt"
-	"math"
-	"sync/atomic"
-
-	v1 "k8s.io/api/core/v1"
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/apimachinery/pkg/labels"
-	"k8s.io/client-go/util/workqueue"
-	"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
-	framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
-	schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers"
-	schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
-
-	"k8s.io/klog"
-)
-
-type topologyPair struct {
-	key   string
-	value string
-}
-
-type podTopologySpreadMap struct {
-	constraints []topologySpreadConstraint
-	// nodeNameSet is a string set holding all node names which have all constraints[*].topologyKey present.
-	nodeNameSet map[string]struct{}
-	// topologyPairToPodCounts is keyed with topologyPair, and valued with the number of matching pods.
-	topologyPairToPodCounts map[topologyPair]*int64
-}
-
-// topologySpreadConstraint is an internal version for a soft (ScheduleAnyway
-// unsatisfiable constraint action) v1.TopologySpreadConstraint and where the
-// selector is parsed.
-type topologySpreadConstraint struct {
-	topologyKey string
-	selector    labels.Selector
-}
-
-func newTopologySpreadConstraintsMap() *podTopologySpreadMap {
-	return &podTopologySpreadMap{
-		nodeNameSet:             make(map[string]struct{}),
-		topologyPairToPodCounts: make(map[topologyPair]*int64),
-	}
-}
-
-// buildPodTopologySpreadMap prepares necessary data (podTopologySpreadMap) for incoming pod on the filteredNodes.
-// Later Priority function will use 'podTopologySpreadMap' to perform the Scoring calculations.
-func buildPodTopologySpreadMap(pod *v1.Pod, filteredNodes []*v1.Node, allNodes []*schedulernodeinfo.NodeInfo) (*podTopologySpreadMap, error) {
-	if len(filteredNodes) == 0 || len(allNodes) == 0 {
-		return nil, nil
-	}
-
-	// initialize podTopologySpreadMap which will be used in Score plugin.
-	m := newTopologySpreadConstraintsMap()
-	err := m.initialize(pod, filteredNodes)
-	if err != nil {
-		return nil, err
-	}
-	// return if incoming pod doesn't have soft topology spread constraints.
-	if m.constraints == nil {
-		return nil, nil
-	}
-
-	processAllNode := func(i int) {
-		nodeInfo := allNodes[i]
-		node := nodeInfo.Node()
-		if node == nil {
-			return
-		}
-		// (1) `node` should satisfy incoming pod's NodeSelector/NodeAffinity
-		// (2) All topologyKeys need to be present in `node`
-		if !predicates.PodMatchesNodeSelectorAndAffinityTerms(pod, node) ||
-			!nodeLabelsMatchSpreadConstraints(node.Labels, m.constraints) {
-			return
-		}
-
-		for _, c := range m.constraints {
-			pair := topologyPair{key: c.topologyKey, value: node.Labels[c.topologyKey]}
-			// If current topology pair is not associated with any candidate node,
-			// continue to avoid unnecessary calculation.
-			if m.topologyPairToPodCounts[pair] == nil {
-				continue
-			}
-
-			// <matchSum> indicates how many pods (on current node) match the <constraint>.
-			matchSum := int64(0)
-			for _, existingPod := range nodeInfo.Pods() {
-				if c.selector.Matches(labels.Set(existingPod.Labels)) {
-					matchSum++
-				}
-			}
-			atomic.AddInt64(m.topologyPairToPodCounts[pair], matchSum)
-		}
-	}
-	workqueue.ParallelizeUntil(context.Background(), 16, len(allNodes), processAllNode)
-
-	return m, nil
-}
-
-// initialize iterates "filteredNodes" to filter out the nodes which don't have required topologyKey(s),
-// and initialize two maps:
-// 1) m.topologyPairToPodCounts: keyed with both eligible topology pair and node names.
-// 2) m.nodeNameSet: keyed with node name, and valued with a *int64 pointer for eligible node only.
-func (m *podTopologySpreadMap) initialize(pod *v1.Pod, filteredNodes []*v1.Node) error {
-	constraints, err := filterSoftTopologySpreadConstraints(pod.Spec.TopologySpreadConstraints)
-	if err != nil {
-		return err
-	}
-	if constraints == nil {
-		return nil
-	}
-	m.constraints = constraints
-	for _, node := range filteredNodes {
-		if !nodeLabelsMatchSpreadConstraints(node.Labels, m.constraints) {
-			continue
-		}
-		for _, constraint := range m.constraints {
-			pair := topologyPair{key: constraint.topologyKey, value: node.Labels[constraint.topologyKey]}
-			if m.topologyPairToPodCounts[pair] == nil {
-				m.topologyPairToPodCounts[pair] = new(int64)
-			}
-		}
-		m.nodeNameSet[node.Name] = struct{}{}
-		// For those nodes which don't have all required topologyKeys present, it's intentional to leave
-		// their entries absent in nodeNameSet, so that we're able to score them to 0 afterwards.
-	}
-	return nil
-}
-
-// CalculateEvenPodsSpreadPriorityMap calculate the number of matching pods on the passed-in "node",
-// and return the number as Score.
-func CalculateEvenPodsSpreadPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulernodeinfo.NodeInfo) (framework.NodeScore, error) {
-	node := nodeInfo.Node()
-	if node == nil {
-		return framework.NodeScore{}, fmt.Errorf("node not found")
-	}
-
-	var m *podTopologySpreadMap
-	if priorityMeta, ok := meta.(*priorityMetadata); ok {
-		m = priorityMeta.podTopologySpreadMap
-	}
-	if m == nil {
-		return framework.NodeScore{Name: node.Name, Score: 0}, nil
-	}
-
-	// no need to continue if the node is not qualified.
-	if _, ok := m.nodeNameSet[node.Name]; !ok {
-		return framework.NodeScore{Name: node.Name, Score: 0}, nil
-	}
-
-	// For each present <pair>, current node gets a credit of <matchSum>.
-	// And we sum up <matchSum> and return it as this node's score.
-	var score int64
-	for _, c := range m.constraints {
-		if tpVal, ok := node.Labels[c.topologyKey]; ok {
-			pair := topologyPair{key: c.topologyKey, value: tpVal}
-			matchSum := *m.topologyPairToPodCounts[pair]
-			score += matchSum
-		}
-	}
-	return framework.NodeScore{Name: node.Name, Score: score}, nil
-}
-
-// CalculateEvenPodsSpreadPriorityReduce normalizes the score for each filteredNode,
-// The basic rule is: the bigger the score(matching number of pods) is, the smaller the
-// final normalized score will be.
-func CalculateEvenPodsSpreadPriorityReduce(pod *v1.Pod, meta interface{}, sharedLister schedulerlisters.SharedLister,
-	result framework.NodeScoreList) error {
-	var m *podTopologySpreadMap
-	if priorityMeta, ok := meta.(*priorityMetadata); ok {
-		m = priorityMeta.podTopologySpreadMap
-	}
-	if m == nil {
-		return nil
-	}
-
-	// Calculate the summed <total> score and <minScore>.
-	var minScore int64 = math.MaxInt64
-	var total int64
-	for _, score := range result {
-		// it's mandatory to check if <score.Name> is present in m.nodeNameSet
-		if _, ok := m.nodeNameSet[score.Name]; !ok {
-			continue
-		}
-		total += score.Score
-		if score.Score < minScore {
-			minScore = score.Score
-		}
-	}
-
-	maxMinDiff := total - minScore
-	for i := range result {
-		nodeInfo, err := sharedLister.NodeInfos().Get(result[i].Name)
-		if err != nil {
-			return err
-		}
-		node := nodeInfo.Node()
-		// Debugging purpose: print the score for each node.
-		// Score must be a pointer here, otherwise it's always 0.
-		if klog.V(10) {
-			defer func(score *int64, nodeName string) {
-				klog.Infof("%v -> %v: PodTopologySpread NormalizeScore, Score: (%d)", pod.Name, nodeName, *score)
-			}(&result[i].Score, node.Name)
-		}
-
-		if maxMinDiff == 0 {
-			result[i].Score = framework.MaxNodeScore
-			continue
-		}
-
-		if _, ok := m.nodeNameSet[node.Name]; !ok {
-			result[i].Score = 0
-			continue
-		}
-
-		flippedScore := total - result[i].Score
-		fScore := float64(framework.MaxNodeScore) * (float64(flippedScore) / float64(maxMinDiff))
-		result[i].Score = int64(fScore)
-	}
-	return nil
-}
-
-func filterSoftTopologySpreadConstraints(constraints []v1.TopologySpreadConstraint) ([]topologySpreadConstraint, error) {
-	var r []topologySpreadConstraint
-	for _, c := range constraints {
-		if c.WhenUnsatisfiable == v1.ScheduleAnyway {
-			selector, err := metav1.LabelSelectorAsSelector(c.LabelSelector)
-			if err != nil {
-				return nil, err
-			}
-			r = append(r, topologySpreadConstraint{
-				topologyKey: c.TopologyKey,
-				selector:    selector,
-			})
-		}
-	}
-	return r, nil
-}
-
-// nodeLabelsMatchSpreadConstraints checks if ALL topology keys in spread constraints are present in node labels.
-func nodeLabelsMatchSpreadConstraints(nodeLabels map[string]string, constraints []topologySpreadConstraint) bool {
-	for _, c := range constraints {
-		if _, ok := nodeLabels[c.topologyKey]; !ok {
-			return false
-		}
-	}
-	return true
-}
diff --git a/pkg/scheduler/algorithm/priorities/metadata.go b/pkg/scheduler/algorithm/priorities/metadata.go
index ba2ad4b9a3d..0aa47494db8 100644
--- a/pkg/scheduler/algorithm/priorities/metadata.go
+++ b/pkg/scheduler/algorithm/priorities/metadata.go
@@ -22,9 +22,7 @@ import (
 	"k8s.io/apimachinery/pkg/labels"
 	appslisters "k8s.io/client-go/listers/apps/v1"
 	corelisters "k8s.io/client-go/listers/core/v1"
-	"k8s.io/klog"
 	schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers"
-	schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
 )
 
 // MetadataFactory is a factory to produce PriorityMetadata.
@@ -56,8 +54,7 @@ func NewMetadataFactory(
 
 // priorityMetadata is a type that is passed as metadata for priority functions
 type priorityMetadata struct {
-	podSelector          labels.Selector
-	podTopologySpreadMap *podTopologySpreadMap
+	podSelector labels.Selector
 }
 
 // PriorityMetadata is a MetadataProducer. Node info can be nil.
@@ -70,20 +67,8 @@ func (pmf *MetadataFactory) PriorityMetadata(
 	if pod == nil {
 		return nil
 	}
-	var allNodes []*schedulernodeinfo.NodeInfo
-	if sharedLister != nil {
-		if l, err := sharedLister.NodeInfos().List(); err == nil {
-			allNodes = l
-		}
-	}
-	tpSpreadMap, err := buildPodTopologySpreadMap(pod, filteredNodes, allNodes)
-	if err != nil {
-		klog.Errorf("Error building podTopologySpreadMap: %v", err)
-		return nil
-	}
 	return &priorityMetadata{
-		podSelector:          getSelector(pod, pmf.serviceLister, pmf.controllerLister, pmf.replicaSetLister, pmf.statefulSetLister),
-		podTopologySpreadMap: tpSpreadMap,
+		podSelector: getSelector(pod, pmf.serviceLister, pmf.controllerLister, pmf.replicaSetLister, pmf.statefulSetLister),
 	}
 }
 
diff --git a/pkg/scheduler/algorithm/priorities/spreading_perf_test.go b/pkg/scheduler/algorithm/priorities/spreading_perf_test.go
index 86751c82c91..c9d26ffdefc 100644
--- a/pkg/scheduler/algorithm/priorities/spreading_perf_test.go
+++ b/pkg/scheduler/algorithm/priorities/spreading_perf_test.go
@@ -20,7 +20,6 @@ import (
 	"testing"
 
 	v1 "k8s.io/api/core/v1"
-	framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
 	"k8s.io/kubernetes/pkg/scheduler/listers/fake"
 	nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot"
 	st "k8s.io/kubernetes/pkg/scheduler/testing"
@@ -48,41 +47,6 @@ var (
 	}
 )
 
-func BenchmarkTestDefaultEvenPodsSpreadPriority(b *testing.B) {
-	for _, tt := range tests {
-		b.Run(tt.name, func(b *testing.B) {
-			pod := st.MakePod().Name("p").Label("foo", "").
-				SpreadConstraint(1, v1.LabelHostname, softSpread, st.MakeLabelSelector().Exists("foo").Obj()).
-				SpreadConstraint(1, v1.LabelZoneFailureDomain, softSpread, st.MakeLabelSelector().Exists("foo").Obj()).Obj()
-			existingPods, allNodes, filteredNodes := st.MakeNodesAndPodsForEvenPodsSpread(pod.Labels, tt.existingPodsNum, tt.allNodesNum, tt.allNodesNum)
-			snapshot := nodeinfosnapshot.NewSnapshot(nodeinfosnapshot.CreateNodeInfoMap(existingPods, allNodes))
-			b.ResetTimer()
-
-			for i := 0; i < b.N; i++ {
-				tpSpreadMap, err := buildPodTopologySpreadMap(pod, filteredNodes, snapshot.NodeInfoList)
-				if err != nil {
-					b.Fatal(err)
-				}
-				meta := &priorityMetadata{
-					podTopologySpreadMap: tpSpreadMap,
-				}
-				var gotList framework.NodeScoreList
-				for _, n := range filteredNodes {
-					score, err := CalculateEvenPodsSpreadPriorityMap(pod, meta, snapshot.NodeInfoMap[n.Name])
-					if err != nil {
-						b.Fatal(err)
-					}
-					gotList = append(gotList, score)
-				}
-				err = CalculateEvenPodsSpreadPriorityReduce(pod, meta, snapshot, gotList)
-				if err != nil {
-					b.Fatal(err)
-				}
-			}
-		})
-	}
-}
-
 func BenchmarkTestSelectorSpreadPriority(b *testing.B) {
 	for _, tt := range tests {
 		b.Run(tt.name, func(b *testing.B) {
diff --git a/pkg/scheduler/apis/config/testing/compatibility_test.go b/pkg/scheduler/apis/config/testing/compatibility_test.go
index b7b8f962c11..681f577d6c8 100644
--- a/pkg/scheduler/apis/config/testing/compatibility_test.go
+++ b/pkg/scheduler/apis/config/testing/compatibility_test.go
@@ -1272,6 +1272,9 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
 						{Name: "TaintToleration"},
 						{Name: "PodTopologySpread"},
 					},
+					"PostFilterPlugin": {
+						{Name: "PodTopologySpread"},
+					},
 					"ScorePlugin": {
 						{Name: "PodTopologySpread", Weight: 2},
 					},
diff --git a/pkg/scheduler/framework/plugins/podtopologyspread/BUILD b/pkg/scheduler/framework/plugins/podtopologyspread/BUILD
index 84adc861eb8..4a2dc80ca3f 100644
--- a/pkg/scheduler/framework/plugins/podtopologyspread/BUILD
+++ b/pkg/scheduler/framework/plugins/podtopologyspread/BUILD
@@ -3,14 +3,15 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
 go_library(
     name = "go_default_library",
     srcs = [
+        "common.go",
         "filtering.go",
         "plugin.go",
+        "scoring.go",
     ],
     importpath = "k8s.io/kubernetes/pkg/scheduler/framework/plugins/podtopologyspread",
     visibility = ["//visibility:public"],
     deps = [
         "//pkg/scheduler/algorithm/predicates:go_default_library",
-        "//pkg/scheduler/algorithm/priorities:go_default_library",
         "//pkg/scheduler/framework/plugins/migration:go_default_library",
         "//pkg/scheduler/framework/v1alpha1:go_default_library",
         "//pkg/scheduler/listers:go_default_library",
@@ -19,6 +20,7 @@ go_library(
         "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
+        "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
         "//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
         "//vendor/k8s.io/klog:go_default_library",
     ],
 )
@@ -26,7 +28,10 @@ go_library(
 
 go_test(
     name = "go_default_test",
-    srcs = ["filtering_test.go"],
+    srcs = [
+        "filtering_test.go",
+        "scoring_test.go",
+    ],
     embed = [":go_default_library"],
     deps = [
         "//pkg/scheduler/framework/v1alpha1:go_default_library",
@@ -35,6 +40,7 @@ go_test(
         "//staging/src/k8s.io/api/core/v1:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
+        "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
     ],
 )
 
diff --git a/pkg/scheduler/framework/plugins/podtopologyspread/common.go b/pkg/scheduler/framework/plugins/podtopologyspread/common.go
new file mode 100644
index 00000000000..88c05078aea
--- /dev/null
+++ b/pkg/scheduler/framework/plugins/podtopologyspread/common.go
@@ -0,0 +1,64 @@
+/*
+Copyright 2019 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package podtopologyspread
+
+import (
+	v1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/labels"
+)
+
+type topologyPair struct {
+	key   string
+	value string
+}
+
+// topologySpreadConstraint is an internal version for v1.TopologySpreadConstraint
+// and where the selector is parsed.
+type topologySpreadConstraint struct {
+	maxSkew     int32
+	topologyKey string
+	selector    labels.Selector
+}
+
+// nodeLabelsMatchSpreadConstraints checks if ALL topology keys in spread constraints are present in node labels.
+func nodeLabelsMatchSpreadConstraints(nodeLabels map[string]string, constraints []topologySpreadConstraint) bool {
+	for _, c := range constraints {
+		if _, ok := nodeLabels[c.topologyKey]; !ok {
+			return false
+		}
+	}
+	return true
+}
+
+func filterTopologySpreadConstraints(constraints []v1.TopologySpreadConstraint, action v1.UnsatisfiableConstraintAction) ([]topologySpreadConstraint, error) {
+	var result []topologySpreadConstraint
+	for _, c := range constraints {
+		if c.WhenUnsatisfiable == action {
+			selector, err := metav1.LabelSelectorAsSelector(c.LabelSelector)
+			if err != nil {
+				return nil, err
+			}
+			result = append(result, topologySpreadConstraint{
+				maxSkew:     c.MaxSkew,
+				topologyKey: c.TopologyKey,
+				selector:    selector,
+			})
+		}
+	}
+	return result, nil
+}
diff --git a/pkg/scheduler/framework/plugins/podtopologyspread/filtering.go b/pkg/scheduler/framework/plugins/podtopologyspread/filtering.go
index bb84f4955bd..217529d72f8 100644
--- a/pkg/scheduler/framework/plugins/podtopologyspread/filtering.go
+++ b/pkg/scheduler/framework/plugins/podtopologyspread/filtering.go
@@ -23,7 +23,6 @@ import (
 	"sync"
 
 	v1 "k8s.io/api/core/v1"
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/client-go/util/workqueue"
 	"k8s.io/klog"
@@ -124,20 +123,6 @@ func (paths *criticalPaths) update(tpVal string, num int32) {
 	}
 }
 
-type topologyPair struct {
-	key   string
-	value string
-}
-
-// topologySpreadConstraint is an internal version for a hard (DoNotSchedule
-// unsatisfiable constraint action) v1.TopologySpreadConstraint and where the
-// selector is parsed.
-type topologySpreadConstraint struct {
-	maxSkew     int32
-	topologyKey string
-	selector    labels.Selector
-}
-
 func (s *preFilterState) updateWithPod(updatedPod, preemptorPod *v1.Pod, node *v1.Node, delta int32) {
 	if s == nil || updatedPod.Namespace != preemptorPod.Namespace || node == nil {
 		return
@@ -160,16 +145,6 @@ func (s *preFilterState) updateWithPod(updatedPod, preemptorPod *v1.Pod, node *v
 	}
 }
 
-// nodeLabelsMatchSpreadConstraints checks if ALL topology keys in spread constraints are present in node labels.
-func nodeLabelsMatchSpreadConstraints(nodeLabels map[string]string, constraints []topologySpreadConstraint) bool {
-	for _, c := range constraints {
-		if _, ok := nodeLabels[c.topologyKey]; !ok {
-			return false
-		}
-	}
-	return true
-}
-
 // PreFilter invoked at the prefilter extension point.
 func (pl *PodTopologySpread) PreFilter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod) *framework.Status {
 	var s *preFilterState
@@ -216,11 +191,11 @@ func (pl *PodTopologySpread) RemovePod(ctx context.Context, cycleState *framewor
 	return nil
 }
 
-// getPreFilterState fetches a pre-computed preFilterState
+// getPreFilterState fetches a pre-computed preFilterState.
 func getPreFilterState(cycleState *framework.CycleState) (*preFilterState, error) {
 	c, err := cycleState.Read(preFilterStateKey)
 	if err != nil {
-		// The metadata wasn't pre-computed in prefilter. We ignore the error for now since
+		// The preFilterState wasn't pre-computed in prefilter. We ignore the error for now since
 		// we are able to handle that by computing it again (e.g. in Filter()).
 		klog.V(5).Infof("Error reading %q from cycleState: %v", preFilterStateKey, err)
 		return nil, nil
@@ -228,7 +203,7 @@ func getPreFilterState(cycleState *framework.CycleState) (*preFilterState, error
 
 	s, ok := c.(*preFilterState)
 	if !ok {
-		return nil, fmt.Errorf("%+v convert to podtopologyspread.state error", c)
+		return nil, fmt.Errorf("%+v convert to podtopologyspread.preFilterState error", c)
 	}
 	return s, nil
 }
@@ -237,7 +212,7 @@ func getPreFilterState(cycleState *framework.CycleState) (*preFilterState, error
 func calPreFilterState(pod *v1.Pod, allNodes []*schedulernodeinfo.NodeInfo) (*preFilterState, error) {
 	// We have feature gating in APIServer to strip the spec
 	// so don't need to re-check feature gate, just check length of constraints.
-	constraints, err := filterHardTopologySpreadConstraints(pod.Spec.TopologySpreadConstraints)
+	constraints, err := filterTopologySpreadConstraints(pod.Spec.TopologySpreadConstraints, v1.DoNotSchedule)
 	if err != nil {
 		return nil, err
 	}
@@ -306,24 +281,6 @@ func calPreFilterState(pod *v1.Pod, allNodes []*schedulernodeinfo.NodeInfo) (*pr
 	return &s, nil
 }
 
-func filterHardTopologySpreadConstraints(constraints []v1.TopologySpreadConstraint) ([]topologySpreadConstraint, error) {
-	var result []topologySpreadConstraint
-	for _, c := range constraints {
-		if c.WhenUnsatisfiable == v1.DoNotSchedule {
-			selector, err := metav1.LabelSelectorAsSelector(c.LabelSelector)
-			if err != nil {
-				return nil, err
-			}
-			result = append(result, topologySpreadConstraint{
-				maxSkew:     c.MaxSkew,
-				topologyKey: c.TopologyKey,
-				selector:    selector,
-			})
-		}
-	}
-	return result, nil
-}
-
 // Filter invoked at the filter extension point.
 func (pl *PodTopologySpread) Filter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeInfo *nodeinfo.NodeInfo) *framework.Status {
 	node := nodeInfo.Node()
@@ -341,7 +298,7 @@ func (pl *PodTopologySpread) Filter(ctx context.Context, cycleState *framework.C
 		return framework.NewStatus(framework.Error, "preFilterState not pre-computed for PodTopologySpread Plugin")
 	}
 
-	// However, "empty" meta is legit which tolerates every toSchedule Pod.
+	// However, "empty" preFilterState is legit which tolerates every toSchedule Pod.
 	if len(s.tpPairToMatchNum) == 0 || len(s.constraints) == 0 {
 		return nil
 	}
diff --git a/pkg/scheduler/framework/plugins/podtopologyspread/plugin.go b/pkg/scheduler/framework/plugins/podtopologyspread/plugin.go
index 05f566d6131..8e49de0ffb4 100644
--- a/pkg/scheduler/framework/plugins/podtopologyspread/plugin.go
+++ b/pkg/scheduler/framework/plugins/podtopologyspread/plugin.go
@@ -17,13 +17,9 @@ limitations under the License.
 package podtopologyspread
 
 import (
-	"context"
 	"fmt"
 
-	"k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/runtime"
-	"k8s.io/kubernetes/pkg/scheduler/algorithm/priorities"
-	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/migration"
 	framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
 	schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers"
 )
@@ -35,6 +31,7 @@ type PodTopologySpread struct {
 
 var _ framework.PreFilterPlugin = &PodTopologySpread{}
 var _ framework.FilterPlugin = &PodTopologySpread{}
+var _ framework.PostFilterPlugin = &PodTopologySpread{}
 var _ framework.ScorePlugin = &PodTopologySpread{}
 
 const (
@@ -47,32 +44,6 @@ func (pl *PodTopologySpread) Name() string {
 	return Name
 }
 
-// Score invoked at the Score extension point.
-// The "score" returned in this function is the matching number of pods on the `nodeName`,
-// it is normalized later.
-func (pl *PodTopologySpread) Score(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
-	nodeInfo, err := pl.sharedLister.NodeInfos().Get(nodeName)
-	if err != nil {
-		return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v", nodeName, err))
-	}
-
-	meta := migration.PriorityMetadata(state)
-	s, err := priorities.CalculateEvenPodsSpreadPriorityMap(pod, meta, nodeInfo)
-	return s.Score, migration.ErrorToFrameworkStatus(err)
-}
-
-// NormalizeScore invoked after scoring all nodes.
-func (pl *PodTopologySpread) NormalizeScore(ctx context.Context, state *framework.CycleState, pod *v1.Pod, scores framework.NodeScoreList) *framework.Status {
-	meta := migration.PriorityMetadata(state)
-	err := priorities.CalculateEvenPodsSpreadPriorityReduce(pod, meta, pl.sharedLister, scores)
-	return migration.ErrorToFrameworkStatus(err)
-}
-
-// ScoreExtensions of the Score plugin.
-func (pl *PodTopologySpread) ScoreExtensions() framework.ScoreExtensions {
-	return pl
-}
-
 // New initializes a new plugin and returns it.
 func New(_ *runtime.Unknown, h framework.FrameworkHandle) (framework.Plugin, error) {
 	if h.SnapshotSharedLister() == nil {
diff --git a/pkg/scheduler/framework/plugins/podtopologyspread/scoring.go b/pkg/scheduler/framework/plugins/podtopologyspread/scoring.go
new file mode 100644
index 00000000000..e7eb8fda7d5
--- /dev/null
+++ b/pkg/scheduler/framework/plugins/podtopologyspread/scoring.go
@@ -0,0 +1,256 @@
+/*
+Copyright 2019 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package podtopologyspread
+
+import (
+	"context"
+	"fmt"
+	"math"
+	"sync/atomic"
+
+	v1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/labels"
+	"k8s.io/apimachinery/pkg/util/sets"
+	"k8s.io/client-go/util/workqueue"
+	"k8s.io/klog"
+	"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
+	framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
+)
+
+const postFilterStateKey = "PostFilter" + Name
+
+// postFilterState computed at PostFilter and used at Score.
+type postFilterState struct {
+	constraints []topologySpreadConstraint
+	// nodeNameSet is a string set holding all node names which have all constraints[*].topologyKey present.
+	nodeNameSet sets.String
+	// topologyPairToPodCounts is keyed with topologyPair, and valued with the number of matching pods.
+	topologyPairToPodCounts map[topologyPair]*int64
+}
+
+// Clone implements the mandatory Clone interface. We don't really copy the data since
+// there is no need for that.
+func (s *postFilterState) Clone() framework.StateData {
+	return s
+}
+
+// initialize iterates "filteredNodes" to filter out the nodes which don't have required topologyKey(s),
+// and initializes:
+// 1) s.topologyPairToPodCounts: keyed with each eligible topology pair, and valued with a *int64 counter of matching pods.
+// 2) s.nodeNameSet: a string set holding the names of the eligible nodes only.
+func (s *postFilterState) initialize(pod *v1.Pod, filteredNodes []*v1.Node) error {
+	constraints, err := filterTopologySpreadConstraints(pod.Spec.TopologySpreadConstraints, v1.ScheduleAnyway)
+	if err != nil {
+		return err
+	}
+	if constraints == nil {
+		return nil
+	}
+	s.constraints = constraints
+	for _, node := range filteredNodes {
+		if !nodeLabelsMatchSpreadConstraints(node.Labels, s.constraints) {
+			continue
+		}
+		for _, constraint := range s.constraints {
+			pair := topologyPair{key: constraint.topologyKey, value: node.Labels[constraint.topologyKey]}
+			if s.topologyPairToPodCounts[pair] == nil {
+				s.topologyPairToPodCounts[pair] = new(int64)
+			}
+		}
+		s.nodeNameSet.Insert(node.Name)
+		// For those nodes which don't have all required topologyKeys present, it's intentional to leave
+		// their entries absent in nodeNameSet, so that we're able to score them to 0 afterwards.
+	}
+	return nil
+}
+
+// PostFilter builds and writes cycle state used by Score and NormalizeScore.
+func (pl *PodTopologySpread) PostFilter(
+	ctx context.Context,
+	cycleState *framework.CycleState,
+	pod *v1.Pod,
+	filteredNodes []*v1.Node,
+	_ framework.NodeToStatusMap,
+) *framework.Status {
+	allNodes, err := pl.sharedLister.NodeInfos().List()
+	if err != nil {
+		return framework.NewStatus(framework.Error, fmt.Sprintf("error when getting all nodes: %v", err))
+	}
+
+	if len(filteredNodes) == 0 || len(allNodes) == 0 {
+		// No nodes to score.
+		return nil
+	}
+
+	state := &postFilterState{
+		nodeNameSet:             sets.String{},
+		topologyPairToPodCounts: make(map[topologyPair]*int64),
+	}
+	err = state.initialize(pod, filteredNodes)
+	if err != nil {
+		return framework.NewStatus(framework.Error, fmt.Sprintf("error when calculating postFilterState: %v", err))
+	}
+
+	// return if incoming pod doesn't have soft topology spread constraints.
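+	// The (empty) state is still written, so that Score and NormalizeScore can
+	// read it and will simply score every node 0.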
+	if state.constraints == nil {
+		cycleState.Write(postFilterStateKey, state)
+		return nil
+	}
+
+	processAllNode := func(i int) {
+		nodeInfo := allNodes[i]
+		node := nodeInfo.Node()
+		if node == nil {
+			return
+		}
+		// (1) `node` should satisfy incoming pod's NodeSelector/NodeAffinity
+		// (2) All topologyKeys need to be present in `node`
+		if !predicates.PodMatchesNodeSelectorAndAffinityTerms(pod, node) ||
+			!nodeLabelsMatchSpreadConstraints(node.Labels, state.constraints) {
+			return
+		}
+
+		for _, c := range state.constraints {
+			pair := topologyPair{key: c.topologyKey, value: node.Labels[c.topologyKey]}
+			// If current topology pair is not associated with any candidate node,
+			// continue to avoid unnecessary calculation.
+			if state.topologyPairToPodCounts[pair] == nil {
+				continue
+			}
+
+			// <matchSum> indicates how many pods (on current node) match the <constraint>.
+			matchSum := int64(0)
+			for _, existingPod := range nodeInfo.Pods() {
+				if c.selector.Matches(labels.Set(existingPod.Labels)) {
+					matchSum++
+				}
+			}
+			atomic.AddInt64(state.topologyPairToPodCounts[pair], matchSum)
+		}
+	}
+	workqueue.ParallelizeUntil(ctx, 16, len(allNodes), processAllNode)
+
+	cycleState.Write(postFilterStateKey, state)
+	return nil
+}
+
+// Score invoked at the Score extension point.
+// The "score" returned in this function is the matching number of pods on the `nodeName`,
+// it is normalized later.
+func (pl *PodTopologySpread) Score(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
+	nodeInfo, err := pl.sharedLister.NodeInfos().Get(nodeName)
+	if err != nil || nodeInfo.Node() == nil {
+		return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v, node is nil: %v", nodeName, err, nodeInfo.Node() == nil))
+	}
+
+	node := nodeInfo.Node()
+	s, err := getPostFilterState(cycleState)
+	if err != nil {
+		return 0, framework.NewStatus(framework.Error, err.Error())
+	}
+
+	// Return if the node is not qualified.
+	if _, ok := s.nodeNameSet[node.Name]; !ok {
+		return 0, nil
+	}
+
+	// For each present <pair>, current node gets a credit of <matchSum>.
+	// And we sum up <matchSum> and return it as this node's score.
+	var score int64
+	for _, c := range s.constraints {
+		if tpVal, ok := node.Labels[c.topologyKey]; ok {
+			pair := topologyPair{key: c.topologyKey, value: tpVal}
+			matchSum := *s.topologyPairToPodCounts[pair]
+			score += matchSum
+		}
+	}
+	return score, nil
+}
+
+// NormalizeScore invoked after scoring all nodes.
+func (pl *PodTopologySpread) NormalizeScore(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, scores framework.NodeScoreList) *framework.Status {
+	s, err := getPostFilterState(cycleState)
+	if err != nil {
+		return framework.NewStatus(framework.Error, err.Error())
+	}
+	if s == nil {
+		return nil
+	}
+
+	// Calculate the summed <total> score and <minScore>.
+	var minScore int64 = math.MaxInt64
+	var total int64
+	for _, score := range scores {
+		// it's mandatory to check if <score.Name> is present in s.nodeNameSet
+		if _, ok := s.nodeNameSet[score.Name]; !ok {
+			continue
+		}
+		total += score.Score
+		if score.Score < minScore {
+			minScore = score.Score
+		}
+	}
+
+	maxMinDiff := total - minScore
+	for i := range scores {
+		nodeInfo, err := pl.sharedLister.NodeInfos().Get(scores[i].Name)
+		if err != nil {
+			return framework.NewStatus(framework.Error, err.Error())
+		}
+		node := nodeInfo.Node()
+		// Debugging purpose: print the score for each node.
+		// Score must be a pointer here, otherwise it's always 0.
+		if klog.V(10) {
+			defer func(score *int64, nodeName string) {
+				klog.Infof("%v -> %v: PodTopologySpread NormalizeScore, Score: (%d)", pod.Name, nodeName, *score)
+			}(&scores[i].Score, node.Name)
+		}
+
+		if maxMinDiff == 0 {
+			scores[i].Score = framework.MaxNodeScore
+			continue
+		}
+
+		if _, ok := s.nodeNameSet[node.Name]; !ok {
+			scores[i].Score = 0
+			continue
+		}
+
+		flippedScore := total - scores[i].Score
+		fScore := float64(framework.MaxNodeScore) * (float64(flippedScore) / float64(maxMinDiff))
+		scores[i].Score = int64(fScore)
+	}
+	return nil
+}
+
+// ScoreExtensions of the Score plugin.
+func (pl *PodTopologySpread) ScoreExtensions() framework.ScoreExtensions {
+	return pl
+}
+
+func getPostFilterState(cycleState *framework.CycleState) (*postFilterState, error) {
+	c, err := cycleState.Read(postFilterStateKey)
+	if err != nil {
+		return nil, fmt.Errorf("error reading %q from cycleState: %v", postFilterStateKey, err)
+	}
+
+	s, ok := c.(*postFilterState)
+	if !ok {
+		return nil, fmt.Errorf("%+v convert to podtopologyspread.postFilterState error", c)
+	}
+	return s, nil
+}
diff --git a/pkg/scheduler/algorithm/priorities/even_pods_spread_test.go b/pkg/scheduler/framework/plugins/podtopologyspread/scoring_test.go
similarity index 83%
rename from pkg/scheduler/algorithm/priorities/even_pods_spread_test.go
rename to pkg/scheduler/framework/plugins/podtopologyspread/scoring_test.go
index d4c9fbcefd2..2330390b3c7 100644
--- a/pkg/scheduler/algorithm/priorities/even_pods_spread_test.go
+++ b/pkg/scheduler/framework/plugins/podtopologyspread/scoring_test.go
@@ -14,24 +14,26 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */
 
-package priorities
+package podtopologyspread
 
 import (
+	"context"
 	"reflect"
 	"testing"
 
 	v1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/util/sets"
 	framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
 	nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot"
 	st "k8s.io/kubernetes/pkg/scheduler/testing"
 )
 
-func Test_podTopologySpreadMap_initialize(t *testing.T) {
+func TestPostFilterStateInitialize(t *testing.T) {
 	tests := []struct {
 		name                string
 		pod                 *v1.Pod
 		nodes               []*v1.Node
-		wantNodeNameSet     map[string]struct{}
+		wantNodeNameSet     sets.String
 		wantTopologyPairMap map[topologyPair]*int64
 	}{
 		{
@@ -45,11 +47,7 @@ func Test_podTopologySpreadMap_initialize(t *testing.T) {
 				st.MakeNode().Name("node-b").Label("zone", "zone1").Label("node", "node-b").Obj(),
 				st.MakeNode().Name("node-x").Label("zone", "zone2").Label("node", "node-x").Obj(),
 			},
-			wantNodeNameSet: map[string]struct{}{
-				"node-a": {},
-				"node-b": {},
-				"node-x": {},
-			},
+			wantNodeNameSet: sets.NewString("node-a", "node-b", "node-x"),
 			wantTopologyPairMap: map[topologyPair]*int64{
 				{key: "zone", value: "zone1"}: new(int64),
 				{key: "zone", value: "zone2"}: new(int64),
@@ -69,10 +67,7 @@ func Test_podTopologySpreadMap_initialize(t *testing.T) {
 				st.MakeNode().Name("node-b").Label("zone", "zone1").Label("node", "node-b").Obj(),
 				st.MakeNode().Name("node-x").Label("node", "node-x").Obj(),
 			},
-			wantNodeNameSet: map[string]struct{}{
-				"node-a": {},
-				"node-b": {},
-			},
+			wantNodeNameSet: sets.NewString("node-a", "node-b"),
 			wantTopologyPairMap: map[topologyPair]*int64{
 				{key: "zone", value: "zone1"}: new(int64),
 				{key: "node", value: "node-a"}: new(int64),
@@ -82,21 +77,24 @@ func Test_podTopologySpreadMap_initialize(t *testing.T) {
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			m := newTopologySpreadConstraintsMap()
-			if err := m.initialize(tt.pod, tt.nodes); err != nil {
+			s := &postFilterState{
+				nodeNameSet:             sets.String{},
+				topologyPairToPodCounts: make(map[topologyPair]*int64),
+			}
+			if err := s.initialize(tt.pod, tt.nodes); err != nil {
 				t.Fatal(err)
 			}
-			if !reflect.DeepEqual(m.nodeNameSet, tt.wantNodeNameSet) {
-				t.Errorf("initilize().nodeNameSet = %#v, want %#v", m.nodeNameSet, tt.wantNodeNameSet)
+			if !reflect.DeepEqual(s.nodeNameSet, tt.wantNodeNameSet) {
+				t.Errorf("initialize().nodeNameSet = %#v, want %#v", s.nodeNameSet, tt.wantNodeNameSet)
 			}
-			if !reflect.DeepEqual(m.topologyPairToPodCounts, tt.wantTopologyPairMap) {
-				t.Errorf("initilize().topologyPairToPodCounts = %#v, want %#v", m.topologyPairToPodCounts, tt.wantTopologyPairMap)
+			if !reflect.DeepEqual(s.topologyPairToPodCounts, tt.wantTopologyPairMap) {
+				t.Errorf("initialize().topologyPairToPodCounts = %#v, want %#v", s.topologyPairToPodCounts, tt.wantTopologyPairMap)
 			}
 		})
 	}
 }
 
-func TestCalculateEvenPodsSpreadPriority(t *testing.T) {
+func TestPodTopologySpreadScore(t *testing.T) {
 	tests := []struct {
 		name         string
 		pod          *v1.Pod
@@ -436,37 +434,37 @@ func TestCalculateEvenPodsSpreadPriority(t *testing.T) {
 		t.Run(tt.name, func(t *testing.T) {
 			allNodes := append([]*v1.Node{}, tt.nodes...)
 			allNodes = append(allNodes, tt.failedNodes...)
+			state := framework.NewCycleState()
 			snapshot := nodeinfosnapshot.NewSnapshot(nodeinfosnapshot.CreateNodeInfoMap(tt.existingPods, allNodes))
+			p := &PodTopologySpread{sharedLister: snapshot}
 
-			tpSpreadMap, err := buildPodTopologySpreadMap(tt.pod, tt.nodes, snapshot.NodeInfoList)
-			if err != nil {
-				t.Fatal(err)
-			}
-			meta := &priorityMetadata{
-				podTopologySpreadMap: tpSpreadMap,
+			status := p.PostFilter(context.Background(), state, tt.pod, tt.nodes, nil)
+			if !status.IsSuccess() {
+				t.Errorf("unexpected error: %v", status)
 			}
+
 			var gotList framework.NodeScoreList
 			for _, n := range tt.nodes {
 				nodeName := n.Name
-				nodeScore, err := CalculateEvenPodsSpreadPriorityMap(tt.pod, meta, snapshot.NodeInfoMap[nodeName])
-				if err != nil {
-					t.Error(err)
+				score, status := p.Score(context.Background(), state, tt.pod, nodeName)
+				if !status.IsSuccess() {
+					t.Errorf("unexpected error: %v", status)
 				}
-				gotList = append(gotList, nodeScore)
+				gotList = append(gotList, framework.NodeScore{Name: nodeName, Score: score})
 			}
-			err = CalculateEvenPodsSpreadPriorityReduce(tt.pod, meta, snapshot, gotList)
-			if err != nil {
-				t.Fatal(err)
+
+			status = p.NormalizeScore(context.Background(), state, tt.pod, gotList)
+			if !status.IsSuccess() {
+				t.Errorf("unexpected error: %v", status)
 			}
-			if !reflect.DeepEqual(gotList, tt.want) {
-				t.Errorf("CalculateEvenPodsSpreadPriorityReduce() = %#v, want %#v", gotList, tt.want)
+			if !reflect.DeepEqual(tt.want, gotList) {
+				t.Errorf("expected:\n\t%+v,\ngot:\n\t%+v", tt.want, gotList)
 			}
 		})
 	}
 }
 
-func BenchmarkTestCalculateEvenPodsSpreadPriority(b *testing.B) {
+func BenchmarkTestPodTopologySpreadScore(b *testing.B) {
 	tests := []struct {
 		name         string
 		pod          *v1.Pod
@@ -506,13 +504,13 @@ func BenchmarkTestCalculateEvenPodsSpreadPriority(b *testing.B) {
 	for _, tt := range tests {
 		b.Run(tt.name, func(b *testing.B) {
 			existingPods, allNodes, filteredNodes := st.MakeNodesAndPodsForEvenPodsSpread(tt.pod.Labels, tt.existingPodsNum, tt.allNodesNum, tt.filteredNodesNum)
+			state := framework.NewCycleState()
 			snapshot := nodeinfosnapshot.NewSnapshot(nodeinfosnapshot.CreateNodeInfoMap(existingPods, allNodes))
-			tpSpreadMap, err := buildPodTopologySpreadMap(tt.pod, filteredNodes, snapshot.NodeInfoList)
-			if err != nil {
-				b.Fatal(err)
-			}
-			meta := &priorityMetadata{
-				podTopologySpreadMap: tpSpreadMap,
+			p := &PodTopologySpread{sharedLister: snapshot}
+
+			status := p.PostFilter(context.Background(), state, tt.pod, filteredNodes, nil)
+			if !status.IsSuccess() {
+				b.Fatalf("unexpected error: %v", status)
 			}
 			b.ResetTimer()
 
@@ -520,16 +518,77 @@ func BenchmarkTestCalculateEvenPodsSpreadPriority(b *testing.B) {
 				var gotList framework.NodeScoreList
 				for _, n := range filteredNodes {
 					nodeName := n.Name
-					nodeScore, _ := CalculateEvenPodsSpreadPriorityMap(tt.pod, meta, snapshot.NodeInfoMap[nodeName])
-					gotList = append(gotList, nodeScore)
+					score, status := p.Score(context.Background(), state, tt.pod, nodeName)
+					if !status.IsSuccess() {
+						b.Fatalf("unexpected error: %v", status)
+					}
+					gotList = append(gotList, framework.NodeScore{Name: nodeName, Score: score})
 				}
-				err := CalculateEvenPodsSpreadPriorityReduce(tt.pod, meta, snapshot, gotList)
-				if err != nil {
-					b.Fatal(err)
+
+				status = p.NormalizeScore(context.Background(), state, tt.pod, gotList)
+				if !status.IsSuccess() {
+					b.Fatal(status)
 				}
 			}
 		})
 	}
 }
 
-var softSpread = v1.ScheduleAnyway
+// The tests in this file compare the performance of SelectorSpreadPriority
+// against EvenPodsSpreadPriority with a similar rule.
+
+var (
+	tests = []struct {
+		name            string
+		existingPodsNum int
+		allNodesNum     int
+	}{
+		{
+			name:            "100nodes",
+			existingPodsNum: 1000,
+			allNodesNum:     100,
+		},
+		{
+			name:            "1000nodes",
+			existingPodsNum: 10000,
+			allNodesNum:     1000,
+		},
+	}
+)
+
+func BenchmarkTestDefaultEvenPodsSpreadPriority(b *testing.B) {
+	for _, tt := range tests {
+		b.Run(tt.name, func(b *testing.B) {
+			pod := st.MakePod().Name("p").Label("foo", "").
+				SpreadConstraint(1, v1.LabelHostname, softSpread, st.MakeLabelSelector().Exists("foo").Obj()).
+				SpreadConstraint(1, v1.LabelZoneFailureDomain, softSpread, st.MakeLabelSelector().Exists("foo").Obj()).Obj()
+			existingPods, allNodes, filteredNodes := st.MakeNodesAndPodsForEvenPodsSpread(pod.Labels, tt.existingPodsNum, tt.allNodesNum, tt.allNodesNum)
+			state := framework.NewCycleState()
+			snapshot := nodeinfosnapshot.NewSnapshot(nodeinfosnapshot.CreateNodeInfoMap(existingPods, allNodes))
+			p := &PodTopologySpread{sharedLister: snapshot}
+
+			status := p.PostFilter(context.Background(), state, pod, filteredNodes, nil)
+			if !status.IsSuccess() {
+				b.Fatalf("unexpected error: %v", status)
+			}
+			b.ResetTimer()
+
+			for i := 0; i < b.N; i++ {
+				var gotList framework.NodeScoreList
+				for _, n := range filteredNodes {
+					nodeName := n.Name
+					score, status := p.Score(context.Background(), state, pod, nodeName)
+					if !status.IsSuccess() {
+						b.Fatalf("unexpected error: %v", status)
+					}
+					gotList = append(gotList, framework.NodeScore{Name: nodeName, Score: score})
+				}
+
+				status = p.NormalizeScore(context.Background(), state, pod, gotList)
+				if !status.IsSuccess() {
+					b.Fatal(status)
+				}
+			}
+		})
+	}
+}
diff --git a/pkg/scheduler/framework/plugins/registry.go b/pkg/scheduler/framework/plugins/registry.go
index 8a4ac570042..f37bf6ea21d 100644
--- a/pkg/scheduler/framework/plugins/registry.go
+++ b/pkg/scheduler/framework/plugins/registry.go
@@ -284,6 +284,7 @@ func NewConfigProducerRegistry() *ConfigProducerRegistry {
 		})
 	registry.RegisterPriority(priorities.EvenPodsSpreadPriority,
 		func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
+			plugins.PostFilter = appendToPluginSet(plugins.PostFilter, podtopologyspread.Name, nil)
 			plugins.Score = appendToPluginSet(plugins.Score, podtopologyspread.Name, &args.Weight)
 			return
 		})