Merge pull request #86890 from damemi/move-selector-spread-to-plugin

Move selector spreading priority code to plugin
Kubernetes Prow Robot 2020-01-08 21:29:45 -08:00 committed by GitHub
commit 4d41f4809f
17 changed files with 343 additions and 1196 deletions

View File

@@ -3,7 +3,6 @@ package(default_visibility = ["//visibility:public"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
"go_test",
)
go_library(
@@ -12,8 +11,6 @@ go_library(
"metadata.go",
"priorities.go",
"reduce.go",
"selector_spreading.go",
"test_util.go",
"types.go",
],
importpath = "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities",
@@ -21,39 +18,9 @@ go_library(
"//pkg/scheduler/framework/v1alpha1:go_default_library",
"//pkg/scheduler/listers:go_default_library",
"//pkg/scheduler/nodeinfo:go_default_library",
"//pkg/util/node:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/client-go/listers/apps/v1:go_default_library",
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = [
"metadata_test.go",
"selector_spreading_test.go",
"spreading_perf_test.go",
],
embed = [":go_default_library"],
deps = [
"//pkg/scheduler/algorithm/priorities/util:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library",
"//pkg/scheduler/listers/fake:go_default_library",
"//pkg/scheduler/nodeinfo:go_default_library",
"//pkg/scheduler/nodeinfo/snapshot:go_default_library",
"//pkg/scheduler/testing:go_default_library",
"//staging/src/k8s.io/api/apps/v1:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
"//vendor/github.com/google/go-cmp/cmp:go_default_library",
],
)

View File

@@ -18,8 +18,6 @@ package priorities
import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
appslisters "k8s.io/client-go/listers/apps/v1"
corelisters "k8s.io/client-go/listers/core/v1"
schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers"
@@ -50,9 +48,7 @@ func NewMetadataFactory(
}
// priorityMetadata is a type that is passed as metadata for priority functions
type priorityMetadata struct {
podSelector labels.Selector
}
type priorityMetadata struct{}
// PriorityMetadata is a MetadataProducer. Node info can be nil.
func (pmf *MetadataFactory) PriorityMetadata(
@@ -64,53 +60,5 @@ func (pmf *MetadataFactory) PriorityMetadata(
if pod == nil {
return nil
}
return &priorityMetadata{
podSelector: getSelector(pod, pmf.serviceLister, pmf.controllerLister, pmf.replicaSetLister, pmf.statefulSetLister),
}
}
// getSelector returns a selector for the services, RCs, RSs, and SSs matching the given pod.
func getSelector(pod *v1.Pod, sl corelisters.ServiceLister, cl corelisters.ReplicationControllerLister, rsl appslisters.ReplicaSetLister, ssl appslisters.StatefulSetLister) labels.Selector {
labelSet := make(labels.Set)
// Since services, RCs, RSs and SSs match the pod, they won't have conflicting
// labels. Merging is safe.
if services, err := schedulerlisters.GetPodServices(sl, pod); err == nil {
for _, service := range services {
labelSet = labels.Merge(labelSet, service.Spec.Selector)
}
}
if rcs, err := cl.GetPodControllers(pod); err == nil {
for _, rc := range rcs {
labelSet = labels.Merge(labelSet, rc.Spec.Selector)
}
}
selector := labels.NewSelector()
if len(labelSet) != 0 {
selector = labelSet.AsSelector()
}
if rss, err := rsl.GetPodReplicaSets(pod); err == nil {
for _, rs := range rss {
if other, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector); err == nil {
if r, ok := other.Requirements(); ok {
selector = selector.Add(r...)
}
}
}
}
if sss, err := ssl.GetPodStatefulSets(pod); err == nil {
for _, ss := range sss {
if other, err := metav1.LabelSelectorAsSelector(ss.Spec.Selector); err == nil {
if r, ok := other.Requirements(); ok {
selector = selector.Add(r...)
}
}
}
}
return selector
return &priorityMetadata{}
}
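As an illustration (not part of the commit itself), the merging behaviour getSelector relies on can be sketched in a few self-contained lines: map-based selectors from Services and ReplicationControllers are merged as label sets, while the set-based ReplicaSet and StatefulSet selectors are converted and appended as requirements. The Service and ReplicaSet selectors below are hypothetical, chosen only to show the combination; because both match the same pod, a shared key always carries the same value, which is why merging is safe.

package main

import (
    "fmt"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/labels"
)

func main() {
    // Hypothetical: a Service selecting app=web and a ReplicaSet selecting
    // app=web,tier=frontend, both of which match the pod being scheduled.
    serviceSelector := labels.Set{"app": "web"}
    rsSelector := &metav1.LabelSelector{
        MatchLabels: map[string]string{"app": "web", "tier": "frontend"},
    }

    // Map-based selectors are merged first.
    merged := labels.Merge(labels.Set{}, serviceSelector)
    selector := merged.AsSelector()

    // Set-based selectors are converted and their requirements appended.
    if other, err := metav1.LabelSelectorAsSelector(rsSelector); err == nil {
        if r, ok := other.Requirements(); ok {
            selector = selector.Add(r...)
        }
    }

    podLabels := labels.Set{"app": "web", "tier": "frontend", "pod-template-hash": "abc"}
    fmt.Println(selector.Matches(podLabels))                // true: the pod satisfies the combined selector
    fmt.Println(selector.Matches(labels.Set{"app": "web"})) // false: tier=frontend is now also required
}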

View File

@@ -1,167 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package priorities
import (
"reflect"
"testing"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/informers"
clientsetfake "k8s.io/client-go/kubernetes/fake"
priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
)
func TestPriorityMetadata(t *testing.T) {
nonZeroReqs := &schedulernodeinfo.Resource{}
nonZeroReqs.MilliCPU = priorityutil.DefaultMilliCPURequest
nonZeroReqs.Memory = priorityutil.DefaultMemoryRequest
tolerations := []v1.Toleration{{
Key: "foo",
Operator: v1.TolerationOpEqual,
Value: "bar",
Effect: v1.TaintEffectPreferNoSchedule,
}}
podAffinity := &v1.Affinity{
PodAffinity: &v1.PodAffinity{
PreferredDuringSchedulingIgnoredDuringExecution: []v1.WeightedPodAffinityTerm{
{
Weight: 5,
PodAffinityTerm: v1.PodAffinityTerm{
LabelSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "security",
Operator: metav1.LabelSelectorOpIn,
Values: []string{"S1"},
},
},
},
TopologyKey: "region",
},
},
},
},
}
podWithTolerationsAndAffinity := &v1.Pod{
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "container",
Image: "image",
ImagePullPolicy: "Always",
},
},
Affinity: podAffinity,
Tolerations: tolerations,
},
}
podWithTolerationsAndRequests := &v1.Pod{
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "container",
Image: "image",
ImagePullPolicy: "Always",
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("200m"),
v1.ResourceMemory: resource.MustParse("2000"),
},
},
},
},
Tolerations: tolerations,
},
}
podWithAffinityAndRequests := &v1.Pod{
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "container",
Image: "image",
ImagePullPolicy: "Always",
Resources: v1.ResourceRequirements{
Limits: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("200m"),
v1.ResourceMemory: resource.MustParse("2000"),
},
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("200m"),
v1.ResourceMemory: resource.MustParse("2000"),
},
},
},
},
Affinity: podAffinity,
},
}
tests := []struct {
pod *v1.Pod
name string
expected interface{}
}{
{
pod: nil,
expected: nil,
name: "pod is nil , priorityMetadata is nil",
},
{
pod: podWithTolerationsAndAffinity,
expected: &priorityMetadata{
podSelector: labels.NewSelector(),
},
name: "Produce a priorityMetadata with default requests",
},
{
pod: podWithTolerationsAndRequests,
expected: &priorityMetadata{
podSelector: labels.NewSelector(),
},
name: "Produce a priorityMetadata with tolerations and requests",
},
{
pod: podWithAffinityAndRequests,
expected: &priorityMetadata{
podSelector: labels.NewSelector(),
},
name: "Produce a priorityMetadata with affinity and requests",
},
}
client := clientsetfake.NewSimpleClientset()
informerFactory := informers.NewSharedInformerFactory(client, 0)
metaDataProducer := NewMetadataFactory(
informerFactory.Core().V1().Services().Lister(),
informerFactory.Core().V1().ReplicationControllers().Lister(),
informerFactory.Apps().V1().ReplicaSets().Lister(),
informerFactory.Apps().V1().StatefulSets().Lister(),
)
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
ptData := metaDataProducer(test.pod, nil, nil)
if !reflect.DeepEqual(test.expected, ptData) {
t.Errorf("expected %#v, got %#v", test.expected, ptData)
}
})
}
}

View File

@@ -1,173 +0,0 @@
/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package priorities
import (
"fmt"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
appslisters "k8s.io/client-go/listers/apps/v1"
corelisters "k8s.io/client-go/listers/core/v1"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
utilnode "k8s.io/kubernetes/pkg/util/node"
"k8s.io/klog"
)
// When zone information is present, give 2/3 of the weighting to zone spreading, 1/3 to node spreading
// TODO: Any way to justify this weighting?
const zoneWeighting float64 = 2.0 / 3.0
// SelectorSpread contains information to calculate selector spread priority.
type SelectorSpread struct {
serviceLister corelisters.ServiceLister
controllerLister corelisters.ReplicationControllerLister
replicaSetLister appslisters.ReplicaSetLister
statefulSetLister appslisters.StatefulSetLister
}
// NewSelectorSpreadPriority creates a SelectorSpread.
func NewSelectorSpreadPriority(
serviceLister corelisters.ServiceLister,
controllerLister corelisters.ReplicationControllerLister,
replicaSetLister appslisters.ReplicaSetLister,
statefulSetLister appslisters.StatefulSetLister) (PriorityMapFunction, PriorityReduceFunction) {
selectorSpread := &SelectorSpread{
serviceLister: serviceLister,
controllerLister: controllerLister,
replicaSetLister: replicaSetLister,
statefulSetLister: statefulSetLister,
}
return selectorSpread.CalculateSpreadPriorityMap, selectorSpread.CalculateSpreadPriorityReduce
}
// CalculateSpreadPriorityMap spreads pods across hosts, considering pods
// belonging to the same service, RC, RS or StatefulSet.
// When a pod is scheduled, it looks for services, RCs, RSs and StatefulSets that match the pod,
// then finds existing pods that match those selectors.
// It favors nodes that have fewer existing matching pods.
// i.e. it pushes the scheduler towards a node where there's the smallest number of
// pods which match the same service, RC, RS or StatefulSet selectors as the pod being scheduled.
func (s *SelectorSpread) CalculateSpreadPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulernodeinfo.NodeInfo) (framework.NodeScore, error) {
var selector labels.Selector
node := nodeInfo.Node()
if node == nil {
return framework.NodeScore{}, fmt.Errorf("node not found")
}
priorityMeta, ok := meta.(*priorityMetadata)
if ok {
selector = priorityMeta.podSelector
} else {
selector = getSelector(pod, s.serviceLister, s.controllerLister, s.replicaSetLister, s.statefulSetLister)
}
count := countMatchingPods(pod.Namespace, selector, nodeInfo)
return framework.NodeScore{
Name: node.Name,
Score: int64(count),
}, nil
}
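As an illustration (not part of the commit itself), the map phase is easiest to trace through the "five pods, one service pod in no namespace" test case later in this diff: the incoming pod carries labels1 and an empty namespace, and the only service selects labels1. On machine1 the labels1 pods sit in the "default" and "ns1" namespaces, so countMatchingPods (below) counts none of them; on machine2 the labels1 pod with an empty namespace is counted. The map phase therefore yields raw scores of 0 for machine1 and 1 for machine2, which the reduce phase turns into the expected 100 (MaxNodeScore) for machine1 and 0 for machine2.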
// CalculateSpreadPriorityReduce calculates the score of each node
// based on the number of existing matching pods on the node.
// Where zone information is included on the nodes, it favors nodes
// in zones with fewer existing matching pods.
func (s *SelectorSpread) CalculateSpreadPriorityReduce(pod *v1.Pod, meta interface{}, sharedLister schedulerlisters.SharedLister, result framework.NodeScoreList) error {
countsByZone := make(map[string]int64, 10)
maxCountByZone := int64(0)
maxCountByNodeName := int64(0)
for i := range result {
if result[i].Score > maxCountByNodeName {
maxCountByNodeName = result[i].Score
}
nodeInfo, err := sharedLister.NodeInfos().Get(result[i].Name)
if err != nil {
return err
}
zoneID := utilnode.GetZoneKey(nodeInfo.Node())
if zoneID == "" {
continue
}
countsByZone[zoneID] += result[i].Score
}
for zoneID := range countsByZone {
if countsByZone[zoneID] > maxCountByZone {
maxCountByZone = countsByZone[zoneID]
}
}
haveZones := len(countsByZone) != 0
maxCountByNodeNameFloat64 := float64(maxCountByNodeName)
maxCountByZoneFloat64 := float64(maxCountByZone)
MaxNodeScoreFloat64 := float64(framework.MaxNodeScore)
for i := range result {
// initializing to the default/max node score of maxPriority
fScore := MaxNodeScoreFloat64
if maxCountByNodeName > 0 {
fScore = MaxNodeScoreFloat64 * (float64(maxCountByNodeName-result[i].Score) / maxCountByNodeNameFloat64)
}
// If there is zone information present, incorporate it
if haveZones {
nodeInfo, err := sharedLister.NodeInfos().Get(result[i].Name)
if err != nil {
return err
}
zoneID := utilnode.GetZoneKey(nodeInfo.Node())
if zoneID != "" {
zoneScore := MaxNodeScoreFloat64
if maxCountByZone > 0 {
zoneScore = MaxNodeScoreFloat64 * (float64(maxCountByZone-countsByZone[zoneID]) / maxCountByZoneFloat64)
}
fScore = (fScore * (1.0 - zoneWeighting)) + (zoneWeighting * zoneScore)
}
}
result[i].Score = int64(fScore)
if klog.V(10) {
klog.Infof(
"%v -> %v: SelectorSpreadPriority, Score: (%d)", pod.Name, result[i].Name, int64(fScore),
)
}
}
return nil
}
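To make the zone weighting concrete (again as an illustration, not part of the commit), trace the "two pods, 1 matching (in z2)" case from the zone-spreading tests later in this diff. Only machine1.zone2 carries a matching pod, so maxCountByNodeName = 1 and maxCountByZone = 1, all of it in zone2. machine1.zone2 scores 0 on both the node term and the zone term. machine2.zone2 has no matching pod on the node (node term 100 × (1 − 0)/1 = 100) but sits in the loaded zone (zone term 100 × (1 − 1)/1 = 0), giving 100 × 1/3 + 0 × 2/3 ≈ 33. The four nodes outside zone2 keep 100 on both terms. This reproduces the expected list in that test: machine1.zone2 = 0, machine2.zone2 = 33, and 100 everywhere else.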
// countMatchingPods counts the pods on the node that are in the given namespace and match the selector
func countMatchingPods(namespace string, selector labels.Selector, nodeInfo *schedulernodeinfo.NodeInfo) int {
if nodeInfo.Pods() == nil || len(nodeInfo.Pods()) == 0 || selector.Empty() {
return 0
}
count := 0
for _, pod := range nodeInfo.Pods() {
// Ignore pods being deleted for spreading purposes
// Similar to how it is done for SelectorSpreadPriority
if namespace == pod.Namespace && pod.DeletionTimestamp == nil {
if selector.Matches(labels.Set(pod.Labels)) {
count++
}
}
}
return count
}

View File

@@ -1,622 +0,0 @@
/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package priorities
import (
"reflect"
"testing"
"github.com/google/go-cmp/cmp"
apps "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
fakelisters "k8s.io/kubernetes/pkg/scheduler/listers/fake"
nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot"
)
func controllerRef(kind, name, uid string) []metav1.OwnerReference {
// TODO: When ControllerRef is implemented, uncomment the code below.
return nil
//trueVar := true
//return []metav1.OwnerReference{
// {Kind: kind, Name: name, UID: types.UID(uid), Controller: &trueVar},
//}
}
func TestSelectorSpreadPriority(t *testing.T) {
labels1 := map[string]string{
"foo": "bar",
"baz": "blah",
}
labels2 := map[string]string{
"bar": "foo",
"baz": "blah",
}
zone1Spec := v1.PodSpec{
NodeName: "machine1",
}
zone2Spec := v1.PodSpec{
NodeName: "machine2",
}
tests := []struct {
pod *v1.Pod
pods []*v1.Pod
nodes []string
rcs []*v1.ReplicationController
rss []*apps.ReplicaSet
services []*v1.Service
sss []*apps.StatefulSet
expectedList framework.NodeScoreList
name string
}{
{
pod: new(v1.Pod),
nodes: []string{"machine1", "machine2"},
expectedList: []framework.NodeScore{{Name: "machine1", Score: framework.MaxNodeScore}, {Name: "machine2", Score: framework.MaxNodeScore}},
name: "nothing scheduled",
},
{
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Labels: labels1}},
pods: []*v1.Pod{{Spec: zone1Spec}},
nodes: []string{"machine1", "machine2"},
expectedList: []framework.NodeScore{{Name: "machine1", Score: framework.MaxNodeScore}, {Name: "machine2", Score: framework.MaxNodeScore}},
name: "no services",
},
{
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Labels: labels1}},
pods: []*v1.Pod{{Spec: zone1Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels2}}},
nodes: []string{"machine1", "machine2"},
services: []*v1.Service{{Spec: v1.ServiceSpec{Selector: map[string]string{"key": "value"}}}},
expectedList: []framework.NodeScore{{Name: "machine1", Score: framework.MaxNodeScore}, {Name: "machine2", Score: framework.MaxNodeScore}},
name: "different services",
},
{
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Labels: labels1}},
pods: []*v1.Pod{
{Spec: zone1Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels2}},
{Spec: zone2Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels1}},
},
nodes: []string{"machine1", "machine2"},
services: []*v1.Service{{Spec: v1.ServiceSpec{Selector: labels1}}},
expectedList: []framework.NodeScore{{Name: "machine1", Score: framework.MaxNodeScore}, {Name: "machine2", Score: 0}},
name: "two pods, one service pod",
},
{
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Labels: labels1}},
pods: []*v1.Pod{
{Spec: zone1Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels2}},
{Spec: zone1Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels1, Namespace: metav1.NamespaceDefault}},
{Spec: zone1Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels1, Namespace: "ns1"}},
{Spec: zone2Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels1}},
{Spec: zone2Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels2}},
},
nodes: []string{"machine1", "machine2"},
services: []*v1.Service{{Spec: v1.ServiceSpec{Selector: labels1}}},
expectedList: []framework.NodeScore{{Name: "machine1", Score: framework.MaxNodeScore}, {Name: "machine2", Score: 0}},
name: "five pods, one service pod in no namespace",
},
{
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Labels: labels1, Namespace: metav1.NamespaceDefault}},
pods: []*v1.Pod{
{Spec: zone1Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels1}},
{Spec: zone1Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels1, Namespace: "ns1"}},
{Spec: zone2Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels1, Namespace: metav1.NamespaceDefault}},
{Spec: zone2Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels2}},
},
nodes: []string{"machine1", "machine2"},
services: []*v1.Service{{Spec: v1.ServiceSpec{Selector: labels1}, ObjectMeta: metav1.ObjectMeta{Namespace: metav1.NamespaceDefault}}},
expectedList: []framework.NodeScore{{Name: "machine1", Score: framework.MaxNodeScore}, {Name: "machine2", Score: 0}},
name: "four pods, one service pod in default namespace",
},
{
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Labels: labels1, Namespace: "ns1"}},
pods: []*v1.Pod{
{Spec: zone1Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels1}},
{Spec: zone1Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels1, Namespace: metav1.NamespaceDefault}},
{Spec: zone1Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels1, Namespace: "ns2"}},
{Spec: zone2Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels1, Namespace: "ns1"}},
{Spec: zone2Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels2}},
},
nodes: []string{"machine1", "machine2"},
services: []*v1.Service{{Spec: v1.ServiceSpec{Selector: labels1}, ObjectMeta: metav1.ObjectMeta{Namespace: "ns1"}}},
expectedList: []framework.NodeScore{{Name: "machine1", Score: framework.MaxNodeScore}, {Name: "machine2", Score: 0}},
name: "five pods, one service pod in specific namespace",
},
{
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Labels: labels1}},
pods: []*v1.Pod{
{Spec: zone1Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels2}},
{Spec: zone1Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels1}},
{Spec: zone2Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels1}},
},
nodes: []string{"machine1", "machine2"},
services: []*v1.Service{{Spec: v1.ServiceSpec{Selector: labels1}}},
expectedList: []framework.NodeScore{{Name: "machine1", Score: 0}, {Name: "machine2", Score: 0}},
name: "three pods, two service pods on different machines",
},
{
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Labels: labels1}},
pods: []*v1.Pod{
{Spec: zone1Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels2}},
{Spec: zone1Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels1}},
{Spec: zone2Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels1}},
{Spec: zone2Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels1}},
},
nodes: []string{"machine1", "machine2"},
services: []*v1.Service{{Spec: v1.ServiceSpec{Selector: labels1}}},
expectedList: []framework.NodeScore{{Name: "machine1", Score: 50}, {Name: "machine2", Score: 0}},
name: "four pods, three service pods",
},
{
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Labels: labels1}},
pods: []*v1.Pod{
{Spec: zone1Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels2}},
{Spec: zone1Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels1}},
{Spec: zone2Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels1}},
},
nodes: []string{"machine1", "machine2"},
services: []*v1.Service{{Spec: v1.ServiceSpec{Selector: map[string]string{"baz": "blah"}}}},
expectedList: []framework.NodeScore{{Name: "machine1", Score: 0}, {Name: "machine2", Score: 50}},
name: "service with partial pod label matches",
},
{
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Labels: labels1, OwnerReferences: controllerRef("ReplicationController", "name", "abc123")}},
pods: []*v1.Pod{
{Spec: zone1Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels2}},
{Spec: zone1Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels1, OwnerReferences: controllerRef("ReplicationController", "name", "abc123")}},
{Spec: zone2Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels1, OwnerReferences: controllerRef("ReplicationController", "name", "abc123")}},
},
nodes: []string{"machine1", "machine2"},
rcs: []*v1.ReplicationController{{Spec: v1.ReplicationControllerSpec{Selector: map[string]string{"foo": "bar"}}}},
services: []*v1.Service{{Spec: v1.ServiceSpec{Selector: map[string]string{"baz": "blah"}}}},
// "baz=blah" matches both labels1 and labels2, and "foo=bar" matches only labels 1. This means that we assume that we want to
// do spreading pod2 and pod3 and not pod1.
expectedList: []framework.NodeScore{{Name: "machine1", Score: 0}, {Name: "machine2", Score: 0}},
name: "service with partial pod label matches with service and replication controller",
},
{
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Labels: labels1, OwnerReferences: controllerRef("ReplicaSet", "name", "abc123")}},
pods: []*v1.Pod{
{Spec: zone1Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels2}},
{Spec: zone1Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels1, OwnerReferences: controllerRef("ReplicaSet", "name", "abc123")}},
{Spec: zone2Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels1, OwnerReferences: controllerRef("ReplicaSet", "name", "abc123")}},
},
nodes: []string{"machine1", "machine2"},
services: []*v1.Service{{Spec: v1.ServiceSpec{Selector: map[string]string{"baz": "blah"}}}},
rss: []*apps.ReplicaSet{{Spec: apps.ReplicaSetSpec{Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}}}}},
// We use ReplicaSet, instead of ReplicationController. The result should be exactly as above.
expectedList: []framework.NodeScore{{Name: "machine1", Score: 0}, {Name: "machine2", Score: 0}},
name: "service with partial pod label matches with service and replica set",
},
{
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Labels: labels1, OwnerReferences: controllerRef("StatefulSet", "name", "abc123")}},
pods: []*v1.Pod{
{Spec: zone1Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels2}},
{Spec: zone1Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels1, OwnerReferences: controllerRef("StatefulSet", "name", "abc123")}},
{Spec: zone2Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels1, OwnerReferences: controllerRef("StatefulSet", "name", "abc123")}},
},
nodes: []string{"machine1", "machine2"},
services: []*v1.Service{{Spec: v1.ServiceSpec{Selector: map[string]string{"baz": "blah"}}}},
sss: []*apps.StatefulSet{{Spec: apps.StatefulSetSpec{Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}}}}},
expectedList: []framework.NodeScore{{Name: "machine1", Score: 0}, {Name: "machine2", Score: 0}},
name: "service with partial pod label matches with service and stateful set",
},
{
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"foo": "bar", "bar": "foo"}, OwnerReferences: controllerRef("ReplicationController", "name", "abc123")}},
pods: []*v1.Pod{
{Spec: zone1Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels2}},
{Spec: zone1Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels1, OwnerReferences: controllerRef("ReplicationController", "name", "abc123")}},
{Spec: zone2Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels1, OwnerReferences: controllerRef("ReplicationController", "name", "abc123")}},
},
nodes: []string{"machine1", "machine2"},
rcs: []*v1.ReplicationController{{Spec: v1.ReplicationControllerSpec{Selector: map[string]string{"foo": "bar"}}}},
services: []*v1.Service{{Spec: v1.ServiceSpec{Selector: map[string]string{"bar": "foo"}}}},
// Taken together Service and Replication Controller should match no pods.
expectedList: []framework.NodeScore{{Name: "machine1", Score: framework.MaxNodeScore}, {Name: "machine2", Score: framework.MaxNodeScore}},
name: "disjoined service and replication controller matches no pods",
},
{
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"foo": "bar", "bar": "foo"}, OwnerReferences: controllerRef("ReplicaSet", "name", "abc123")}},
pods: []*v1.Pod{
{Spec: zone1Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels2}},
{Spec: zone1Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels1, OwnerReferences: controllerRef("ReplicaSet", "name", "abc123")}},
{Spec: zone2Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels1, OwnerReferences: controllerRef("ReplicaSet", "name", "abc123")}},
},
nodes: []string{"machine1", "machine2"},
services: []*v1.Service{{Spec: v1.ServiceSpec{Selector: map[string]string{"bar": "foo"}}}},
rss: []*apps.ReplicaSet{{Spec: apps.ReplicaSetSpec{Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}}}}},
// We use ReplicaSet, instead of ReplicationController. The result should be exactly as above.
expectedList: []framework.NodeScore{{Name: "machine1", Score: framework.MaxNodeScore}, {Name: "machine2", Score: framework.MaxNodeScore}},
name: "disjoined service and replica set matches no pods",
},
{
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"foo": "bar", "bar": "foo"}, OwnerReferences: controllerRef("StatefulSet", "name", "abc123")}},
pods: []*v1.Pod{
{Spec: zone1Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels2}},
{Spec: zone1Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels1, OwnerReferences: controllerRef("StatefulSet", "name", "abc123")}},
{Spec: zone2Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels1, OwnerReferences: controllerRef("StatefulSet", "name", "abc123")}},
},
nodes: []string{"machine1", "machine2"},
services: []*v1.Service{{Spec: v1.ServiceSpec{Selector: map[string]string{"bar": "foo"}}}},
sss: []*apps.StatefulSet{{Spec: apps.StatefulSetSpec{Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}}}}},
expectedList: []framework.NodeScore{{Name: "machine1", Score: framework.MaxNodeScore}, {Name: "machine2", Score: framework.MaxNodeScore}},
name: "disjoined service and stateful set matches no pods",
},
{
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Labels: labels1, OwnerReferences: controllerRef("ReplicationController", "name", "abc123")}},
pods: []*v1.Pod{
{Spec: zone1Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels2}},
{Spec: zone1Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels1, OwnerReferences: controllerRef("ReplicationController", "name", "abc123")}},
{Spec: zone2Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels1, OwnerReferences: controllerRef("ReplicationController", "name", "abc123")}},
},
nodes: []string{"machine1", "machine2"},
rcs: []*v1.ReplicationController{{Spec: v1.ReplicationControllerSpec{Selector: map[string]string{"foo": "bar"}}}},
// Both Nodes have one pod from the given RC, hence both get 0 score.
expectedList: []framework.NodeScore{{Name: "machine1", Score: 0}, {Name: "machine2", Score: 0}},
name: "Replication controller with partial pod label matches",
},
{
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Labels: labels1, OwnerReferences: controllerRef("ReplicaSet", "name", "abc123")}},
pods: []*v1.Pod{
{Spec: zone1Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels2}},
{Spec: zone1Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels1, OwnerReferences: controllerRef("ReplicaSet", "name", "abc123")}},
{Spec: zone2Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels1, OwnerReferences: controllerRef("ReplicaSet", "name", "abc123")}},
},
nodes: []string{"machine1", "machine2"},
rss: []*apps.ReplicaSet{{Spec: apps.ReplicaSetSpec{Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}}}}},
// We use ReplicaSet, instead of ReplicationController. The result should be exactly as above.
expectedList: []framework.NodeScore{{Name: "machine1", Score: 0}, {Name: "machine2", Score: 0}},
name: "Replica set with partial pod label matches",
},
{
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Labels: labels1, OwnerReferences: controllerRef("StatefulSet", "name", "abc123")}},
pods: []*v1.Pod{
{Spec: zone1Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels2}},
{Spec: zone1Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels1, OwnerReferences: controllerRef("StatefulSet", "name", "abc123")}},
{Spec: zone2Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels1, OwnerReferences: controllerRef("StatefulSet", "name", "abc123")}},
},
nodes: []string{"machine1", "machine2"},
sss: []*apps.StatefulSet{{Spec: apps.StatefulSetSpec{Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}}}}},
// We use StatefulSet, instead of ReplicationController. The result should be exactly as above.
expectedList: []framework.NodeScore{{Name: "machine1", Score: 0}, {Name: "machine2", Score: 0}},
name: "StatefulSet with partial pod label matches",
},
{
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Labels: labels1, OwnerReferences: controllerRef("ReplicationController", "name", "abc123")}},
pods: []*v1.Pod{
{Spec: zone1Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels2, OwnerReferences: controllerRef("ReplicationController", "name", "abc123")}},
{Spec: zone1Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels1, OwnerReferences: controllerRef("ReplicationController", "name", "abc123")}},
{Spec: zone2Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels1, OwnerReferences: controllerRef("ReplicationController", "name", "abc123")}},
},
nodes: []string{"machine1", "machine2"},
rcs: []*v1.ReplicationController{{Spec: v1.ReplicationControllerSpec{Selector: map[string]string{"baz": "blah"}}}},
expectedList: []framework.NodeScore{{Name: "machine1", Score: 0}, {Name: "machine2", Score: 50}},
name: "Another replication controller with partial pod label matches",
},
{
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Labels: labels1, OwnerReferences: controllerRef("ReplicaSet", "name", "abc123")}},
pods: []*v1.Pod{
{Spec: zone1Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels2, OwnerReferences: controllerRef("ReplicaSet", "name", "abc123")}},
{Spec: zone1Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels1, OwnerReferences: controllerRef("ReplicaSet", "name", "abc123")}},
{Spec: zone2Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels1, OwnerReferences: controllerRef("ReplicaSet", "name", "abc123")}},
},
nodes: []string{"machine1", "machine2"},
rss: []*apps.ReplicaSet{{Spec: apps.ReplicaSetSpec{Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"baz": "blah"}}}}},
// We use ReplicaSet, instead of ReplicationController. The result should be exactly as above.
expectedList: []framework.NodeScore{{Name: "machine1", Score: 0}, {Name: "machine2", Score: 50}},
name: "Another replication set with partial pod label matches",
},
{
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Labels: labels1, OwnerReferences: controllerRef("StatefulSet", "name", "abc123")}},
pods: []*v1.Pod{
{Spec: zone1Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels2, OwnerReferences: controllerRef("StatefulSet", "name", "abc123")}},
{Spec: zone1Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels1, OwnerReferences: controllerRef("StatefulSet", "name", "abc123")}},
{Spec: zone2Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels1, OwnerReferences: controllerRef("StatefulSet", "name", "abc123")}},
},
nodes: []string{"machine1", "machine2"},
sss: []*apps.StatefulSet{{Spec: apps.StatefulSetSpec{Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"baz": "blah"}}}}},
// We use StatefulSet, instead of ReplicationController. The result should be exactly as above.
expectedList: []framework.NodeScore{{Name: "machine1", Score: 0}, {Name: "machine2", Score: 50}},
name: "Another stateful set with partial pod label matches",
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
nodes := makeNodeList(test.nodes)
snapshot := nodeinfosnapshot.NewSnapshot(nodeinfosnapshot.CreateNodeInfoMap(test.pods, nodes))
selectorSpread := SelectorSpread{
serviceLister: fakelisters.ServiceLister(test.services),
controllerLister: fakelisters.ControllerLister(test.rcs),
replicaSetLister: fakelisters.ReplicaSetLister(test.rss),
statefulSetLister: fakelisters.StatefulSetLister(test.sss),
}
metaDataProducer := NewMetadataFactory(
fakelisters.ServiceLister(test.services),
fakelisters.ControllerLister(test.rcs),
fakelisters.ReplicaSetLister(test.rss),
fakelisters.StatefulSetLister(test.sss),
)
metaData := metaDataProducer(test.pod, nodes, snapshot)
list, err := runMapReducePriority(selectorSpread.CalculateSpreadPriorityMap, selectorSpread.CalculateSpreadPriorityReduce, metaData, test.pod, snapshot, makeNodeList(test.nodes))
if err != nil {
t.Errorf("unexpected error: %v \n", err)
}
if diff := cmp.Diff(test.expectedList, list); diff != "" {
t.Errorf("wrong priorities produced (-want, +got): %s", diff)
}
})
}
}
func buildPod(nodeName string, labels map[string]string, ownerRefs []metav1.OwnerReference) *v1.Pod {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{Labels: labels, OwnerReferences: ownerRefs},
Spec: v1.PodSpec{NodeName: nodeName},
}
}
func TestZoneSelectorSpreadPriority(t *testing.T) {
labels1 := map[string]string{
"label1": "l1",
"baz": "blah",
}
labels2 := map[string]string{
"label2": "l2",
"baz": "blah",
}
const nodeMachine1Zone1 = "machine1.zone1"
const nodeMachine1Zone2 = "machine1.zone2"
const nodeMachine2Zone2 = "machine2.zone2"
const nodeMachine1Zone3 = "machine1.zone3"
const nodeMachine2Zone3 = "machine2.zone3"
const nodeMachine3Zone3 = "machine3.zone3"
buildNodeLabels := func(failureDomain string) map[string]string {
labels := map[string]string{
v1.LabelZoneFailureDomain: failureDomain,
}
return labels
}
labeledNodes := map[string]map[string]string{
nodeMachine1Zone1: buildNodeLabels("zone1"),
nodeMachine1Zone2: buildNodeLabels("zone2"),
nodeMachine2Zone2: buildNodeLabels("zone2"),
nodeMachine1Zone3: buildNodeLabels("zone3"),
nodeMachine2Zone3: buildNodeLabels("zone3"),
nodeMachine3Zone3: buildNodeLabels("zone3"),
}
tests := []struct {
pod *v1.Pod
pods []*v1.Pod
rcs []*v1.ReplicationController
rss []*apps.ReplicaSet
services []*v1.Service
sss []*apps.StatefulSet
expectedList framework.NodeScoreList
name string
}{
{
pod: new(v1.Pod),
expectedList: []framework.NodeScore{
{Name: nodeMachine1Zone1, Score: framework.MaxNodeScore},
{Name: nodeMachine1Zone2, Score: framework.MaxNodeScore},
{Name: nodeMachine2Zone2, Score: framework.MaxNodeScore},
{Name: nodeMachine1Zone3, Score: framework.MaxNodeScore},
{Name: nodeMachine2Zone3, Score: framework.MaxNodeScore},
{Name: nodeMachine3Zone3, Score: framework.MaxNodeScore},
},
name: "nothing scheduled",
},
{
pod: buildPod("", labels1, nil),
pods: []*v1.Pod{buildPod(nodeMachine1Zone1, nil, nil)},
expectedList: []framework.NodeScore{
{Name: nodeMachine1Zone1, Score: framework.MaxNodeScore},
{Name: nodeMachine1Zone2, Score: framework.MaxNodeScore},
{Name: nodeMachine2Zone2, Score: framework.MaxNodeScore},
{Name: nodeMachine1Zone3, Score: framework.MaxNodeScore},
{Name: nodeMachine2Zone3, Score: framework.MaxNodeScore},
{Name: nodeMachine3Zone3, Score: framework.MaxNodeScore},
},
name: "no services",
},
{
pod: buildPod("", labels1, nil),
pods: []*v1.Pod{buildPod(nodeMachine1Zone1, labels2, nil)},
services: []*v1.Service{{Spec: v1.ServiceSpec{Selector: map[string]string{"key": "value"}}}},
expectedList: []framework.NodeScore{
{Name: nodeMachine1Zone1, Score: framework.MaxNodeScore},
{Name: nodeMachine1Zone2, Score: framework.MaxNodeScore},
{Name: nodeMachine2Zone2, Score: framework.MaxNodeScore},
{Name: nodeMachine1Zone3, Score: framework.MaxNodeScore},
{Name: nodeMachine2Zone3, Score: framework.MaxNodeScore},
{Name: nodeMachine3Zone3, Score: framework.MaxNodeScore},
},
name: "different services",
},
{
pod: buildPod("", labels1, nil),
pods: []*v1.Pod{
buildPod(nodeMachine1Zone1, labels2, nil),
buildPod(nodeMachine1Zone2, labels2, nil),
},
services: []*v1.Service{{Spec: v1.ServiceSpec{Selector: labels1}}},
expectedList: []framework.NodeScore{
{Name: nodeMachine1Zone1, Score: framework.MaxNodeScore},
{Name: nodeMachine1Zone2, Score: framework.MaxNodeScore},
{Name: nodeMachine2Zone2, Score: framework.MaxNodeScore},
{Name: nodeMachine1Zone3, Score: framework.MaxNodeScore},
{Name: nodeMachine2Zone3, Score: framework.MaxNodeScore},
{Name: nodeMachine3Zone3, Score: framework.MaxNodeScore},
},
name: "two pods, 0 matching",
},
{
pod: buildPod("", labels1, nil),
pods: []*v1.Pod{
buildPod(nodeMachine1Zone1, labels2, nil),
buildPod(nodeMachine1Zone2, labels1, nil),
},
services: []*v1.Service{{Spec: v1.ServiceSpec{Selector: labels1}}},
expectedList: []framework.NodeScore{
{Name: nodeMachine1Zone1, Score: framework.MaxNodeScore},
{Name: nodeMachine1Zone2, Score: 0}, // Already have pod on machine
{Name: nodeMachine2Zone2, Score: 33}, // Already have pod in zone
{Name: nodeMachine1Zone3, Score: framework.MaxNodeScore},
{Name: nodeMachine2Zone3, Score: framework.MaxNodeScore},
{Name: nodeMachine3Zone3, Score: framework.MaxNodeScore},
},
name: "two pods, 1 matching (in z2)",
},
{
pod: buildPod("", labels1, nil),
pods: []*v1.Pod{
buildPod(nodeMachine1Zone1, labels2, nil),
buildPod(nodeMachine1Zone2, labels1, nil),
buildPod(nodeMachine2Zone2, labels1, nil),
buildPod(nodeMachine1Zone3, labels2, nil),
buildPod(nodeMachine2Zone3, labels1, nil),
},
services: []*v1.Service{{Spec: v1.ServiceSpec{Selector: labels1}}},
expectedList: []framework.NodeScore{
{Name: nodeMachine1Zone1, Score: framework.MaxNodeScore},
{Name: nodeMachine1Zone2, Score: 0}, // Pod on node
{Name: nodeMachine2Zone2, Score: 0}, // Pod on node
{Name: nodeMachine1Zone3, Score: 66}, // Pod in zone
{Name: nodeMachine2Zone3, Score: 33}, // Pod on node
{Name: nodeMachine3Zone3, Score: 66}, // Pod in zone
},
name: "five pods, 3 matching (z2=2, z3=1)",
},
{
pod: buildPod("", labels1, nil),
pods: []*v1.Pod{
buildPod(nodeMachine1Zone1, labels1, nil),
buildPod(nodeMachine1Zone2, labels1, nil),
buildPod(nodeMachine2Zone2, labels2, nil),
buildPod(nodeMachine1Zone3, labels1, nil),
},
services: []*v1.Service{{Spec: v1.ServiceSpec{Selector: labels1}}},
expectedList: []framework.NodeScore{
{Name: nodeMachine1Zone1, Score: 0}, // Pod on node
{Name: nodeMachine1Zone2, Score: 0}, // Pod on node
{Name: nodeMachine2Zone2, Score: 33}, // Pod in zone
{Name: nodeMachine1Zone3, Score: 0}, // Pod on node
{Name: nodeMachine2Zone3, Score: 33}, // Pod in zone
{Name: nodeMachine3Zone3, Score: 33}, // Pod in zone
},
name: "four pods, 3 matching (z1=1, z2=1, z3=1)",
},
{
pod: buildPod("", labels1, nil),
pods: []*v1.Pod{
buildPod(nodeMachine1Zone1, labels1, nil),
buildPod(nodeMachine1Zone2, labels1, nil),
buildPod(nodeMachine1Zone3, labels1, nil),
buildPod(nodeMachine2Zone2, labels2, nil),
},
services: []*v1.Service{{Spec: v1.ServiceSpec{Selector: labels1}}},
expectedList: []framework.NodeScore{
{Name: nodeMachine1Zone1, Score: 0}, // Pod on node
{Name: nodeMachine1Zone2, Score: 0}, // Pod on node
{Name: nodeMachine2Zone2, Score: 33}, // Pod in zone
{Name: nodeMachine1Zone3, Score: 0}, // Pod on node
{Name: nodeMachine2Zone3, Score: 33}, // Pod in zone
{Name: nodeMachine3Zone3, Score: 33}, // Pod in zone
},
name: "four pods, 3 matching (z1=1, z2=1, z3=1)",
},
{
pod: buildPod("", labels1, controllerRef("ReplicationController", "name", "abc123")),
pods: []*v1.Pod{
buildPod(nodeMachine1Zone3, labels1, controllerRef("ReplicationController", "name", "abc123")),
buildPod(nodeMachine1Zone2, labels1, controllerRef("ReplicationController", "name", "abc123")),
buildPod(nodeMachine1Zone3, labels1, controllerRef("ReplicationController", "name", "abc123")),
},
rcs: []*v1.ReplicationController{{Spec: v1.ReplicationControllerSpec{Selector: labels1}}},
expectedList: []framework.NodeScore{
// Note that because we put two pods on the same node (nodeMachine1Zone3),
// the values here are questionable for zone2, in particular for nodeMachine1Zone2.
// However they kind of make sense; zone1 is still most-highly favored.
// zone3 is in general least favored, and m1.z3 particularly low priority.
// We would probably prefer to see a bigger gap between putting a second
// pod on m1.z2 and putting a pod on m2.z2, but the ordering is correct.
// This is also consistent with what we have already.
{Name: nodeMachine1Zone1, Score: framework.MaxNodeScore}, // No pods in zone
{Name: nodeMachine1Zone2, Score: 50}, // Pod on node
{Name: nodeMachine2Zone2, Score: 66}, // Pod in zone
{Name: nodeMachine1Zone3, Score: 0}, // Two pods on node
{Name: nodeMachine2Zone3, Score: 33}, // Pod in zone
{Name: nodeMachine3Zone3, Score: 33}, // Pod in zone
},
name: "Replication controller spreading (z1=0, z2=1, z3=2)",
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
nodes := makeLabeledNodeList(labeledNodes)
snapshot := nodeinfosnapshot.NewSnapshot(nodeinfosnapshot.CreateNodeInfoMap(test.pods, nodes))
selectorSpread := SelectorSpread{
serviceLister: fakelisters.ServiceLister(test.services),
controllerLister: fakelisters.ControllerLister(test.rcs),
replicaSetLister: fakelisters.ReplicaSetLister(test.rss),
statefulSetLister: fakelisters.StatefulSetLister(test.sss),
}
metaDataProducer := NewMetadataFactory(
fakelisters.ServiceLister(test.services),
fakelisters.ControllerLister(test.rcs),
fakelisters.ReplicaSetLister(test.rss),
fakelisters.StatefulSetLister(test.sss),
)
metaData := metaDataProducer(test.pod, nodes, snapshot)
list, err := runMapReducePriority(selectorSpread.CalculateSpreadPriorityMap, selectorSpread.CalculateSpreadPriorityReduce, metaData, test.pod, snapshot, makeLabeledNodeList(labeledNodes))
if err != nil {
t.Errorf("unexpected error: %v", err)
}
// sort the two lists to avoid failures on account of different ordering
sortNodeScoreList(test.expectedList)
sortNodeScoreList(list)
if !reflect.DeepEqual(test.expectedList, list) {
t.Errorf("expected %#v, got %#v", test.expectedList, list)
}
})
}
}
func makeLabeledNodeList(nodeMap map[string]map[string]string) []*v1.Node {
nodes := make([]*v1.Node, 0, len(nodeMap))
for nodeName, labels := range nodeMap {
nodes = append(nodes, &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: nodeName, Labels: labels}})
}
return nodes
}
func makeNodeList(nodeNames []string) []*v1.Node {
nodes := make([]*v1.Node, 0, len(nodeNames))
for _, nodeName := range nodeNames {
nodes = append(nodes, &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: nodeName}})
}
return nodes
}

View File

@@ -1,55 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package priorities
import (
"sort"
v1 "k8s.io/api/core/v1"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers"
)
func runMapReducePriority(mapFn PriorityMapFunction, reduceFn PriorityReduceFunction, metaData interface{}, pod *v1.Pod, sharedLister schedulerlisters.SharedLister, nodes []*v1.Node) (framework.NodeScoreList, error) {
result := make(framework.NodeScoreList, 0, len(nodes))
for i := range nodes {
nodeInfo, err := sharedLister.NodeInfos().Get(nodes[i].Name)
if err != nil {
return nil, err
}
hostResult, err := mapFn(pod, metaData, nodeInfo)
if err != nil {
return nil, err
}
result = append(result, hostResult)
}
if reduceFn != nil {
if err := reduceFn(pod, metaData, sharedLister, result); err != nil {
return nil, err
}
}
return result, nil
}
func sortNodeScoreList(out framework.NodeScoreList) {
sort.Slice(out, func(i, j int) bool {
if out[i].Score == out[j].Score {
return out[i].Name < out[j].Name
}
return out[i].Score < out[j].Score
})
}

View File

@@ -111,6 +111,7 @@ func getDefaultConfig(hardPodAffinityWeight int64) *Config {
PostFilter: &schedulerapi.PluginSet{
Enabled: []schedulerapi.Plugin{
{Name: interpodaffinity.Name},
{Name: defaultpodtopologyspread.Name},
{Name: tainttoleration.Name},
},
},

View File

@@ -76,6 +76,7 @@ func TestClusterAutoscalerProvider(t *testing.T) {
PostFilter: &schedulerapi.PluginSet{
Enabled: []schedulerapi.Plugin{
{Name: interpodaffinity.Name},
{Name: defaultpodtopologyspread.Name},
{Name: tainttoleration.Name},
},
},
@@ -147,6 +148,7 @@ func TestApplyFeatureGates(t *testing.T) {
PostFilter: &schedulerapi.PluginSet{
Enabled: []schedulerapi.Plugin{
{Name: interpodaffinity.Name},
{Name: defaultpodtopologyspread.Name},
{Name: tainttoleration.Name},
},
},
@@ -206,6 +208,7 @@ func TestApplyFeatureGates(t *testing.T) {
PostFilter: &schedulerapi.PluginSet{
Enabled: []schedulerapi.Plugin{
{Name: interpodaffinity.Name},
{Name: defaultpodtopologyspread.Name},
{Name: tainttoleration.Name},
{Name: podtopologyspread.Name},
{Name: noderesources.ResourceLimitsName},

View File

@@ -130,6 +130,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
{Name: "NodeLabel"},
{Name: "ServiceAffinity"},
},
"PostFilterPlugin": {{Name: "DefaultPodTopologySpread"}},
"ScorePlugin": {
{Name: "NodeResourcesLeastAllocated", Weight: 1},
{Name: "NodeLabel", Weight: 4},
@@ -182,6 +183,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
{Name: "NodeLabel"},
{Name: "ServiceAffinity"},
},
"PostFilterPlugin": {{Name: "DefaultPodTopologySpread"}},
"ScorePlugin": {
{Name: "NodeResourcesBalancedAllocation", Weight: 2},
{Name: "NodeResourcesLeastAllocated", Weight: 2},
@@ -242,6 +244,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
{Name: "AzureDiskLimits"},
{Name: "VolumeZone"},
},
"PostFilterPlugin": {{Name: "DefaultPodTopologySpread"}},
"ScorePlugin": {
{Name: "NodeResourcesBalancedAllocation", Weight: 2},
{Name: "ImageLocality", Weight: 2},
@@ -311,6 +314,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
},
"PostFilterPlugin": {
{Name: "InterPodAffinity"},
{Name: "DefaultPodTopologySpread"},
{Name: "TaintToleration"},
},
"ScorePlugin": {
@@ -384,6 +388,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
},
"PostFilterPlugin": {
{Name: "InterPodAffinity"},
{Name: "DefaultPodTopologySpread"},
{Name: "TaintToleration"},
},
"ScorePlugin": {
@@ -468,6 +473,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
},
"PostFilterPlugin": {
{Name: "InterPodAffinity"},
{Name: "DefaultPodTopologySpread"},
{Name: "TaintToleration"},
},
"ScorePlugin": {
@@ -563,6 +569,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
},
"PostFilterPlugin": {
{Name: "InterPodAffinity"},
{Name: "DefaultPodTopologySpread"},
{Name: "TaintToleration"},
},
"ScorePlugin": {
@@ -660,6 +667,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
},
"PostFilterPlugin": {
{Name: "InterPodAffinity"},
{Name: "DefaultPodTopologySpread"},
{Name: "TaintToleration"},
},
"ScorePlugin": {
@@ -760,6 +768,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
},
"PostFilterPlugin": {
{Name: "InterPodAffinity"},
{Name: "DefaultPodTopologySpread"},
{Name: "TaintToleration"},
},
"ScorePlugin": {
@@ -872,6 +881,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
},
"PostFilterPlugin": {
{Name: "InterPodAffinity"},
{Name: "DefaultPodTopologySpread"},
{Name: "TaintToleration"},
},
"ScorePlugin": {
@@ -987,6 +997,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
},
"PostFilterPlugin": {
{Name: "InterPodAffinity"},
{Name: "DefaultPodTopologySpread"},
{Name: "TaintToleration"},
},
"ScorePlugin": {
@@ -1102,6 +1113,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
},
"PostFilterPlugin": {
{Name: "InterPodAffinity"},
{Name: "DefaultPodTopologySpread"},
{Name: "TaintToleration"},
},
"ScorePlugin": {
@@ -1221,6 +1233,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
},
"PostFilterPlugin": {
{Name: "InterPodAffinity"},
{Name: "DefaultPodTopologySpread"},
{Name: "TaintToleration"},
},
"ScorePlugin": {
@@ -1392,6 +1405,7 @@ func TestAlgorithmProviderCompatibility(t *testing.T) {
},
"PostFilterPlugin": {
{Name: "InterPodAffinity"},
{Name: "DefaultPodTopologySpread"},
{Name: "TaintToleration"},
},
"ScorePlugin": {
@@ -1447,6 +1461,7 @@ func TestAlgorithmProviderCompatibility(t *testing.T) {
},
"PostFilterPlugin": {
{Name: "InterPodAffinity"},
{Name: "DefaultPodTopologySpread"},
{Name: "TaintToleration"},
},
"ScorePlugin": {

View File

@@ -1116,14 +1116,16 @@ func TestZeroRequest(t *testing.T) {
registry := framework.Registry{}
plugins := &schedulerapi.Plugins{
Filter: &schedulerapi.PluginSet{},
Score: &schedulerapi.PluginSet{},
Filter: &schedulerapi.PluginSet{},
PostFilter: &schedulerapi.PluginSet{},
Score: &schedulerapi.PluginSet{},
}
var pluginConfigs []schedulerapi.PluginConfig
pluginRegistrations := []st.RegisterPluginFunc{
st.RegisterScorePlugin(noderesources.LeastAllocatedName, noderesources.NewLeastAllocated, 1),
st.RegisterScorePlugin(noderesources.BalancedAllocationName, noderesources.NewBalancedAllocation, 1),
st.RegisterScorePlugin(defaultpodtopologyspread.Name, defaultpodtopologyspread.New, 1),
st.RegisterPostFilterPlugin(defaultpodtopologyspread.Name, defaultpodtopologyspread.New),
}
for _, f := range pluginRegistrations {
f(&registry, plugins, pluginConfigs)
@@ -1155,9 +1157,16 @@ func TestZeroRequest(t *testing.T) {
false).(*genericScheduler)
scheduler.nodeInfoSnapshot = snapshot
ctx := context.Background()
state := framework.NewCycleState()
_, filteredNodesStatuses, err := scheduler.findNodesThatFit(ctx, state, test.pod)
if err != nil {
t.Fatalf("error filtering nodes: %+v", err)
}
scheduler.framework.RunPostFilterPlugins(ctx, state, test.pod, test.nodes, filteredNodesStatuses)
list, err := scheduler.prioritizeNodes(
context.Background(),
framework.NewCycleState(),
ctx,
state,
test.pod,
metadata,
test.nodes,

View File

@@ -6,27 +6,37 @@ go_library(
importpath = "k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultpodtopologyspread",
visibility = ["//visibility:public"],
deps = [
"//pkg/scheduler/algorithm/priorities:go_default_library",
"//pkg/scheduler/framework/plugins/migration:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library",
"//pkg/scheduler/listers:go_default_library",
"//pkg/scheduler/nodeinfo:go_default_library",
"//pkg/util/node:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/client-go/listers/apps/v1:go_default_library",
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = ["default_pod_topology_spread_test.go"],
srcs = [
"default_pod_topology_spread_perf_test.go",
"default_pod_topology_spread_test.go",
],
embed = [":go_default_library"],
deps = [
"//pkg/scheduler/algorithm/priorities:go_default_library",
"//pkg/scheduler/framework/plugins/migration:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library",
"//pkg/scheduler/listers/fake:go_default_library",
"//pkg/scheduler/nodeinfo/snapshot:go_default_library",
"//pkg/scheduler/testing:go_default_library",
"//staging/src/k8s.io/api/apps/v1:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
],
)

View File

@@ -19,50 +19,144 @@ package defaultpodtopologyspread
import (
"context"
"fmt"
"k8s.io/klog"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/kubernetes/pkg/scheduler/algorithm/priorities"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/migration"
appslisters "k8s.io/client-go/listers/apps/v1"
corelisters "k8s.io/client-go/listers/core/v1"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
utilnode "k8s.io/kubernetes/pkg/util/node"
)
// DefaultPodTopologySpread is a plugin that calculates selector spread priority.
type DefaultPodTopologySpread struct {
handle framework.FrameworkHandle
calculateSpreadPriorityMap priorities.PriorityMapFunction
calculateSpreadPriorityReduce priorities.PriorityReduceFunction
handle framework.FrameworkHandle
}
var _ framework.ScorePlugin = &DefaultPodTopologySpread{}
// Name is the name of the plugin used in the plugin registry and configurations.
const Name = "DefaultPodTopologySpread"
const (
// Name is the name of the plugin used in the plugin registry and configurations.
Name = "DefaultPodTopologySpread"
// postFilterStateKey is the key in CycleState under which DefaultPodTopologySpread stores its pre-computed data for Scoring.
postFilterStateKey = "PostFilter" + Name
// When zone information is present, give 2/3 of the weighting to zone spreading, 1/3 to node spreading
// TODO: Any way to justify this weighting?
zoneWeighting float64 = 2.0 / 3.0
)
// Name returns name of the plugin. It is used in logs, etc.
func (pl *DefaultPodTopologySpread) Name() string {
return Name
}
// postFilterState computed at PostFilter and used at Score.
type postFilterState struct {
selector labels.Selector
}
// Clone implements the mandatory Clone interface. We don't really copy the data since
// there is no need for that.
func (s *postFilterState) Clone() framework.StateData {
return s
}
// Score invoked at the Score extension point.
// The "score" returned in this function is the matching number of pods on the `nodeName`,
// it is normalized later.
func (pl *DefaultPodTopologySpread) Score(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
c, err := state.Read(postFilterStateKey)
if err != nil {
return 0, framework.NewStatus(framework.Error, fmt.Sprintf("Error reading %q from cycleState: %v", postFilterStateKey, err))
}
s, ok := c.(*postFilterState)
if !ok {
return 0, framework.NewStatus(framework.Error, fmt.Sprintf("%+v convert to tainttoleration.postFilterState error", c))
}
nodeInfo, err := pl.handle.SnapshotSharedLister().NodeInfos().Get(nodeName)
if err != nil {
return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v", nodeName, err))
}
meta := migration.PriorityMetadata(state)
s, err := pl.calculateSpreadPriorityMap(pod, meta, nodeInfo)
return s.Score, migration.ErrorToFrameworkStatus(err)
count := countMatchingPods(pod.Namespace, s.selector, nodeInfo)
return int64(count), nil
}
// NormalizeScore invoked after scoring all nodes.
// For this plugin, it calculates the score of each node
// based on the number of existing matching pods on the node.
// Where zone information is included on the nodes, it favors nodes
// in zones with fewer existing matching pods.
func (pl *DefaultPodTopologySpread) NormalizeScore(ctx context.Context, state *framework.CycleState, pod *v1.Pod, scores framework.NodeScoreList) *framework.Status {
meta := migration.PriorityMetadata(state)
err := pl.calculateSpreadPriorityReduce(pod, meta, pl.handle.SnapshotSharedLister(), scores)
return migration.ErrorToFrameworkStatus(err)
countsByZone := make(map[string]int64, 10)
maxCountByZone := int64(0)
maxCountByNodeName := int64(0)
for i := range scores {
if scores[i].Score > maxCountByNodeName {
maxCountByNodeName = scores[i].Score
}
nodeInfo, err := pl.handle.SnapshotSharedLister().NodeInfos().Get(scores[i].Name)
if err != nil {
return framework.NewStatus(framework.Error, err.Error())
}
zoneID := utilnode.GetZoneKey(nodeInfo.Node())
if zoneID == "" {
continue
}
countsByZone[zoneID] += scores[i].Score
}
for zoneID := range countsByZone {
if countsByZone[zoneID] > maxCountByZone {
maxCountByZone = countsByZone[zoneID]
}
}
haveZones := len(countsByZone) != 0
maxCountByNodeNameFloat64 := float64(maxCountByNodeName)
maxCountByZoneFloat64 := float64(maxCountByZone)
MaxNodeScoreFloat64 := float64(framework.MaxNodeScore)
for i := range scores {
// initialize to the default/max node score of MaxNodeScore
fScore := MaxNodeScoreFloat64
if maxCountByNodeName > 0 {
fScore = MaxNodeScoreFloat64 * (float64(maxCountByNodeName-scores[i].Score) / maxCountByNodeNameFloat64)
}
// If there is zone information present, incorporate it
if haveZones {
nodeInfo, err := pl.handle.SnapshotSharedLister().NodeInfos().Get(scores[i].Name)
if err != nil {
return framework.NewStatus(framework.Error, err.Error())
}
zoneID := utilnode.GetZoneKey(nodeInfo.Node())
if zoneID != "" {
zoneScore := MaxNodeScoreFloat64
if maxCountByZone > 0 {
zoneScore = MaxNodeScoreFloat64 * (float64(maxCountByZone-countsByZone[zoneID]) / maxCountByZoneFloat64)
}
fScore = (fScore * (1.0 - zoneWeighting)) + (zoneWeighting * zoneScore)
}
}
scores[i].Score = int64(fScore)
if klog.V(10) {
klog.Infof(
"%v -> %v: SelectorSpreadPriority, Score: (%d)", pod.Name, scores[i].Name, int64(fScore),
)
}
}
return nil
}
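
For intuition on the weighting above, here is a minimal, self-contained sketch (not part of this change; the constants and pod counts are assumed purely for illustration) of how a node's final score blends the node-level and zone-level spreading scores when MaxNodeScore is 100:

package main

import "fmt"

func main() {
	const (
		maxNodeScore  = 100.0     // assumed value of framework.MaxNodeScore
		zoneWeighting = 2.0 / 3.0 // same 2/3 zone weighting as the plugin
	)
	// Hypothetical counts: the busiest node has 4 matching pods, the busiest zone has 10.
	maxCountByNodeName, maxCountByZone := 4.0, 10.0
	// Score a node with 1 matching pod, sitting in a zone that holds 6 matching pods.
	nodeCount, zoneCount := 1.0, 6.0

	nodeScore := maxNodeScore * (maxCountByNodeName - nodeCount) / maxCountByNodeName // 75
	zoneScore := maxNodeScore * (maxCountByZone - zoneCount) / maxCountByZone         // 40
	final := nodeScore*(1.0-zoneWeighting) + zoneWeighting*zoneScore                  // 25 + 26.67 ≈ 51.67

	fmt.Printf("node=%.2f zone=%.2f final=%.2f\n", nodeScore, zoneScore, final)
}

Fewer matching pods on a node or in its zone push the result toward 100; the zone term dominates because of the 2/3 weighting.
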
// ScoreExtensions of the Score plugin.
@ -70,19 +164,91 @@ func (pl *DefaultPodTopologySpread) ScoreExtensions() framework.ScoreExtensions
return pl
}
// New initializes a new plugin and returns it.
func New(_ *runtime.Unknown, handle framework.FrameworkHandle) (framework.Plugin, error) {
informerFactory := handle.SharedInformerFactory()
calculateSpreadPriorityMap, calculateSpreadPriorityReduce := priorities.NewSelectorSpreadPriority(
// PostFilter builds and writes cycle state used by Score and NormalizeScore.
func (pl *DefaultPodTopologySpread) PostFilter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodes []*v1.Node, _ framework.NodeToStatusMap) *framework.Status {
var selector labels.Selector
informerFactory := pl.handle.SharedInformerFactory()
selector = getSelector(
pod,
informerFactory.Core().V1().Services().Lister(),
informerFactory.Core().V1().ReplicationControllers().Lister(),
informerFactory.Apps().V1().ReplicaSets().Lister(),
informerFactory.Apps().V1().StatefulSets().Lister(),
)
state := &postFilterState{
selector: selector,
}
cycleState.Write(postFilterStateKey, state)
return nil
}
// New initializes a new plugin and returns it.
func New(_ *runtime.Unknown, handle framework.FrameworkHandle) (framework.Plugin, error) {
return &DefaultPodTopologySpread{
handle: handle,
calculateSpreadPriorityMap: calculateSpreadPriorityMap,
calculateSpreadPriorityReduce: calculateSpreadPriorityReduce,
handle: handle,
}, nil
}
// countMatchingPods counts the pods in the given namespace that match the given selector
func countMatchingPods(namespace string, selector labels.Selector, nodeInfo *schedulernodeinfo.NodeInfo) int {
if len(nodeInfo.Pods()) == 0 || selector.Empty() {
return 0
}
count := 0
for _, pod := range nodeInfo.Pods() {
// Ignore pods being deleted for spreading purposes
// Similar to how it is done for SelectorSpreadPriority
if namespace == pod.Namespace && pod.DeletionTimestamp == nil {
if selector.Matches(labels.Set(pod.Labels)) {
count++
}
}
}
return count
}
// getSelector returns a selector for the services, RCs, RSs, and SSs matching the given pod.
func getSelector(pod *v1.Pod, sl corelisters.ServiceLister, cl corelisters.ReplicationControllerLister, rsl appslisters.ReplicaSetLister, ssl appslisters.StatefulSetLister) labels.Selector {
labelSet := make(labels.Set)
// Since services, RCs, RSs and SSs match the pod, they won't have conflicting
// labels. Merging is safe.
if services, err := schedulerlisters.GetPodServices(sl, pod); err == nil {
for _, service := range services {
labelSet = labels.Merge(labelSet, service.Spec.Selector)
}
}
if rcs, err := cl.GetPodControllers(pod); err == nil {
for _, rc := range rcs {
labelSet = labels.Merge(labelSet, rc.Spec.Selector)
}
}
selector := labels.NewSelector()
if len(labelSet) != 0 {
selector = labelSet.AsSelector()
}
if rss, err := rsl.GetPodReplicaSets(pod); err == nil {
for _, rs := range rss {
if other, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector); err == nil {
if r, ok := other.Requirements(); ok {
selector = selector.Add(r...)
}
}
}
}
if sss, err := ssl.GetPodStatefulSets(pod); err == nil {
for _, ss := range sss {
if other, err := metav1.LabelSelectorAsSelector(ss.Spec.Selector); err == nil {
if r, ok := other.Requirements(); ok {
selector = selector.Add(r...)
}
}
}
}
return selector
}
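
A minimal sketch of the selector merging done by getSelector, assuming only k8s.io/apimachinery is on the module path; the label values are made up for illustration. Equality-based Service/RC selectors are merged into one label set, and ReplicaSet/StatefulSet requirements are appended, so the result is a single AND-ed selector:

package main

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
)

func main() {
	// Service and ReplicationController selectors are plain maps; merging is safe
	// because every selector here already matched the same pod.
	labelSet := labels.Merge(labels.Set{"app": "web"}, labels.Set{"tier": "frontend"})
	selector := labelSet.AsSelector()

	// ReplicaSet and StatefulSet selectors are metav1.LabelSelectors; convert them
	// and append their requirements to the same selector.
	rs := &metav1.LabelSelector{MatchLabels: map[string]string{"release": "stable"}}
	if other, err := metav1.LabelSelectorAsSelector(rs); err == nil {
		if r, ok := other.Requirements(); ok {
			selector = selector.Add(r...)
		}
	}

	fmt.Println(selector.Matches(labels.Set{"app": "web", "tier": "frontend", "release": "stable"})) // true
	fmt.Println(selector.Matches(labels.Set{"app": "web", "tier": "frontend"}))                      // false
}
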

View File

@ -14,20 +14,20 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package priorities
package defaultpodtopologyspread
import (
"context"
"testing"
v1 "k8s.io/api/core/v1"
"k8s.io/kubernetes/pkg/scheduler/listers/fake"
"k8s.io/client-go/informers"
clientsetfake "k8s.io/client-go/kubernetes/fake"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot"
st "k8s.io/kubernetes/pkg/scheduler/testing"
)
// The tests in this file compare the performance of SelectorSpreadPriority
// against EvenPodsSpreadPriority with a similar rule.
var (
tests = []struct {
name string
@ -53,22 +53,35 @@ func BenchmarkTestSelectorSpreadPriority(b *testing.B) {
pod := st.MakePod().Name("p").Label("foo", "").Obj()
existingPods, allNodes, filteredNodes := st.MakeNodesAndPodsForEvenPodsSpread(pod.Labels, tt.existingPodsNum, tt.allNodesNum, tt.allNodesNum)
snapshot := nodeinfosnapshot.NewSnapshot(nodeinfosnapshot.CreateNodeInfoMap(existingPods, allNodes))
services := []*v1.Service{{Spec: v1.ServiceSpec{Selector: map[string]string{"foo": ""}}}}
ss := SelectorSpread{
serviceLister: fake.ServiceLister(services),
controllerLister: fake.ControllerLister(nil),
replicaSetLister: fake.ReplicaSetLister(nil),
statefulSetLister: fake.StatefulSetLister(nil),
services := &v1.ServiceList{
Items: []v1.Service{{Spec: v1.ServiceSpec{Selector: map[string]string{"foo": ""}}}},
}
client := clientsetfake.NewSimpleClientset(services)
ctx := context.Background()
informerFactory := informers.NewSharedInformerFactory(client, 0)
_ = informerFactory.Core().V1().Services().Lister()
informerFactory.Start(ctx.Done())
caches := informerFactory.WaitForCacheSync(ctx.Done())
for _, synced := range caches {
if !synced {
b.Errorf("error waiting for informer cache sync")
}
}
fh, _ := framework.NewFramework(nil, nil, nil, framework.WithSnapshotSharedLister(snapshot), framework.WithInformerFactory(informerFactory))
plugin := &DefaultPodTopologySpread{handle: fh}
b.ResetTimer()
for i := 0; i < b.N; i++ {
meta := &priorityMetadata{
podSelector: getSelector(pod, ss.serviceLister, ss.controllerLister, ss.replicaSetLister, ss.statefulSetLister),
state := framework.NewCycleState()
status := plugin.PostFilter(ctx, state, pod, allNodes, nil)
if !status.IsSuccess() {
b.Fatalf("unexpected error: %v", status)
}
_, err := runMapReducePriority(ss.CalculateSpreadPriorityMap, ss.CalculateSpreadPriorityReduce, meta, pod, snapshot, filteredNodes)
if err != nil {
b.Fatal(err)
for _, node := range filteredNodes {
_, status := plugin.Score(ctx, state, pod, node.Name)
if !status.IsSuccess() {
b.Errorf("unexpected error: %v", status)
}
}
}
})

View File

@ -18,6 +18,7 @@ package defaultpodtopologyspread
import (
"context"
"fmt"
"reflect"
"sort"
"testing"
@ -25,10 +26,10 @@ import (
apps "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/scheduler/algorithm/priorities"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/migration"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/informers"
clientsetfake "k8s.io/client-go/kubernetes/fake"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
fakelisters "k8s.io/kubernetes/pkg/scheduler/listers/fake"
nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot"
)
@ -343,42 +344,37 @@ func TestDefaultPodTopologySpreadScore(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
nodes := makeNodeList(test.nodes)
snapshot := nodeinfosnapshot.NewSnapshot(nodeinfosnapshot.CreateNodeInfoMap(test.pods, nodes))
fh, _ := framework.NewFramework(nil, nil, nil, framework.WithSnapshotSharedLister(snapshot))
mapFunction, reduceFunction := priorities.NewSelectorSpreadPriority(
fakelisters.ServiceLister(test.services),
fakelisters.ControllerLister(test.rcs),
fakelisters.ReplicaSetLister(test.rss),
fakelisters.StatefulSetLister(test.sss),
)
metaDataProducer := priorities.NewMetadataFactory(
fakelisters.ServiceLister(test.services),
fakelisters.ControllerLister(test.rcs),
fakelisters.ReplicaSetLister(test.rss),
fakelisters.StatefulSetLister(test.sss),
)
metaData := metaDataProducer(test.pod, nodes, snapshot)
ctx := context.Background()
informerFactory, err := populateAndStartInformers(ctx, test.rcs, test.rss, test.services, test.sss)
if err != nil {
t.Errorf("error creating informerFactory: %+v", err)
}
fh, err := framework.NewFramework(nil, nil, nil, framework.WithSnapshotSharedLister(snapshot), framework.WithInformerFactory(informerFactory))
if err != nil {
t.Errorf("error creating new framework handle: %+v", err)
}
state := framework.NewCycleState()
state.Write(migration.PrioritiesStateKey, &migration.PrioritiesStateData{Reference: metaData})
plugin := &DefaultPodTopologySpread{
handle: fh,
calculateSpreadPriorityMap: mapFunction,
calculateSpreadPriorityReduce: reduceFunction,
handle: fh,
}
status := plugin.PostFilter(ctx, state, test.pod, nodes, nil)
if !status.IsSuccess() {
t.Fatalf("unexpected error: %v", status)
}
var gotList framework.NodeScoreList
for _, nodeName := range test.nodes {
score, status := plugin.Score(context.Background(), state, test.pod, nodeName)
score, status := plugin.Score(ctx, state, test.pod, nodeName)
if !status.IsSuccess() {
t.Errorf("unexpected error: %v", status)
}
gotList = append(gotList, framework.NodeScore{Name: nodeName, Score: score})
}
status := plugin.ScoreExtensions().NormalizeScore(context.Background(), state, test.pod, gotList)
status = plugin.ScoreExtensions().NormalizeScore(ctx, state, test.pod, gotList)
if !status.IsSuccess() {
t.Errorf("unexpected error: %v", status)
}
@ -601,41 +597,37 @@ func TestZoneSelectorSpreadPriority(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
nodes := makeLabeledNodeList(labeledNodes)
snapshot := nodeinfosnapshot.NewSnapshot(nodeinfosnapshot.CreateNodeInfoMap(test.pods, nodes))
fh, _ := framework.NewFramework(nil, nil, nil, framework.WithSnapshotSharedLister(snapshot))
mapFunction, reduceFunction := priorities.NewSelectorSpreadPriority(
fakelisters.ServiceLister(test.services),
fakelisters.ControllerLister(test.rcs),
fakelisters.ReplicaSetLister(test.rss),
fakelisters.StatefulSetLister(test.sss),
)
metaDataProducer := priorities.NewMetadataFactory(
fakelisters.ServiceLister(test.services),
fakelisters.ControllerLister(test.rcs),
fakelisters.ReplicaSetLister(test.rss),
fakelisters.StatefulSetLister(test.sss),
)
metaData := metaDataProducer(test.pod, nodes, snapshot)
ctx := context.Background()
informerFactory, err := populateAndStartInformers(ctx, test.rcs, test.rss, test.services, test.sss)
if err != nil {
t.Errorf("error creating informerFactory: %+v", err)
}
fh, err := framework.NewFramework(nil, nil, nil, framework.WithSnapshotSharedLister(snapshot), framework.WithInformerFactory(informerFactory))
if err != nil {
t.Errorf("error creating new framework handle: %+v", err)
}
plugin := &DefaultPodTopologySpread{
handle: fh,
calculateSpreadPriorityMap: mapFunction,
calculateSpreadPriorityReduce: reduceFunction,
handle: fh,
}
state := framework.NewCycleState()
state.Write(migration.PrioritiesStateKey, &migration.PrioritiesStateData{Reference: metaData})
status := plugin.PostFilter(ctx, state, test.pod, nodes, nil)
if !status.IsSuccess() {
t.Fatalf("unexpected error: %v", status)
}
var gotList framework.NodeScoreList
for _, n := range nodes {
nodeName := n.ObjectMeta.Name
score, status := plugin.Score(context.Background(), state, test.pod, nodeName)
score, status := plugin.Score(ctx, state, test.pod, nodeName)
if !status.IsSuccess() {
t.Errorf("unexpected error: %v", status)
}
gotList = append(gotList, framework.NodeScore{Name: nodeName, Score: score})
}
status := plugin.ScoreExtensions().NormalizeScore(context.Background(), state, test.pod, gotList)
status = plugin.ScoreExtensions().NormalizeScore(ctx, state, test.pod, gotList)
if !status.IsSuccess() {
t.Errorf("unexpected error: %v", status)
}
@ -649,6 +641,38 @@ func TestZoneSelectorSpreadPriority(t *testing.T) {
}
}
func populateAndStartInformers(ctx context.Context, rcs []*v1.ReplicationController, rss []*apps.ReplicaSet, services []*v1.Service, sss []*apps.StatefulSet) (informers.SharedInformerFactory, error) {
objects := make([]runtime.Object, 0, len(rcs)+len(rss)+len(services)+len(sss))
for _, rc := range rcs {
objects = append(objects, rc.DeepCopyObject())
}
for _, rs := range rss {
objects = append(objects, rs.DeepCopyObject())
}
for _, service := range services {
objects = append(objects, service.DeepCopyObject())
}
for _, ss := range sss {
objects = append(objects, ss.DeepCopyObject())
}
client := clientsetfake.NewSimpleClientset(objects...)
informerFactory := informers.NewSharedInformerFactory(client, 0)
// Because we use an informer factory, we need to make requests for the specific informers we want before calling Start()
_ = informerFactory.Core().V1().Services().Lister()
_ = informerFactory.Core().V1().ReplicationControllers().Lister()
_ = informerFactory.Apps().V1().ReplicaSets().Lister()
_ = informerFactory.Apps().V1().StatefulSets().Lister()
informerFactory.Start(ctx.Done())
caches := informerFactory.WaitForCacheSync(ctx.Done())
for _, synced := range caches {
if !synced {
return nil, fmt.Errorf("error waiting for informer cache sync")
}
}
return informerFactory, nil
}
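
The informer bootstrapping above follows the standard client-go pattern; below is a stripped-down sketch using only client-go and a fake clientset (the Service object is made up) that shows why the lister must be requested before Start():

package main

import (
	"context"
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/client-go/informers"
	clientsetfake "k8s.io/client-go/kubernetes/fake"
)

func main() {
	ctx := context.Background()
	client := clientsetfake.NewSimpleClientset(
		&v1.Service{ObjectMeta: metav1.ObjectMeta{Name: "svc", Namespace: "default"}},
	)
	factory := informers.NewSharedInformerFactory(client, 0)

	// Requesting the lister is what registers the Services informer with the
	// factory; Start() only runs informers that have been requested.
	svcLister := factory.Core().V1().Services().Lister()

	factory.Start(ctx.Done())
	factory.WaitForCacheSync(ctx.Done())

	svcs, err := svcLister.Services("default").List(labels.Everything())
	fmt.Println(len(svcs), err) // 1 <nil>
}
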
func makeLabeledNodeList(nodeMap map[string]map[string]string) []*v1.Node {
nodes := make([]*v1.Node, 0, len(nodeMap))
for nodeName, labels := range nodeMap {

View File

@ -232,6 +232,7 @@ func NewLegacyRegistry() *LegacyRegistry {
registry.registerPriorityConfigProducer(priorities.SelectorSpreadPriority,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Score = appendToPluginSet(plugins.Score, defaultpodtopologyspread.Name, &args.Weight)
plugins.PostFilter = appendToPluginSet(plugins.PostFilter, defaultpodtopologyspread.Name, nil)
return
})
registry.registerPriorityConfigProducer(priorities.TaintTolerationPriority,

View File

@ -35,6 +35,11 @@ func RegisterScorePlugin(pluginName string, pluginNewFunc framework.PluginFactor
return RegisterPluginAsExtensions(pluginName, weight, pluginNewFunc, "Score")
}
// RegisterPostFilterPlugin returns a function to register a PostFilter Plugin to a given registry.
func RegisterPostFilterPlugin(pluginName string, pluginNewFunc framework.PluginFactory) RegisterPluginFunc {
return RegisterPluginAsExtensions(pluginName, 1, pluginNewFunc, "PostFilter")
}
// RegisterPluginAsExtensions returns a function to register a Plugin as given extensionPoints to a given registry.
func RegisterPluginAsExtensions(pluginName string, weight int32, pluginNewFunc framework.PluginFactory, extensions ...string) RegisterPluginFunc {
return func(reg *framework.Registry, plugins *schedulerapi.Plugins, pluginConfigs []schedulerapi.PluginConfig) {

View File

@ -122,6 +122,7 @@ func TestSchedulerCreationFromConfigMap(t *testing.T) {
},
"PostFilterPlugin": {
{Name: "InterPodAffinity"},
{Name: "DefaultPodTopologySpread"},
{Name: "TaintToleration"},
},
"ScorePlugin": {
@ -201,6 +202,7 @@ kind: Policy
},
"PostFilterPlugin": {
{Name: "InterPodAffinity"},
{Name: "DefaultPodTopologySpread"},
{Name: "TaintToleration"},
},
"ScorePlugin": {