feat: implement node affinity priority as score plugin

+ Add DefaultNormalizeScore function
+ Implement NodeAffinity as score plugin
draveness 2019-12-24 17:07:56 +08:00
parent bb7bad49f5
commit 75872b8e79
12 changed files with 201 additions and 273 deletions

View File

@@ -14,7 +14,6 @@ go_library(
"least_requested.go",
"metadata.go",
"most_requested.go",
"node_affinity.go",
"node_prefer_avoid_pods.go",
"priorities.go",
"reduce.go",
@@ -55,7 +54,6 @@ go_test(
"least_requested_test.go",
"metadata_test.go",
"most_requested_test.go",
"node_affinity_test.go",
"node_prefer_avoid_pods_test.go",
"requested_to_capacity_ratio_test.go",
"resource_limits_test.go",

View File

@@ -57,7 +57,6 @@ func NewMetadataFactory(
// priorityMetadata is a type that is passed as metadata for priority functions
type priorityMetadata struct {
podLimits *schedulernodeinfo.Resource
- affinity *v1.Affinity
podSelector labels.Selector
controllerRef *metav1.OwnerReference
podFirstServiceSelector labels.Selector
@@ -87,7 +86,6 @@ func (pmf *MetadataFactory) PriorityMetadata(
}
return &priorityMetadata{
podLimits: getResourceLimits(pod),
- affinity: pod.Spec.Affinity,
podSelector: getSelector(pod, pmf.serviceLister, pmf.controllerLister, pmf.replicaSetLister, pmf.statefulSetLister),
controllerRef: metav1.GetControllerOf(pod),
podFirstServiceSelector: getFirstServiceSelector(pod, pmf.serviceLister),

View File

@@ -139,7 +139,6 @@ func TestPriorityMetadata(t *testing.T) {
pod: podWithTolerationsAndAffinity,
expected: &priorityMetadata{
podLimits: nonPodLimits,
- affinity: podAffinity,
podSelector: labels.NewSelector(),
},
name: "Produce a priorityMetadata with default requests",
@@ -148,7 +147,6 @@
pod: podWithTolerationsAndRequests,
expected: &priorityMetadata{
podLimits: nonPodLimits,
- affinity: nil,
podSelector: labels.NewSelector(),
},
name: "Produce a priorityMetadata with tolerations and requests",
@@ -157,7 +155,6 @@
pod: podWithAffinityAndRequests,
expected: &priorityMetadata{
podLimits: specifiedPodLimits,
- affinity: podAffinity,
podSelector: labels.NewSelector(),
},
name: "Produce a priorityMetadata with affinity and requests",

View File

@@ -1,77 +0,0 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package priorities
import (
"fmt"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
)
// CalculateNodeAffinityPriorityMap prioritizes nodes according to node affinity scheduling preferences
// indicated in PreferredDuringSchedulingIgnoredDuringExecution. Each time a node matches a preferredSchedulingTerm,
// it will get an add of preferredSchedulingTerm.Weight. Thus, the more preferredSchedulingTerms
// the node satisfies and the more the preferredSchedulingTerm that is satisfied weights, the higher
// score the node gets.
func CalculateNodeAffinityPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulernodeinfo.NodeInfo) (framework.NodeScore, error) {
node := nodeInfo.Node()
if node == nil {
return framework.NodeScore{}, fmt.Errorf("node not found")
}
// default is the podspec.
affinity := pod.Spec.Affinity
if priorityMeta, ok := meta.(*priorityMetadata); ok {
// We were able to parse metadata, use affinity from there.
affinity = priorityMeta.affinity
}
var count int32
// A nil element of PreferredDuringSchedulingIgnoredDuringExecution matches no objects.
// An element of PreferredDuringSchedulingIgnoredDuringExecution that refers to an
// empty PreferredSchedulingTerm matches all objects.
if affinity != nil && affinity.NodeAffinity != nil && affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution != nil {
// Match PreferredDuringSchedulingIgnoredDuringExecution term by term.
for i := range affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution {
preferredSchedulingTerm := &affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution[i]
if preferredSchedulingTerm.Weight == 0 {
continue
}
// TODO: Avoid computing it for all nodes if this becomes a performance problem.
nodeSelector, err := v1helper.NodeSelectorRequirementsAsSelector(preferredSchedulingTerm.Preference.MatchExpressions)
if err != nil {
return framework.NodeScore{}, err
}
if nodeSelector.Matches(labels.Set(node.Labels)) {
count += preferredSchedulingTerm.Weight
}
}
}
return framework.NodeScore{
Name: node.Name,
Score: int64(count),
}, nil
}
// CalculateNodeAffinityPriorityReduce is a reduce function for node affinity priority calculation.
var CalculateNodeAffinityPriorityReduce = NormalizeReduce(framework.MaxNodeScore, false)
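The removed map function is a plain weighted sum: each matching PreferredSchedulingTerm adds its Weight to the node's raw score. A dependency-free sketch of that rule (not from the commit; matching is simplified to exact key=value equality, where the real code builds a labels.Selector from each term's MatchExpressions):

```go
package main

import "fmt"

// term mirrors a PreferredSchedulingTerm, reduced to a single exact
// key=value match for illustration.
type term struct {
	weight     int32
	key, value string
}

// weightedSum adds the weight of every term the node's labels satisfy,
// skipping zero-weight terms just as the map function above does.
func weightedSum(nodeLabels map[string]string, terms []term) int32 {
	var count int32
	for _, t := range terms {
		if t.weight == 0 {
			continue
		}
		if nodeLabels[t.key] == t.value {
			count += t.weight
		}
	}
	return count
}

func main() {
	terms := []term{{2, "foo", "bar"}, {4, "key", "value"}}
	fmt.Println(weightedSum(map[string]string{"foo": "bar"}, terms))                 // 2
	fmt.Println(weightedSum(map[string]string{"foo": "bar", "key": "value"}, terms)) // 6
}
```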

View File

@@ -1,180 +0,0 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package priorities
import (
"reflect"
"testing"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot"
)
func TestNodeAffinityPriority(t *testing.T) {
label1 := map[string]string{"foo": "bar"}
label2 := map[string]string{"key": "value"}
label3 := map[string]string{"az": "az1"}
label4 := map[string]string{"abc": "az11", "def": "az22"}
label5 := map[string]string{"foo": "bar", "key": "value", "az": "az1"}
affinity1 := &v1.Affinity{
NodeAffinity: &v1.NodeAffinity{
PreferredDuringSchedulingIgnoredDuringExecution: []v1.PreferredSchedulingTerm{{
Weight: 2,
Preference: v1.NodeSelectorTerm{
MatchExpressions: []v1.NodeSelectorRequirement{{
Key: "foo",
Operator: v1.NodeSelectorOpIn,
Values: []string{"bar"},
}},
},
}},
},
}
affinity2 := &v1.Affinity{
NodeAffinity: &v1.NodeAffinity{
PreferredDuringSchedulingIgnoredDuringExecution: []v1.PreferredSchedulingTerm{
{
Weight: 2,
Preference: v1.NodeSelectorTerm{
MatchExpressions: []v1.NodeSelectorRequirement{
{
Key: "foo",
Operator: v1.NodeSelectorOpIn,
Values: []string{"bar"},
},
},
},
},
{
Weight: 4,
Preference: v1.NodeSelectorTerm{
MatchExpressions: []v1.NodeSelectorRequirement{
{
Key: "key",
Operator: v1.NodeSelectorOpIn,
Values: []string{"value"},
},
},
},
},
{
Weight: 5,
Preference: v1.NodeSelectorTerm{
MatchExpressions: []v1.NodeSelectorRequirement{
{
Key: "foo",
Operator: v1.NodeSelectorOpIn,
Values: []string{"bar"},
},
{
Key: "key",
Operator: v1.NodeSelectorOpIn,
Values: []string{"value"},
},
{
Key: "az",
Operator: v1.NodeSelectorOpIn,
Values: []string{"az1"},
},
},
},
},
},
},
}
tests := []struct {
pod *v1.Pod
nodes []*v1.Node
expectedList framework.NodeScoreList
name string
}{
{
pod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{},
},
},
nodes: []*v1.Node{
{ObjectMeta: metav1.ObjectMeta{Name: "machine1", Labels: label1}},
{ObjectMeta: metav1.ObjectMeta{Name: "machine2", Labels: label2}},
{ObjectMeta: metav1.ObjectMeta{Name: "machine3", Labels: label3}},
},
expectedList: []framework.NodeScore{{Name: "machine1", Score: 0}, {Name: "machine2", Score: 0}, {Name: "machine3", Score: 0}},
name: "all machines are same priority as NodeAffinity is nil",
},
{
pod: &v1.Pod{
Spec: v1.PodSpec{
Affinity: affinity1,
},
},
nodes: []*v1.Node{
{ObjectMeta: metav1.ObjectMeta{Name: "machine1", Labels: label4}},
{ObjectMeta: metav1.ObjectMeta{Name: "machine2", Labels: label2}},
{ObjectMeta: metav1.ObjectMeta{Name: "machine3", Labels: label3}},
},
expectedList: []framework.NodeScore{{Name: "machine1", Score: 0}, {Name: "machine2", Score: 0}, {Name: "machine3", Score: 0}},
name: "no machine macthes preferred scheduling requirements in NodeAffinity of pod so all machines' priority is zero",
},
{
pod: &v1.Pod{
Spec: v1.PodSpec{
Affinity: affinity1,
},
},
nodes: []*v1.Node{
{ObjectMeta: metav1.ObjectMeta{Name: "machine1", Labels: label1}},
{ObjectMeta: metav1.ObjectMeta{Name: "machine2", Labels: label2}},
{ObjectMeta: metav1.ObjectMeta{Name: "machine3", Labels: label3}},
},
expectedList: []framework.NodeScore{{Name: "machine1", Score: framework.MaxNodeScore}, {Name: "machine2", Score: 0}, {Name: "machine3", Score: 0}},
name: "only machine1 matches the preferred scheduling requirements of pod",
},
{
pod: &v1.Pod{
Spec: v1.PodSpec{
Affinity: affinity2,
},
},
nodes: []*v1.Node{
{ObjectMeta: metav1.ObjectMeta{Name: "machine1", Labels: label1}},
{ObjectMeta: metav1.ObjectMeta{Name: "machine5", Labels: label5}},
{ObjectMeta: metav1.ObjectMeta{Name: "machine2", Labels: label2}},
},
expectedList: []framework.NodeScore{{Name: "machine1", Score: 18}, {Name: "machine5", Score: framework.MaxNodeScore}, {Name: "machine2", Score: 36}},
name: "all machines matches the preferred scheduling requirements of pod but with different priorities ",
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
snapshot := nodeinfosnapshot.NewSnapshot(nodeinfosnapshot.CreateNodeInfoMap(nil, test.nodes))
list, err := runMapReducePriority(CalculateNodeAffinityPriorityMap, CalculateNodeAffinityPriorityReduce, nil, test.pod, snapshot, test.nodes)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if !reflect.DeepEqual(test.expectedList, list) {
t.Errorf("expected %#v, \ngot %#v", test.expectedList, list)
}
})
}
}
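The expected scores in the last case follow directly from the normalization: machine1 matches only the weight-2 term (raw 2), machine2 only the weight-4 term (raw 4), and machine5 all three terms (2 + 4 + 5 = 11). NormalizeReduce scales by MaxNodeScore/max, so machine1 gets ⌊2·100/11⌋ = 18, machine2 gets ⌊4·100/11⌋ = 36, and machine5 gets 100.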

View File

@@ -83,7 +83,7 @@ func init() {
scheduler.RegisterPriorityMapReduceFunction(priorities.NodePreferAvoidPodsPriority, priorities.CalculateNodePreferAvoidPodsPriorityMap, nil, 10000)
// Prioritizes nodes that have labels matching NodeAffinity
- scheduler.RegisterPriorityMapReduceFunction(priorities.NodeAffinityPriority, priorities.CalculateNodeAffinityPriorityMap, priorities.CalculateNodeAffinityPriorityReduce, 1)
+ scheduler.RegisterPriorityMapReduceFunction(priorities.NodeAffinityPriority, nil, nil, 1)
// Prioritizes nodes that marked with taint which pod can tolerate.
scheduler.RegisterPriorityMapReduceFunction(priorities.TaintTolerationPriority, nil, nil, 1)

View File

@@ -47,6 +47,7 @@ filegroup(
":package-srcs",
"//pkg/scheduler/framework/plugins/defaultpodtopologyspread:all-srcs",
"//pkg/scheduler/framework/plugins/examples:all-srcs",
"//pkg/scheduler/framework/plugins/helper:all-srcs",
"//pkg/scheduler/framework/plugins/imagelocality:all-srcs",
"//pkg/scheduler/framework/plugins/interpodaffinity:all-srcs",
"//pkg/scheduler/framework/plugins/migration:all-srcs",

View File

@@ -0,0 +1,30 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = ["normalize_score.go"],
importpath = "k8s.io/kubernetes/pkg/scheduler/framework/plugins/helper",
visibility = ["//visibility:public"],
deps = ["//pkg/scheduler/framework/v1alpha1:go_default_library"],
)
go_test(
name = "go_default_test",
srcs = ["normalize_score_test.go"],
embed = [":go_default_library"],
deps = ["//pkg/scheduler/framework/v1alpha1:go_default_library"],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@@ -0,0 +1,54 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package helper
import (
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
)
// DefaultNormalizeScore normalizes a score list so that the highest raw score
// maps to maxPriority and the others scale proportionally into [0, maxPriority].
// If reverse is true, each normalized score is subtracted from maxPriority,
// inverting the ranking.
func DefaultNormalizeScore(maxPriority int64, reverse bool, scores framework.NodeScoreList) *framework.Status {
var maxCount int64
for i := range scores {
if scores[i].Score > maxCount {
maxCount = scores[i].Score
}
}
if maxCount == 0 {
if reverse {
for i := range scores {
scores[i].Score = maxPriority
}
}
return nil
}
for i := range scores {
score := scores[i].Score
score = maxPriority * score / maxCount
if reverse {
score = maxPriority - score
}
scores[i].Score = score
}
return nil
}
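A short usage sketch (node names are illustrative; the package is imported as pluginhelper, as the nodeaffinity plugin below does):

```go
scores := framework.NodeScoreList{
	{Name: "node-a", Score: 2},
	{Name: "node-b", Score: 4},
}
// Normalizes in place: the max raw score (4) maps to framework.MaxNodeScore
// (100) and node-a scales proportionally to 2*100/4 = 50.
_ = pluginhelper.DefaultNormalizeScore(framework.MaxNodeScore, false, scores)
// scores is now {node-a: 50, node-b: 100}
```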

View File

@@ -0,0 +1,76 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package helper
import (
"reflect"
"testing"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
)
func TestDefaultNormalizeScore(t *testing.T) {
tests := []struct {
reverse bool
scores []int64
expectedScores []int64
}{
{
scores: []int64{1, 2, 3, 4},
expectedScores: []int64{25, 50, 75, 100},
},
{
reverse: true,
scores: []int64{1, 2, 3, 4},
expectedScores: []int64{75, 50, 25, 0},
},
{
scores: []int64{1000, 10, 20, 30},
expectedScores: []int64{100, 1, 2, 3},
},
{
reverse: true,
scores: []int64{1000, 10, 20, 30},
expectedScores: []int64{0, 99, 98, 97},
},
{
scores: []int64{1, 1, 1, 1},
expectedScores: []int64{100, 100, 100, 100},
},
{
scores: []int64{1000, 1, 1, 1},
expectedScores: []int64{100, 0, 0, 0},
},
}
for i, test := range tests {
scores := framework.NodeScoreList{}
for _, score := range test.scores {
scores = append(scores, framework.NodeScore{Score: score})
}
expectedScores := framework.NodeScoreList{}
for _, score := range test.expectedScores {
expectedScores = append(expectedScores, framework.NodeScore{Score: score})
}
DefaultNormalizeScore(framework.MaxNodeScore, test.reverse, scores)
if !reflect.DeepEqual(scores, expectedScores) {
t.Errorf("test %d, expected %v, got %v", i, expectedScores, scores)
}
}
}
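The two 1000-max cases also pin down the integer division: a raw score of 10 normalizes to ⌊10·100/1000⌋ = 1, so the reversed variant expects 100 − 1 = 99, which is why the expected lists are {100, 1, 2, 3} and {0, 99, 98, 97}.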

View File

@@ -6,12 +6,14 @@ go_library(
importpath = "k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeaffinity",
visibility = ["//visibility:public"],
deps = [
"//pkg/apis/core/v1/helper:go_default_library",
"//pkg/scheduler/algorithm/predicates:go_default_library",
"//pkg/scheduler/algorithm/priorities:go_default_library",
"//pkg/scheduler/framework/plugins/helper:go_default_library",
"//pkg/scheduler/framework/plugins/migration:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library",
"//pkg/scheduler/nodeinfo:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
],
)

View File

@@ -21,9 +21,11 @@ import (
"fmt"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
"k8s.io/kubernetes/pkg/scheduler/algorithm/priorities"
pluginhelper "k8s.io/kubernetes/pkg/scheduler/framework/plugins/helper"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/migration"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
"k8s.io/kubernetes/pkg/scheduler/nodeinfo"
@@ -58,16 +60,43 @@ func (pl *NodeAffinity) Score(ctx context.Context, state *framework.CycleState,
return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v", nodeName, err))
}
- meta := migration.PriorityMetadata(state)
- s, err := priorities.CalculateNodeAffinityPriorityMap(pod, meta, nodeInfo)
- return s.Score, migration.ErrorToFrameworkStatus(err)
+ node := nodeInfo.Node()
+ if node == nil {
+ return 0, framework.NewStatus(framework.Error, fmt.Sprintf("node %q from Snapshot is nil", nodeName))
+ }
+ affinity := pod.Spec.Affinity
+ var count int64
+ // A nil element of PreferredDuringSchedulingIgnoredDuringExecution matches no objects.
+ // An element of PreferredDuringSchedulingIgnoredDuringExecution that refers to an
+ // empty PreferredSchedulingTerm matches all objects.
+ if affinity != nil && affinity.NodeAffinity != nil && affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution != nil {
+ // Match PreferredDuringSchedulingIgnoredDuringExecution term by term.
+ for i := range affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution {
+ preferredSchedulingTerm := &affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution[i]
+ if preferredSchedulingTerm.Weight == 0 {
+ continue
+ }
+ // TODO: Avoid computing it for all nodes if this becomes a performance problem.
+ nodeSelector, err := v1helper.NodeSelectorRequirementsAsSelector(preferredSchedulingTerm.Preference.MatchExpressions)
+ if err != nil {
+ return 0, framework.NewStatus(framework.Error, err.Error())
+ }
+ if nodeSelector.Matches(labels.Set(node.Labels)) {
+ count += int64(preferredSchedulingTerm.Weight)
+ }
+ }
+ }
+ return count, nil
}
// NormalizeScore invoked after scoring all nodes.
func (pl *NodeAffinity) NormalizeScore(ctx context.Context, state *framework.CycleState, pod *v1.Pod, scores framework.NodeScoreList) *framework.Status {
- // Note that CalculateNodeAffinityPriorityReduce doesn't use priority metadata, hence passing nil here.
- err := priorities.CalculateNodeAffinityPriorityReduce(pod, nil, pl.handle.SnapshotSharedLister(), scores)
- return migration.ErrorToFrameworkStatus(err)
+ return pluginhelper.DefaultNormalizeScore(framework.MaxNodeScore, false, scores)
}
// ScoreExtensions of the Score plugin.
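Taken together, Score produces one raw weighted count per node and NormalizeScore scales the list in place. A framework-free sketch of that composition (scoreAndNormalize is illustrative, not part of the commit; it assumes the same imports the plugin file declares: v1, labels, v1helper, pluginhelper, framework):

```go
// scoreAndNormalize mirrors what the framework does across Score and
// NormalizeScore: compute a raw weighted-affinity count per node, then
// scale the list in place to [0, MaxNodeScore].
func scoreAndNormalize(pod *v1.Pod, nodes []*v1.Node) (framework.NodeScoreList, *framework.Status) {
	scores := make(framework.NodeScoreList, 0, len(nodes))
	for _, node := range nodes {
		var count int64
		affinity := pod.Spec.Affinity
		if affinity != nil && affinity.NodeAffinity != nil && affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution != nil {
			for i := range affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution {
				term := &affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution[i]
				if term.Weight == 0 {
					continue
				}
				sel, err := v1helper.NodeSelectorRequirementsAsSelector(term.Preference.MatchExpressions)
				if err != nil {
					return nil, framework.NewStatus(framework.Error, err.Error())
				}
				if sel.Matches(labels.Set(node.Labels)) {
					count += int64(term.Weight)
				}
			}
		}
		scores = append(scores, framework.NodeScore{Name: node.Name, Score: count})
	}
	return scores, pluginhelper.DefaultNormalizeScore(framework.MaxNodeScore, false, scores)
}
```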