Added the first predicate as a filter plugin: PodToleratesNodeTaints.

This commit is contained in:
Abdullah Gharaibeh 2019-10-04 17:40:21 -04:00
parent 4e1214c149
commit 66386fdf49
17 changed files with 461 additions and 160 deletions

View File

@ -82,6 +82,38 @@ var (
ErrFakePredicate = newPredicateFailureError("FakePredicateError", "Nodes failed the fake predicate")
)
var unresolvablePredicateFailureErrors = map[PredicateFailureReason]struct{}{
ErrNodeSelectorNotMatch: {},
ErrPodAffinityRulesNotMatch: {},
ErrPodNotMatchHostName: {},
ErrTaintsTolerationsNotMatch: {},
ErrNodeLabelPresenceViolated: {},
// Node conditions won't change when scheduler simulates removal of preemption victims.
// So, it is pointless to try nodes that have not been able to host the pod due to node
// conditions. These include ErrNodeNotReady, ErrNodeUnderPIDPressure, ErrNodeUnderMemoryPressure, ....
ErrNodeNotReady: {},
ErrNodeNetworkUnavailable: {},
ErrNodeUnderDiskPressure: {},
ErrNodeUnderPIDPressure: {},
ErrNodeUnderMemoryPressure: {},
ErrNodeUnschedulable: {},
ErrNodeUnknownCondition: {},
ErrVolumeZoneConflict: {},
ErrVolumeNodeConflict: {},
ErrVolumeBindConflict: {},
}
// UnresolvablePredicateExists checks if there is at least one unresolvable predicate failure reason, if true
// returns the first one in the list.
func UnresolvablePredicateExists(reasons []PredicateFailureReason) PredicateFailureReason {
for _, r := range reasons {
if _, ok := unresolvablePredicateFailureErrors[r]; ok {
return r
}
}
return nil
}
// InsufficientResourceError is an error type that indicates what kind of resource limit is
// hit and caused the unfitting failure.
type InsufficientResourceError struct {

View File

@ -37,10 +37,11 @@ import (
func TestCompatibility_v1_Scheduler(t *testing.T) {
// Add serialized versions of scheduler config that exercise available options to ensure compatibility between releases
schedulerFiles := map[string]struct {
JSON string
wantPredicates sets.String
wantPrioritizers sets.String
wantExtenders []schedulerapi.ExtenderConfig
JSON string
wantPredicates sets.String
wantPrioritizers sets.String
wantFilterPlugins sets.String
wantExtenders []schedulerapi.ExtenderConfig
}{
// Do not change this JSON after the corresponding release has been tagged.
// A failure indicates backwards compatibility with the specified release was broken.
@ -214,7 +215,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
"HostName",
"NoDiskConflict",
"NoVolumeZoneConflict",
"PodToleratesNodeTaints",
"CheckNodeMemoryPressure",
"MaxEBSVolumeCount",
"MaxGCEPDVolumeCount",
@ -234,6 +234,9 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
"TaintTolerationPriority",
"InterPodAffinityPriority",
),
wantFilterPlugins: sets.NewString(
"TaintToleration",
),
},
// Do not change this JSON after the corresponding release has been tagged.
@ -279,7 +282,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
"HostName",
"NoDiskConflict",
"NoVolumeZoneConflict",
"PodToleratesNodeTaints",
"CheckNodeMemoryPressure",
"CheckNodeDiskPressure",
"MaxEBSVolumeCount",
@ -302,6 +304,9 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
"InterPodAffinityPriority",
"MostRequestedPriority",
),
wantFilterPlugins: sets.NewString(
"TaintToleration",
),
},
// Do not change this JSON after the corresponding release has been tagged.
// A failure indicates backwards compatibility with the specified release was broken.
@ -356,7 +361,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
"HostName",
"NoDiskConflict",
"NoVolumeZoneConflict",
"PodToleratesNodeTaints",
"CheckNodeMemoryPressure",
"CheckNodeDiskPressure",
"MaxEBSVolumeCount",
@ -379,6 +383,9 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
"InterPodAffinityPriority",
"MostRequestedPriority",
),
wantFilterPlugins: sets.NewString(
"TaintToleration",
),
wantExtenders: []schedulerapi.ExtenderConfig{{
URLPrefix: "/prefix",
FilterVerb: "filter",
@ -445,7 +452,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
"HostName",
"NoDiskConflict",
"NoVolumeZoneConflict",
"PodToleratesNodeTaints",
"CheckNodeMemoryPressure",
"CheckNodeDiskPressure",
"CheckNodeCondition",
@ -469,6 +475,9 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
"InterPodAffinityPriority",
"MostRequestedPriority",
),
wantFilterPlugins: sets.NewString(
"TaintToleration",
),
wantExtenders: []schedulerapi.ExtenderConfig{{
URLPrefix: "/prefix",
FilterVerb: "filter",
@ -536,7 +545,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
"HostName",
"NoDiskConflict",
"NoVolumeZoneConflict",
"PodToleratesNodeTaints",
"CheckNodeMemoryPressure",
"CheckNodeDiskPressure",
"CheckNodeCondition",
@ -561,6 +569,9 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
"InterPodAffinityPriority",
"MostRequestedPriority",
),
wantFilterPlugins: sets.NewString(
"TaintToleration",
),
wantExtenders: []schedulerapi.ExtenderConfig{{
URLPrefix: "/prefix",
FilterVerb: "filter",
@ -632,7 +643,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
"HostName",
"NoDiskConflict",
"NoVolumeZoneConflict",
"PodToleratesNodeTaints",
"CheckNodeMemoryPressure",
"CheckNodeDiskPressure",
"CheckNodePIDPressure",
@ -658,6 +668,9 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
"InterPodAffinityPriority",
"MostRequestedPriority",
),
wantFilterPlugins: sets.NewString(
"TaintToleration",
),
wantExtenders: []schedulerapi.ExtenderConfig{{
URLPrefix: "/prefix",
FilterVerb: "filter",
@ -741,7 +754,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
"HostName",
"NoDiskConflict",
"NoVolumeZoneConflict",
"PodToleratesNodeTaints",
"CheckNodeMemoryPressure",
"CheckNodeDiskPressure",
"CheckNodePIDPressure",
@ -768,6 +780,9 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
"MostRequestedPriority",
"RequestedToCapacityRatioPriority",
),
wantFilterPlugins: sets.NewString(
"TaintToleration",
),
wantExtenders: []schedulerapi.ExtenderConfig{{
URLPrefix: "/prefix",
FilterVerb: "filter",
@ -852,7 +867,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
"HostName",
"NoDiskConflict",
"NoVolumeZoneConflict",
"PodToleratesNodeTaints",
"CheckNodeMemoryPressure",
"CheckNodeDiskPressure",
"CheckNodePIDPressure",
@ -880,6 +894,9 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
"MostRequestedPriority",
"RequestedToCapacityRatioPriority",
),
wantFilterPlugins: sets.NewString(
"TaintToleration",
),
wantExtenders: []schedulerapi.ExtenderConfig{{
URLPrefix: "/prefix",
FilterVerb: "filter",
@ -963,7 +980,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
"HostName",
"NoDiskConflict",
"NoVolumeZoneConflict",
"PodToleratesNodeTaints",
"CheckNodeMemoryPressure",
"CheckNodeDiskPressure",
"CheckNodePIDPressure",
@ -992,6 +1008,9 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
"MostRequestedPriority",
"RequestedToCapacityRatioPriority",
),
wantFilterPlugins: sets.NewString(
"TaintToleration",
),
wantExtenders: []schedulerapi.ExtenderConfig{{
URLPrefix: "/prefix",
FilterVerb: "filter",
@ -1079,7 +1098,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
"HostName",
"NoDiskConflict",
"NoVolumeZoneConflict",
"PodToleratesNodeTaints",
"CheckNodeMemoryPressure",
"CheckNodeDiskPressure",
"CheckNodePIDPressure",
@ -1108,6 +1126,9 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
"MostRequestedPriority",
"RequestedToCapacityRatioPriority",
),
wantFilterPlugins: sets.NewString(
"TaintToleration",
),
wantExtenders: []schedulerapi.ExtenderConfig{{
URLPrefix: "/prefix",
FilterVerb: "filter",
@ -1128,6 +1149,9 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
seenPredicates := sets.NewString()
seenPriorities := sets.NewString()
mandatoryPredicates := sets.NewString("CheckNodeCondition")
filterToPredicateMap := map[string]string{
"TaintToleration": "PodToleratesNodeTaints",
}
for v, tc := range schedulerFiles {
t.Run(v, func(t *testing.T) {
@ -1163,26 +1187,39 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
algorithmSrc,
make(chan struct{}),
)
if err != nil {
t.Fatalf("%s: Error constructing: %v", v, err)
}
schedPredicates := sets.NewString()
gotPredicates := sets.NewString()
for p := range sched.Algorithm.Predicates() {
schedPredicates.Insert(p)
gotPredicates.Insert(p)
}
wantPredicates := tc.wantPredicates.Union(mandatoryPredicates)
if !schedPredicates.Equal(wantPredicates) {
t.Errorf("Got predicates %v, want %v", schedPredicates, wantPredicates)
}
schedPrioritizers := sets.NewString()
for _, p := range sched.Algorithm.Prioritizers() {
schedPrioritizers.Insert(p.Name)
if !gotPredicates.Equal(wantPredicates) {
t.Errorf("Got predicates %v, want %v", gotPredicates, wantPredicates)
}
if !schedPrioritizers.Equal(tc.wantPrioritizers) {
t.Errorf("Got prioritizers %v, want %v", schedPrioritizers, tc.wantPrioritizers)
gotPrioritizers := sets.NewString()
for _, p := range sched.Algorithm.Prioritizers() {
gotPrioritizers.Insert(p.Name)
}
schedExtenders := sched.Algorithm.Extenders()
if !gotPrioritizers.Equal(tc.wantPrioritizers) {
t.Errorf("Got prioritizers %v, want %v", gotPrioritizers, tc.wantPrioritizers)
}
gotFilterPlugins := sets.NewString()
plugins := sched.Framework.ListPlugins()
for _, p := range plugins["FilterPlugin"] {
gotFilterPlugins.Insert(p)
seenPredicates.Insert(filterToPredicateMap[p])
}
if !gotFilterPlugins.Equal(tc.wantFilterPlugins) {
t.Errorf("Got filter plugins %v, want %v", gotFilterPlugins, tc.wantFilterPlugins)
}
gotExtenders := sched.Algorithm.Extenders()
var wantExtenders []*core.HTTPExtender
for _, e := range tc.wantExtenders {
extender, err := core.NewHTTPExtender(&e)
@ -1191,13 +1228,14 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
}
wantExtenders = append(wantExtenders, extender.(*core.HTTPExtender))
}
for i := range schedExtenders {
if !core.Equal(wantExtenders[i], schedExtenders[i].(*core.HTTPExtender)) {
t.Errorf("Got extender #%d %+v, want %+v", i, schedExtenders[i], wantExtenders[i])
for i := range gotExtenders {
if !core.Equal(wantExtenders[i], gotExtenders[i].(*core.HTTPExtender)) {
t.Errorf("Got extender #%d %+v, want %+v", i, gotExtenders[i], wantExtenders[i])
}
}
seenPredicates = seenPredicates.Union(schedPredicates)
seenPriorities = seenPriorities.Union(schedPrioritizers)
seenPredicates = seenPredicates.Union(gotPredicates)
seenPriorities = seenPriorities.Union(gotPrioritizers)
})
}

View File

@ -65,27 +65,6 @@ const (
minFeasibleNodesPercentageToFind = 5
)
var unresolvablePredicateFailureErrors = map[predicates.PredicateFailureReason]struct{}{
predicates.ErrNodeSelectorNotMatch: {},
predicates.ErrPodAffinityRulesNotMatch: {},
predicates.ErrPodNotMatchHostName: {},
predicates.ErrTaintsTolerationsNotMatch: {},
predicates.ErrNodeLabelPresenceViolated: {},
// Node conditions won't change when scheduler simulates removal of preemption victims.
// So, it is pointless to try nodes that have not been able to host the pod due to node
// conditions. These include ErrNodeNotReady, ErrNodeUnderPIDPressure, ErrNodeUnderMemoryPressure, ....
predicates.ErrNodeNotReady: {},
predicates.ErrNodeNetworkUnavailable: {},
predicates.ErrNodeUnderDiskPressure: {},
predicates.ErrNodeUnderPIDPressure: {},
predicates.ErrNodeUnderMemoryPressure: {},
predicates.ErrNodeUnschedulable: {},
predicates.ErrNodeUnknownCondition: {},
predicates.ErrVolumeZoneConflict: {},
predicates.ErrVolumeNodeConflict: {},
predicates.ErrVolumeBindConflict: {},
}
// FailedPredicateMap declares a map[string][]algorithm.PredicateFailureReason type.
type FailedPredicateMap map[string][]predicates.PredicateFailureReason
@ -1204,16 +1183,6 @@ func (g *genericScheduler) selectVictimsOnNode(
return victims, numViolatingVictim, true
}
// unresolvablePredicateExists checks whether failedPredicates has unresolvable predicate.
func unresolvablePredicateExists(failedPredicates []predicates.PredicateFailureReason) bool {
for _, failedPredicate := range failedPredicates {
if _, ok := unresolvablePredicateFailureErrors[failedPredicate]; ok {
return true
}
}
return false
}
// nodesWherePreemptionMightHelp returns a list of nodes with failed predicates
// that may be satisfied by removing pods from the node.
func nodesWherePreemptionMightHelp(nodes []*v1.Node, fitErr *FitError) []*v1.Node {
@ -1228,7 +1197,7 @@ func nodesWherePreemptionMightHelp(nodes []*v1.Node, fitErr *FitError) []*v1.Nod
// to rely less on such assumptions in the code when checking does not impose
// significant overhead.
// Also, we currently assume all failures returned by extender as resolvable.
if !unresolvablePredicateExists(failedPredicates) {
if predicates.UnresolvablePredicateExists(failedPredicates) == nil {
klog.V(3).Infof("Node %v is a potential node for preemption.", node.Name)
potentialNodes = append(potentialNodes, node)
}

View File

@ -6,8 +6,9 @@ go_library(
importpath = "k8s.io/kubernetes/pkg/scheduler/framework/plugins",
visibility = ["//visibility:public"],
deps = [
"//pkg/scheduler/algorithm/predicates:go_default_library",
"//pkg/scheduler/apis/config:go_default_library",
"//pkg/scheduler/framework/plugins/noop:go_default_library",
"//pkg/scheduler/framework/plugins/tainttoleration:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library",
],
)
@ -24,7 +25,9 @@ filegroup(
srcs = [
":package-srcs",
"//pkg/scheduler/framework/plugins/examples:all-srcs",
"//pkg/scheduler/framework/plugins/migration:all-srcs",
"//pkg/scheduler/framework/plugins/noop:all-srcs",
"//pkg/scheduler/framework/plugins/tainttoleration:all-srcs",
],
tags = ["automanaged"],
visibility = ["//visibility:public"],

View File

@ -19,8 +19,9 @@ package plugins
import (
"fmt"
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
noop "k8s.io/kubernetes/pkg/scheduler/framework/plugins/noop"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/tainttoleration"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
)
@ -29,9 +30,7 @@ import (
// runs custom plugins, can pass a different Registry when initializing the scheduler.
func NewDefaultRegistry() framework.Registry {
return framework.Registry{
// This is just a test plugin to showcase the setup, it should be deleted once
// we have at least one legitimate plugin here.
noop.Name: noop.New,
tainttoleration.Name: tainttoleration.New,
}
}
@ -55,10 +54,17 @@ type ConfigProducerRegistry struct {
// NewDefaultConfigProducerRegistry creates a new producer registry.
func NewDefaultConfigProducerRegistry() *ConfigProducerRegistry {
return &ConfigProducerRegistry{
registry := &ConfigProducerRegistry{
PredicateToConfigProducer: make(map[string]ConfigProducer),
PriorityToConfigProducer: make(map[string]ConfigProducer),
}
registry.RegisterPredicate(predicates.PodToleratesNodeTaintsPred,
func(_ ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Filter = appendToPluginSet(plugins.Filter, tainttoleration.Name, nil)
return
})
return registry
}
func registerProducer(name string, producer ConfigProducer, producersMap map[string]ConfigProducer) error {
@ -78,3 +84,15 @@ func (f *ConfigProducerRegistry) RegisterPredicate(name string, producer ConfigP
func (f *ConfigProducerRegistry) RegisterPriority(name string, producer ConfigProducer) error {
return registerProducer(name, producer, f.PriorityToConfigProducer)
}
func appendToPluginSet(set *config.PluginSet, name string, weight *int32) *config.PluginSet {
if set == nil {
set = &config.PluginSet{}
}
cfg := config.Plugin{Name: name}
if weight != nil {
cfg.Weight = *weight
}
set.Enabled = append(set.Enabled, cfg)
return set
}

View File

@ -25,18 +25,6 @@ import (
"k8s.io/kubernetes/pkg/scheduler/apis/config"
)
func appendToPluginSet(pluginSet *config.PluginSet, name string, weight *int32) *config.PluginSet {
if pluginSet == nil {
pluginSet = &config.PluginSet{}
}
config := config.Plugin{Name: name}
if weight != nil {
config.Weight = *weight
}
pluginSet.Enabled = append(pluginSet.Enabled, config)
return pluginSet
}
func produceConfig(keys []string, producersMap map[string]ConfigProducer, args ConfigProducerArgs) (*config.Plugins, []config.PluginConfig, error) {
var plugins config.Plugins
var pluginConfig []config.PluginConfig

View File

@ -0,0 +1,36 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = ["utils.go"],
importpath = "k8s.io/kubernetes/pkg/scheduler/framework/plugins/migration",
visibility = ["//visibility:public"],
deps = [
"//pkg/scheduler/algorithm/predicates:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)
go_test(
name = "go_default_test",
srcs = ["utils_test.go"],
embed = [":go_default_library"],
deps = [
"//pkg/scheduler/algorithm/predicates:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library",
],
)

View File

@ -0,0 +1,49 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package migration
import (
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
)
// PredicateResultToFrameworkStatus converts a predicate result (PredicateFailureReason + error)
// to a framework status.
func PredicateResultToFrameworkStatus(reasons []predicates.PredicateFailureReason, err error) *framework.Status {
if s := ErrorToFrameworkStatus(err); s != nil {
return s
}
if len(reasons) == 0 {
return nil
}
if r := predicates.UnresolvablePredicateExists(reasons); r != nil {
return framework.NewStatus(framework.UnschedulableAndUnresolvable, r.GetReason())
}
// We will just use the first reason.
return framework.NewStatus(framework.Unschedulable, reasons[0].GetReason())
}
// ErrorToFrameworkStatus converts an error to a framework status.
func ErrorToFrameworkStatus(err error) *framework.Status {
if err != nil {
return framework.NewStatus(framework.Error, err.Error())
}
return nil
}

View File

@ -0,0 +1,68 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package migration
import (
"errors"
"reflect"
"testing"
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
)
func TestPredicateResultToFrameworkStatus(t *testing.T) {
tests := []struct {
name string
err error
reasons []predicates.PredicateFailureReason
wantStatus *framework.Status
}{
{
name: "Success",
},
{
name: "Error",
err: errors.New("Failed with error"),
wantStatus: framework.NewStatus(framework.Error, "Failed with error"),
},
{
name: "Error with reason",
err: errors.New("Failed with error"),
reasons: []predicates.PredicateFailureReason{predicates.ErrDiskConflict},
wantStatus: framework.NewStatus(framework.Error, "Failed with error"),
},
{
name: "Unschedulable",
reasons: []predicates.PredicateFailureReason{predicates.ErrExistingPodsAntiAffinityRulesNotMatch},
wantStatus: framework.NewStatus(framework.Unschedulable, "node(s) didn't satisfy existing pods anti-affinity rules"),
},
{
name: "Unschedulable and Unresolvable",
reasons: []predicates.PredicateFailureReason{predicates.ErrDiskConflict, predicates.ErrNodeSelectorNotMatch},
wantStatus: framework.NewStatus(framework.UnschedulableAndUnresolvable, "node(s) didn't match node selector"),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
gotStatus := PredicateResultToFrameworkStatus(tt.reasons, tt.err)
if !reflect.DeepEqual(tt.wantStatus, gotStatus) {
t.Errorf("Got status %v, want %v", gotStatus, tt.wantStatus)
}
})
}
}

View File

@ -0,0 +1,30 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = ["taint_toleration.go"],
importpath = "k8s.io/kubernetes/pkg/scheduler/framework/plugins/tainttoleration",
visibility = ["//visibility:public"],
deps = [
"//pkg/scheduler/algorithm/predicates:go_default_library",
"//pkg/scheduler/framework/plugins/migration:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library",
"//pkg/scheduler/nodeinfo:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@ -0,0 +1,50 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package tainttoleration
import (
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/migration"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
"k8s.io/kubernetes/pkg/scheduler/nodeinfo"
)
// TaintToleration is a plugin that checks if a pod tolerates a node's taints.
type TaintToleration struct{}
var _ = framework.FilterPlugin(&TaintToleration{})
// Name is the name of the plugin used in the plugin registry and configurations.
const Name = "TaintToleration"
// Name returns name of the plugin. It is used in logs, etc.
func (pl *TaintToleration) Name() string {
return Name
}
// Filter invoked at the filter extension point.
func (pl *TaintToleration) Filter(_ *framework.CycleState, pod *v1.Pod, nodeInfo *nodeinfo.NodeInfo) *framework.Status {
_, reasons, err := predicates.PodToleratesNodeTaints(pod, nil, nodeInfo)
return migration.PredicateResultToFrameworkStatus(reasons, err)
}
// New initializes a new plugin and returns it.
func New(_ *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) {
return &TaintToleration{}, nil
}

View File

@ -33,6 +33,11 @@ import (
schedutil "k8s.io/kubernetes/pkg/scheduler/util"
)
const (
// Specifies the maximum timeout a permit plugin can return.
maxTimeout time.Duration = 15 * time.Minute
)
// framework is the component responsible for initializing and running scheduler
// plugins.
type framework struct {
@ -53,10 +58,32 @@ type framework struct {
permitPlugins []PermitPlugin
}
const (
// Specifies the maximum timeout a permit plugin can return.
maxTimeout time.Duration = 15 * time.Minute
)
// extensionPoint encapsulates desired and applied set of plugins at a specific extension
// point. This is used to simplify iterating over all extension points supported by the
// framework.
type extensionPoint struct {
// the set of plugins to be configured at this extension point.
plugins *config.PluginSet
// a pointer to the slice storing plugins implementations that will run at this
// extenstion point.
slicePtr interface{}
}
func (f *framework) getExtensionPoints(plugins *config.Plugins) []extensionPoint {
return []extensionPoint{
{plugins.PreFilter, &f.preFilterPlugins},
{plugins.Filter, &f.filterPlugins},
{plugins.Reserve, &f.reservePlugins},
{plugins.PostFilter, &f.postFilterPlugins},
{plugins.Score, &f.scorePlugins},
{plugins.PreBind, &f.preBindPlugins},
{plugins.Bind, &f.bindPlugins},
{plugins.PostBind, &f.postBindPlugins},
{plugins.Unreserve, &f.unreservePlugins},
{plugins.Permit, &f.permitPlugins},
{plugins.QueueSort, &f.queueSortPlugins},
}
}
var _ = Framework(&framework{})
@ -73,29 +100,30 @@ func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfi
}
// get needed plugins from config
pg := pluginsNeeded(plugins)
pg := f.pluginsNeeded(plugins)
if len(pg) == 0 {
return f, nil
}
pluginConfig := pluginNameToConfig(args)
pluginConfig := make(map[string]*runtime.Unknown, 0)
for i := range args {
pluginConfig[args[i].Name] = &args[i].Args
}
pluginsMap := make(map[string]Plugin)
for name, factory := range r {
// initialize only needed plugins
// initialize only needed plugins.
if _, ok := pg[name]; !ok {
continue
}
// find the config args of a plugin
state := pluginConfig[name]
p, err := factory(state, f)
p, err := factory(pluginConfig[name], f)
if err != nil {
return nil, fmt.Errorf("error initializing plugin %q: %v", name, err)
}
pluginsMap[name] = p
// A weight of zero is not permitted, plugins can be disabled explicitly
// a weight of zero is not permitted, plugins can be disabled explicitly
// when configured.
f.pluginNameToWeightMap[name] = int(pg[name].Weight)
if f.pluginNameToWeightMap[name] == 0 {
@ -103,50 +131,14 @@ func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfi
}
}
if err := updatePluginList(&f.preFilterPlugins, plugins.PreFilter, pluginsMap); err != nil {
return nil, err
}
if err := updatePluginList(&f.filterPlugins, plugins.Filter, pluginsMap); err != nil {
return nil, err
}
if err := updatePluginList(&f.reservePlugins, plugins.Reserve, pluginsMap); err != nil {
return nil, err
}
if err := updatePluginList(&f.postFilterPlugins, plugins.PostFilter, pluginsMap); err != nil {
return nil, err
}
if err := updatePluginList(&f.scorePlugins, plugins.Score, pluginsMap); err != nil {
return nil, err
}
if err := updatePluginList(&f.preBindPlugins, plugins.PreBind, pluginsMap); err != nil {
return nil, err
}
if err := updatePluginList(&f.bindPlugins, plugins.Bind, pluginsMap); err != nil {
return nil, err
}
if err := updatePluginList(&f.postBindPlugins, plugins.PostBind, pluginsMap); err != nil {
return nil, err
}
if err := updatePluginList(&f.unreservePlugins, plugins.Unreserve, pluginsMap); err != nil {
return nil, err
}
if err := updatePluginList(&f.permitPlugins, plugins.Permit, pluginsMap); err != nil {
return nil, err
}
if err := updatePluginList(&f.queueSortPlugins, plugins.QueueSort, pluginsMap); err != nil {
return nil, err
for _, e := range f.getExtensionPoints(plugins) {
if err := updatePluginList(e.slicePtr, e.plugins, pluginsMap); err != nil {
return nil, err
}
}
// Verifying the score weights again since Plugin.Name() could return a different
// value from the one used in the configuration.
for _, scorePlugin := range f.scorePlugins {
if f.pluginNameToWeightMap[scorePlugin.Name()] == 0 {
return nil, fmt.Errorf("score plugin %q is not configured with weight", scorePlugin.Name())
@ -171,15 +163,15 @@ func updatePluginList(pluginList interface{}, pluginSet *config.PluginSet, plugi
for _, ep := range pluginSet.Enabled {
pg, ok := pluginsMap[ep.Name]
if !ok {
return fmt.Errorf("%s %q does not exist", pluginType.String(), ep.Name)
return fmt.Errorf("%s %q does not exist", pluginType.Name(), ep.Name)
}
if !reflect.TypeOf(pg).Implements(pluginType) {
return fmt.Errorf("plugin %q does not extend %s plugin", ep.Name, pluginType.String())
return fmt.Errorf("plugin %q does not extend %s plugin", ep.Name, pluginType.Name())
}
if set.Has(ep.Name) {
return fmt.Errorf("plugin %q already registered as %q", ep.Name, pluginType.String())
return fmt.Errorf("plugin %q already registered as %q", ep.Name, pluginType.Name())
}
set.Insert(ep.Name)
@ -534,17 +526,33 @@ func (f *framework) GetWaitingPod(uid types.UID) WaitingPod {
return f.waitingPods.get(uid)
}
func pluginNameToConfig(args []config.PluginConfig) map[string]*runtime.Unknown {
state := make(map[string]*runtime.Unknown, 0)
for i := range args {
// This is needed because the type of PluginConfig.Args is not pointer type.
p := args[i]
state[p.Name] = &p.Args
// ListPlugins returns a map of extension point name to plugin names configured at each extension
// point. Returns nil if no plugins where configred.
func (f *framework) ListPlugins() map[string][]string {
m := make(map[string][]string)
insert := func(ptr interface{}) {
plugins := reflect.ValueOf(ptr).Elem()
var names []string
for i := 0; i < plugins.Len(); i++ {
name := plugins.Index(i).Interface().(Plugin).Name()
names = append(names, name)
}
if len(names) > 0 {
extName := plugins.Type().Elem().Name()
m[extName] = names
}
}
return state
for _, e := range f.getExtensionPoints(&config.Plugins{}) {
insert(e.slicePtr)
}
if len(m) > 0 {
return m
}
return nil
}
func pluginsNeeded(plugins *config.Plugins) map[string]config.Plugin {
func (f *framework) pluginsNeeded(plugins *config.Plugins) map[string]config.Plugin {
pgMap := make(map[string]config.Plugin, 0)
if plugins == nil {
@ -559,17 +567,8 @@ func pluginsNeeded(plugins *config.Plugins) map[string]config.Plugin {
pgMap[pg.Name] = pg
}
}
find(plugins.QueueSort)
find(plugins.PreFilter)
find(plugins.Filter)
find(plugins.PostFilter)
find(plugins.Score)
find(plugins.Reserve)
find(plugins.Permit)
find(plugins.PreBind)
find(plugins.Bind)
find(plugins.PostBind)
find(plugins.Unreserve)
for _, e := range f.getExtensionPoints(plugins) {
find(e.plugins)
}
return pgMap
}

View File

@ -405,6 +405,10 @@ type Framework interface {
// or "Success". If none of the plugins handled binding, RunBindPlugins returns
// code=4("skip") status.
RunBindPlugins(state *CycleState, pod *v1.Pod, nodeName string) *Status
// ListPlugins returns a map of extension point name to plugin names
// configured at each extension point.
ListPlugins() map[string][]string
}
// FrameworkHandle provides data and some tools that plugins can use. It is

View File

@ -163,6 +163,10 @@ func (*fakeFramework) QueueSortFunc() framework.LessFunc {
}
}
func (f *fakeFramework) ListPlugins() map[string][]string {
return nil
}
func (*fakeFramework) NodeInfoSnapshot() *schedulernodeinfo.Snapshot {
return nil
}

View File

@ -66,6 +66,7 @@ go_test(
"//test/integration/framework:go_default_library",
"//test/utils:go_default_library",
"//test/utils/image:go_default_library",
"//vendor/github.com/google/go-cmp/cmp:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
)

View File

@ -899,7 +899,8 @@ func TestBindPlugin(t *testing.T) {
context := initTestSchedulerWithOptions(t, testContext, false, nil, time.Second,
scheduler.WithFrameworkPlugins(plugins),
scheduler.WithFrameworkPluginConfig(pluginConfig),
scheduler.WithFrameworkRegistry(registry))
scheduler.WithFrameworkRegistry(registry),
scheduler.WithFrameworkConfigProducerRegistry(nil))
defer cleanupTest(t, context)
// Add a few nodes.

View File

@ -23,6 +23,8 @@ import (
"testing"
"time"
"github.com/google/go-cmp/cmp"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -93,6 +95,7 @@ func TestSchedulerCreationFromConfigMap(t *testing.T) {
policy string
expectedPredicates sets.String
expectedPrioritizers sets.String
expectedPlugins map[string][]string
}{
{
policy: `{
@ -136,7 +139,6 @@ func TestSchedulerCreationFromConfigMap(t *testing.T) {
"MaxGCEPDVolumeCount",
"NoDiskConflict",
"NoVolumeZoneConflict",
"PodToleratesNodeTaints",
),
expectedPrioritizers: sets.NewString(
"BalancedResourceAllocation",
@ -148,6 +150,9 @@ func TestSchedulerCreationFromConfigMap(t *testing.T) {
"TaintTolerationPriority",
"ImageLocalityPriority",
),
expectedPlugins: map[string][]string{
"FilterPlugin": {"TaintToleration"},
},
},
{
policy: `{
@ -201,7 +206,6 @@ kind: Policy
"MaxGCEPDVolumeCount",
"NoDiskConflict",
"NoVolumeZoneConflict",
"PodToleratesNodeTaints",
),
expectedPrioritizers: sets.NewString(
"BalancedResourceAllocation",
@ -213,6 +217,9 @@ kind: Policy
"TaintTolerationPriority",
"ImageLocalityPriority",
),
expectedPlugins: map[string][]string{
"FilterPlugin": {"TaintToleration"},
},
},
{
policy: `apiVersion: v1
@ -287,6 +294,10 @@ priorities: []
if !schedPrioritizers.Equal(test.expectedPrioritizers) {
t.Errorf("Expected priority functions %v, got %v", test.expectedPrioritizers, schedPrioritizers)
}
schedPlugins := sched.Framework.ListPlugins()
if diff := cmp.Diff(test.expectedPlugins, schedPlugins); diff != "" {
t.Errorf("unexpected predicates diff (-want, +got): %s", diff)
}
}
}