diff --git a/pkg/scheduler/algorithm/predicates/error.go b/pkg/scheduler/algorithm/predicates/error.go index d5325af3f2e..25378706b8e 100644 --- a/pkg/scheduler/algorithm/predicates/error.go +++ b/pkg/scheduler/algorithm/predicates/error.go @@ -82,6 +82,38 @@ var ( ErrFakePredicate = newPredicateFailureError("FakePredicateError", "Nodes failed the fake predicate") ) +var unresolvablePredicateFailureErrors = map[PredicateFailureReason]struct{}{ + ErrNodeSelectorNotMatch: {}, + ErrPodAffinityRulesNotMatch: {}, + ErrPodNotMatchHostName: {}, + ErrTaintsTolerationsNotMatch: {}, + ErrNodeLabelPresenceViolated: {}, + // Node conditions won't change when scheduler simulates removal of preemption victims. + // So, it is pointless to try nodes that have not been able to host the pod due to node + // conditions. These include ErrNodeNotReady, ErrNodeUnderPIDPressure, ErrNodeUnderMemoryPressure, .... + ErrNodeNotReady: {}, + ErrNodeNetworkUnavailable: {}, + ErrNodeUnderDiskPressure: {}, + ErrNodeUnderPIDPressure: {}, + ErrNodeUnderMemoryPressure: {}, + ErrNodeUnschedulable: {}, + ErrNodeUnknownCondition: {}, + ErrVolumeZoneConflict: {}, + ErrVolumeNodeConflict: {}, + ErrVolumeBindConflict: {}, +} + +// UnresolvablePredicateExists checks if there is at least one unresolvable predicate failure reason, if true +// returns the first one in the list. +func UnresolvablePredicateExists(reasons []PredicateFailureReason) PredicateFailureReason { + for _, r := range reasons { + if _, ok := unresolvablePredicateFailureErrors[r]; ok { + return r + } + } + return nil +} + // InsufficientResourceError is an error type that indicates what kind of resource limit is // hit and caused the unfitting failure. type InsufficientResourceError struct { diff --git a/pkg/scheduler/api/compatibility/compatibility_test.go b/pkg/scheduler/api/compatibility/compatibility_test.go index 09624889c8c..7812774c5ea 100644 --- a/pkg/scheduler/api/compatibility/compatibility_test.go +++ b/pkg/scheduler/api/compatibility/compatibility_test.go @@ -37,10 +37,11 @@ import ( func TestCompatibility_v1_Scheduler(t *testing.T) { // Add serialized versions of scheduler config that exercise available options to ensure compatibility between releases schedulerFiles := map[string]struct { - JSON string - wantPredicates sets.String - wantPrioritizers sets.String - wantExtenders []schedulerapi.ExtenderConfig + JSON string + wantPredicates sets.String + wantPrioritizers sets.String + wantFilterPlugins sets.String + wantExtenders []schedulerapi.ExtenderConfig }{ // Do not change this JSON after the corresponding release has been tagged. // A failure indicates backwards compatibility with the specified release was broken. @@ -214,7 +215,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { "HostName", "NoDiskConflict", "NoVolumeZoneConflict", - "PodToleratesNodeTaints", "CheckNodeMemoryPressure", "MaxEBSVolumeCount", "MaxGCEPDVolumeCount", @@ -234,6 +234,9 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { "TaintTolerationPriority", "InterPodAffinityPriority", ), + wantFilterPlugins: sets.NewString( + "TaintToleration", + ), }, // Do not change this JSON after the corresponding release has been tagged. @@ -279,7 +282,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { "HostName", "NoDiskConflict", "NoVolumeZoneConflict", - "PodToleratesNodeTaints", "CheckNodeMemoryPressure", "CheckNodeDiskPressure", "MaxEBSVolumeCount", @@ -302,6 +304,9 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { "InterPodAffinityPriority", "MostRequestedPriority", ), + wantFilterPlugins: sets.NewString( + "TaintToleration", + ), }, // Do not change this JSON after the corresponding release has been tagged. // A failure indicates backwards compatibility with the specified release was broken. @@ -356,7 +361,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { "HostName", "NoDiskConflict", "NoVolumeZoneConflict", - "PodToleratesNodeTaints", "CheckNodeMemoryPressure", "CheckNodeDiskPressure", "MaxEBSVolumeCount", @@ -379,6 +383,9 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { "InterPodAffinityPriority", "MostRequestedPriority", ), + wantFilterPlugins: sets.NewString( + "TaintToleration", + ), wantExtenders: []schedulerapi.ExtenderConfig{{ URLPrefix: "/prefix", FilterVerb: "filter", @@ -445,7 +452,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { "HostName", "NoDiskConflict", "NoVolumeZoneConflict", - "PodToleratesNodeTaints", "CheckNodeMemoryPressure", "CheckNodeDiskPressure", "CheckNodeCondition", @@ -469,6 +475,9 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { "InterPodAffinityPriority", "MostRequestedPriority", ), + wantFilterPlugins: sets.NewString( + "TaintToleration", + ), wantExtenders: []schedulerapi.ExtenderConfig{{ URLPrefix: "/prefix", FilterVerb: "filter", @@ -536,7 +545,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { "HostName", "NoDiskConflict", "NoVolumeZoneConflict", - "PodToleratesNodeTaints", "CheckNodeMemoryPressure", "CheckNodeDiskPressure", "CheckNodeCondition", @@ -561,6 +569,9 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { "InterPodAffinityPriority", "MostRequestedPriority", ), + wantFilterPlugins: sets.NewString( + "TaintToleration", + ), wantExtenders: []schedulerapi.ExtenderConfig{{ URLPrefix: "/prefix", FilterVerb: "filter", @@ -632,7 +643,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { "HostName", "NoDiskConflict", "NoVolumeZoneConflict", - "PodToleratesNodeTaints", "CheckNodeMemoryPressure", "CheckNodeDiskPressure", "CheckNodePIDPressure", @@ -658,6 +668,9 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { "InterPodAffinityPriority", "MostRequestedPriority", ), + wantFilterPlugins: sets.NewString( + "TaintToleration", + ), wantExtenders: []schedulerapi.ExtenderConfig{{ URLPrefix: "/prefix", FilterVerb: "filter", @@ -741,7 +754,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { "HostName", "NoDiskConflict", "NoVolumeZoneConflict", - "PodToleratesNodeTaints", "CheckNodeMemoryPressure", "CheckNodeDiskPressure", "CheckNodePIDPressure", @@ -768,6 +780,9 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { "MostRequestedPriority", "RequestedToCapacityRatioPriority", ), + wantFilterPlugins: sets.NewString( + "TaintToleration", + ), wantExtenders: []schedulerapi.ExtenderConfig{{ URLPrefix: "/prefix", FilterVerb: "filter", @@ -852,7 +867,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { "HostName", "NoDiskConflict", "NoVolumeZoneConflict", - "PodToleratesNodeTaints", "CheckNodeMemoryPressure", "CheckNodeDiskPressure", "CheckNodePIDPressure", @@ -880,6 +894,9 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { "MostRequestedPriority", "RequestedToCapacityRatioPriority", ), + wantFilterPlugins: sets.NewString( + "TaintToleration", + ), wantExtenders: []schedulerapi.ExtenderConfig{{ URLPrefix: "/prefix", FilterVerb: "filter", @@ -963,7 +980,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { "HostName", "NoDiskConflict", "NoVolumeZoneConflict", - "PodToleratesNodeTaints", "CheckNodeMemoryPressure", "CheckNodeDiskPressure", "CheckNodePIDPressure", @@ -992,6 +1008,9 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { "MostRequestedPriority", "RequestedToCapacityRatioPriority", ), + wantFilterPlugins: sets.NewString( + "TaintToleration", + ), wantExtenders: []schedulerapi.ExtenderConfig{{ URLPrefix: "/prefix", FilterVerb: "filter", @@ -1079,7 +1098,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { "HostName", "NoDiskConflict", "NoVolumeZoneConflict", - "PodToleratesNodeTaints", "CheckNodeMemoryPressure", "CheckNodeDiskPressure", "CheckNodePIDPressure", @@ -1108,6 +1126,9 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { "MostRequestedPriority", "RequestedToCapacityRatioPriority", ), + wantFilterPlugins: sets.NewString( + "TaintToleration", + ), wantExtenders: []schedulerapi.ExtenderConfig{{ URLPrefix: "/prefix", FilterVerb: "filter", @@ -1128,6 +1149,9 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { seenPredicates := sets.NewString() seenPriorities := sets.NewString() mandatoryPredicates := sets.NewString("CheckNodeCondition") + filterToPredicateMap := map[string]string{ + "TaintToleration": "PodToleratesNodeTaints", + } for v, tc := range schedulerFiles { t.Run(v, func(t *testing.T) { @@ -1163,26 +1187,39 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { algorithmSrc, make(chan struct{}), ) + if err != nil { t.Fatalf("%s: Error constructing: %v", v, err) } - schedPredicates := sets.NewString() + gotPredicates := sets.NewString() for p := range sched.Algorithm.Predicates() { - schedPredicates.Insert(p) + gotPredicates.Insert(p) } wantPredicates := tc.wantPredicates.Union(mandatoryPredicates) - if !schedPredicates.Equal(wantPredicates) { - t.Errorf("Got predicates %v, want %v", schedPredicates, wantPredicates) - } - schedPrioritizers := sets.NewString() - for _, p := range sched.Algorithm.Prioritizers() { - schedPrioritizers.Insert(p.Name) + if !gotPredicates.Equal(wantPredicates) { + t.Errorf("Got predicates %v, want %v", gotPredicates, wantPredicates) } - if !schedPrioritizers.Equal(tc.wantPrioritizers) { - t.Errorf("Got prioritizers %v, want %v", schedPrioritizers, tc.wantPrioritizers) + gotPrioritizers := sets.NewString() + for _, p := range sched.Algorithm.Prioritizers() { + gotPrioritizers.Insert(p.Name) } - schedExtenders := sched.Algorithm.Extenders() + if !gotPrioritizers.Equal(tc.wantPrioritizers) { + t.Errorf("Got prioritizers %v, want %v", gotPrioritizers, tc.wantPrioritizers) + } + + gotFilterPlugins := sets.NewString() + plugins := sched.Framework.ListPlugins() + for _, p := range plugins["FilterPlugin"] { + gotFilterPlugins.Insert(p) + seenPredicates.Insert(filterToPredicateMap[p]) + + } + if !gotFilterPlugins.Equal(tc.wantFilterPlugins) { + t.Errorf("Got filter plugins %v, want %v", gotFilterPlugins, tc.wantFilterPlugins) + } + + gotExtenders := sched.Algorithm.Extenders() var wantExtenders []*core.HTTPExtender for _, e := range tc.wantExtenders { extender, err := core.NewHTTPExtender(&e) @@ -1191,13 +1228,14 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { } wantExtenders = append(wantExtenders, extender.(*core.HTTPExtender)) } - for i := range schedExtenders { - if !core.Equal(wantExtenders[i], schedExtenders[i].(*core.HTTPExtender)) { - t.Errorf("Got extender #%d %+v, want %+v", i, schedExtenders[i], wantExtenders[i]) + for i := range gotExtenders { + if !core.Equal(wantExtenders[i], gotExtenders[i].(*core.HTTPExtender)) { + t.Errorf("Got extender #%d %+v, want %+v", i, gotExtenders[i], wantExtenders[i]) } } - seenPredicates = seenPredicates.Union(schedPredicates) - seenPriorities = seenPriorities.Union(schedPrioritizers) + + seenPredicates = seenPredicates.Union(gotPredicates) + seenPriorities = seenPriorities.Union(gotPrioritizers) }) } diff --git a/pkg/scheduler/core/generic_scheduler.go b/pkg/scheduler/core/generic_scheduler.go index 91e54284e1a..9e52f714f6e 100644 --- a/pkg/scheduler/core/generic_scheduler.go +++ b/pkg/scheduler/core/generic_scheduler.go @@ -65,27 +65,6 @@ const ( minFeasibleNodesPercentageToFind = 5 ) -var unresolvablePredicateFailureErrors = map[predicates.PredicateFailureReason]struct{}{ - predicates.ErrNodeSelectorNotMatch: {}, - predicates.ErrPodAffinityRulesNotMatch: {}, - predicates.ErrPodNotMatchHostName: {}, - predicates.ErrTaintsTolerationsNotMatch: {}, - predicates.ErrNodeLabelPresenceViolated: {}, - // Node conditions won't change when scheduler simulates removal of preemption victims. - // So, it is pointless to try nodes that have not been able to host the pod due to node - // conditions. These include ErrNodeNotReady, ErrNodeUnderPIDPressure, ErrNodeUnderMemoryPressure, .... - predicates.ErrNodeNotReady: {}, - predicates.ErrNodeNetworkUnavailable: {}, - predicates.ErrNodeUnderDiskPressure: {}, - predicates.ErrNodeUnderPIDPressure: {}, - predicates.ErrNodeUnderMemoryPressure: {}, - predicates.ErrNodeUnschedulable: {}, - predicates.ErrNodeUnknownCondition: {}, - predicates.ErrVolumeZoneConflict: {}, - predicates.ErrVolumeNodeConflict: {}, - predicates.ErrVolumeBindConflict: {}, -} - // FailedPredicateMap declares a map[string][]algorithm.PredicateFailureReason type. type FailedPredicateMap map[string][]predicates.PredicateFailureReason @@ -1204,16 +1183,6 @@ func (g *genericScheduler) selectVictimsOnNode( return victims, numViolatingVictim, true } -// unresolvablePredicateExists checks whether failedPredicates has unresolvable predicate. -func unresolvablePredicateExists(failedPredicates []predicates.PredicateFailureReason) bool { - for _, failedPredicate := range failedPredicates { - if _, ok := unresolvablePredicateFailureErrors[failedPredicate]; ok { - return true - } - } - return false -} - // nodesWherePreemptionMightHelp returns a list of nodes with failed predicates // that may be satisfied by removing pods from the node. func nodesWherePreemptionMightHelp(nodes []*v1.Node, fitErr *FitError) []*v1.Node { @@ -1228,7 +1197,7 @@ func nodesWherePreemptionMightHelp(nodes []*v1.Node, fitErr *FitError) []*v1.Nod // to rely less on such assumptions in the code when checking does not impose // significant overhead. // Also, we currently assume all failures returned by extender as resolvable. - if !unresolvablePredicateExists(failedPredicates) { + if predicates.UnresolvablePredicateExists(failedPredicates) == nil { klog.V(3).Infof("Node %v is a potential node for preemption.", node.Name) potentialNodes = append(potentialNodes, node) } diff --git a/pkg/scheduler/framework/plugins/BUILD b/pkg/scheduler/framework/plugins/BUILD index 15589a8b0b4..ac712c77312 100644 --- a/pkg/scheduler/framework/plugins/BUILD +++ b/pkg/scheduler/framework/plugins/BUILD @@ -6,8 +6,9 @@ go_library( importpath = "k8s.io/kubernetes/pkg/scheduler/framework/plugins", visibility = ["//visibility:public"], deps = [ + "//pkg/scheduler/algorithm/predicates:go_default_library", "//pkg/scheduler/apis/config:go_default_library", - "//pkg/scheduler/framework/plugins/noop:go_default_library", + "//pkg/scheduler/framework/plugins/tainttoleration:go_default_library", "//pkg/scheduler/framework/v1alpha1:go_default_library", ], ) @@ -24,7 +25,9 @@ filegroup( srcs = [ ":package-srcs", "//pkg/scheduler/framework/plugins/examples:all-srcs", + "//pkg/scheduler/framework/plugins/migration:all-srcs", "//pkg/scheduler/framework/plugins/noop:all-srcs", + "//pkg/scheduler/framework/plugins/tainttoleration:all-srcs", ], tags = ["automanaged"], visibility = ["//visibility:public"], diff --git a/pkg/scheduler/framework/plugins/default_registry.go b/pkg/scheduler/framework/plugins/default_registry.go index ac91fbe0f02..2503da18058 100644 --- a/pkg/scheduler/framework/plugins/default_registry.go +++ b/pkg/scheduler/framework/plugins/default_registry.go @@ -19,8 +19,9 @@ package plugins import ( "fmt" + "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" "k8s.io/kubernetes/pkg/scheduler/apis/config" - noop "k8s.io/kubernetes/pkg/scheduler/framework/plugins/noop" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/tainttoleration" framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" ) @@ -29,9 +30,7 @@ import ( // runs custom plugins, can pass a different Registry when initializing the scheduler. func NewDefaultRegistry() framework.Registry { return framework.Registry{ - // This is just a test plugin to showcase the setup, it should be deleted once - // we have at least one legitimate plugin here. - noop.Name: noop.New, + tainttoleration.Name: tainttoleration.New, } } @@ -55,10 +54,17 @@ type ConfigProducerRegistry struct { // NewDefaultConfigProducerRegistry creates a new producer registry. func NewDefaultConfigProducerRegistry() *ConfigProducerRegistry { - return &ConfigProducerRegistry{ + registry := &ConfigProducerRegistry{ PredicateToConfigProducer: make(map[string]ConfigProducer), PriorityToConfigProducer: make(map[string]ConfigProducer), } + registry.RegisterPredicate(predicates.PodToleratesNodeTaintsPred, + func(_ ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) { + plugins.Filter = appendToPluginSet(plugins.Filter, tainttoleration.Name, nil) + return + }) + + return registry } func registerProducer(name string, producer ConfigProducer, producersMap map[string]ConfigProducer) error { @@ -78,3 +84,15 @@ func (f *ConfigProducerRegistry) RegisterPredicate(name string, producer ConfigP func (f *ConfigProducerRegistry) RegisterPriority(name string, producer ConfigProducer) error { return registerProducer(name, producer, f.PriorityToConfigProducer) } + +func appendToPluginSet(set *config.PluginSet, name string, weight *int32) *config.PluginSet { + if set == nil { + set = &config.PluginSet{} + } + cfg := config.Plugin{Name: name} + if weight != nil { + cfg.Weight = *weight + } + set.Enabled = append(set.Enabled, cfg) + return set +} diff --git a/pkg/scheduler/framework/plugins/default_registry_test.go b/pkg/scheduler/framework/plugins/default_registry_test.go index eaf74362a9c..2f862e1b6c5 100644 --- a/pkg/scheduler/framework/plugins/default_registry_test.go +++ b/pkg/scheduler/framework/plugins/default_registry_test.go @@ -25,18 +25,6 @@ import ( "k8s.io/kubernetes/pkg/scheduler/apis/config" ) -func appendToPluginSet(pluginSet *config.PluginSet, name string, weight *int32) *config.PluginSet { - if pluginSet == nil { - pluginSet = &config.PluginSet{} - } - config := config.Plugin{Name: name} - if weight != nil { - config.Weight = *weight - } - pluginSet.Enabled = append(pluginSet.Enabled, config) - return pluginSet -} - func produceConfig(keys []string, producersMap map[string]ConfigProducer, args ConfigProducerArgs) (*config.Plugins, []config.PluginConfig, error) { var plugins config.Plugins var pluginConfig []config.PluginConfig diff --git a/pkg/scheduler/framework/plugins/migration/BUILD b/pkg/scheduler/framework/plugins/migration/BUILD new file mode 100644 index 00000000000..db78246abb4 --- /dev/null +++ b/pkg/scheduler/framework/plugins/migration/BUILD @@ -0,0 +1,36 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "go_default_library", + srcs = ["utils.go"], + importpath = "k8s.io/kubernetes/pkg/scheduler/framework/plugins/migration", + visibility = ["//visibility:public"], + deps = [ + "//pkg/scheduler/algorithm/predicates:go_default_library", + "//pkg/scheduler/framework/v1alpha1:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], + visibility = ["//visibility:public"], +) + +go_test( + name = "go_default_test", + srcs = ["utils_test.go"], + embed = [":go_default_library"], + deps = [ + "//pkg/scheduler/algorithm/predicates:go_default_library", + "//pkg/scheduler/framework/v1alpha1:go_default_library", + ], +) diff --git a/pkg/scheduler/framework/plugins/migration/utils.go b/pkg/scheduler/framework/plugins/migration/utils.go new file mode 100644 index 00000000000..5962a46f71c --- /dev/null +++ b/pkg/scheduler/framework/plugins/migration/utils.go @@ -0,0 +1,49 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package migration + +import ( + "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" + framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" +) + +// PredicateResultToFrameworkStatus converts a predicate result (PredicateFailureReason + error) +// to a framework status. +func PredicateResultToFrameworkStatus(reasons []predicates.PredicateFailureReason, err error) *framework.Status { + if s := ErrorToFrameworkStatus(err); s != nil { + return s + } + + if len(reasons) == 0 { + return nil + } + + if r := predicates.UnresolvablePredicateExists(reasons); r != nil { + return framework.NewStatus(framework.UnschedulableAndUnresolvable, r.GetReason()) + } + + // We will just use the first reason. + return framework.NewStatus(framework.Unschedulable, reasons[0].GetReason()) +} + +// ErrorToFrameworkStatus converts an error to a framework status. +func ErrorToFrameworkStatus(err error) *framework.Status { + if err != nil { + return framework.NewStatus(framework.Error, err.Error()) + } + return nil +} diff --git a/pkg/scheduler/framework/plugins/migration/utils_test.go b/pkg/scheduler/framework/plugins/migration/utils_test.go new file mode 100644 index 00000000000..c710a458b14 --- /dev/null +++ b/pkg/scheduler/framework/plugins/migration/utils_test.go @@ -0,0 +1,68 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package migration + +import ( + "errors" + "reflect" + "testing" + + "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" + framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" +) + +func TestPredicateResultToFrameworkStatus(t *testing.T) { + tests := []struct { + name string + err error + reasons []predicates.PredicateFailureReason + wantStatus *framework.Status + }{ + { + name: "Success", + }, + { + name: "Error", + err: errors.New("Failed with error"), + wantStatus: framework.NewStatus(framework.Error, "Failed with error"), + }, + { + name: "Error with reason", + err: errors.New("Failed with error"), + reasons: []predicates.PredicateFailureReason{predicates.ErrDiskConflict}, + wantStatus: framework.NewStatus(framework.Error, "Failed with error"), + }, + { + name: "Unschedulable", + reasons: []predicates.PredicateFailureReason{predicates.ErrExistingPodsAntiAffinityRulesNotMatch}, + wantStatus: framework.NewStatus(framework.Unschedulable, "node(s) didn't satisfy existing pods anti-affinity rules"), + }, + { + name: "Unschedulable and Unresolvable", + reasons: []predicates.PredicateFailureReason{predicates.ErrDiskConflict, predicates.ErrNodeSelectorNotMatch}, + wantStatus: framework.NewStatus(framework.UnschedulableAndUnresolvable, "node(s) didn't match node selector"), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + gotStatus := PredicateResultToFrameworkStatus(tt.reasons, tt.err) + if !reflect.DeepEqual(tt.wantStatus, gotStatus) { + t.Errorf("Got status %v, want %v", gotStatus, tt.wantStatus) + } + }) + } +} diff --git a/pkg/scheduler/framework/plugins/tainttoleration/BUILD b/pkg/scheduler/framework/plugins/tainttoleration/BUILD new file mode 100644 index 00000000000..b8b575b7e90 --- /dev/null +++ b/pkg/scheduler/framework/plugins/tainttoleration/BUILD @@ -0,0 +1,30 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "go_default_library", + srcs = ["taint_toleration.go"], + importpath = "k8s.io/kubernetes/pkg/scheduler/framework/plugins/tainttoleration", + visibility = ["//visibility:public"], + deps = [ + "//pkg/scheduler/algorithm/predicates:go_default_library", + "//pkg/scheduler/framework/plugins/migration:go_default_library", + "//pkg/scheduler/framework/v1alpha1:go_default_library", + "//pkg/scheduler/nodeinfo:go_default_library", + "//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], + visibility = ["//visibility:public"], +) diff --git a/pkg/scheduler/framework/plugins/tainttoleration/taint_toleration.go b/pkg/scheduler/framework/plugins/tainttoleration/taint_toleration.go new file mode 100644 index 00000000000..36490c70a45 --- /dev/null +++ b/pkg/scheduler/framework/plugins/tainttoleration/taint_toleration.go @@ -0,0 +1,50 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package tainttoleration + +import ( + "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/migration" + framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" + "k8s.io/kubernetes/pkg/scheduler/nodeinfo" +) + +// TaintToleration is a plugin that checks if a pod tolerates a node's taints. +type TaintToleration struct{} + +var _ = framework.FilterPlugin(&TaintToleration{}) + +// Name is the name of the plugin used in the plugin registry and configurations. +const Name = "TaintToleration" + +// Name returns name of the plugin. It is used in logs, etc. +func (pl *TaintToleration) Name() string { + return Name +} + +// Filter invoked at the filter extension point. +func (pl *TaintToleration) Filter(_ *framework.CycleState, pod *v1.Pod, nodeInfo *nodeinfo.NodeInfo) *framework.Status { + _, reasons, err := predicates.PodToleratesNodeTaints(pod, nil, nodeInfo) + return migration.PredicateResultToFrameworkStatus(reasons, err) +} + +// New initializes a new plugin and returns it. +func New(_ *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) { + return &TaintToleration{}, nil +} diff --git a/pkg/scheduler/framework/v1alpha1/framework.go b/pkg/scheduler/framework/v1alpha1/framework.go index 451b508544a..b5de321f316 100644 --- a/pkg/scheduler/framework/v1alpha1/framework.go +++ b/pkg/scheduler/framework/v1alpha1/framework.go @@ -33,6 +33,11 @@ import ( schedutil "k8s.io/kubernetes/pkg/scheduler/util" ) +const ( + // Specifies the maximum timeout a permit plugin can return. + maxTimeout time.Duration = 15 * time.Minute +) + // framework is the component responsible for initializing and running scheduler // plugins. type framework struct { @@ -53,10 +58,32 @@ type framework struct { permitPlugins []PermitPlugin } -const ( - // Specifies the maximum timeout a permit plugin can return. - maxTimeout time.Duration = 15 * time.Minute -) +// extensionPoint encapsulates desired and applied set of plugins at a specific extension +// point. This is used to simplify iterating over all extension points supported by the +// framework. +type extensionPoint struct { + // the set of plugins to be configured at this extension point. + plugins *config.PluginSet + // a pointer to the slice storing plugins implementations that will run at this + // extenstion point. + slicePtr interface{} +} + +func (f *framework) getExtensionPoints(plugins *config.Plugins) []extensionPoint { + return []extensionPoint{ + {plugins.PreFilter, &f.preFilterPlugins}, + {plugins.Filter, &f.filterPlugins}, + {plugins.Reserve, &f.reservePlugins}, + {plugins.PostFilter, &f.postFilterPlugins}, + {plugins.Score, &f.scorePlugins}, + {plugins.PreBind, &f.preBindPlugins}, + {plugins.Bind, &f.bindPlugins}, + {plugins.PostBind, &f.postBindPlugins}, + {plugins.Unreserve, &f.unreservePlugins}, + {plugins.Permit, &f.permitPlugins}, + {plugins.QueueSort, &f.queueSortPlugins}, + } +} var _ = Framework(&framework{}) @@ -73,29 +100,30 @@ func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfi } // get needed plugins from config - pg := pluginsNeeded(plugins) + pg := f.pluginsNeeded(plugins) if len(pg) == 0 { return f, nil } - pluginConfig := pluginNameToConfig(args) + pluginConfig := make(map[string]*runtime.Unknown, 0) + for i := range args { + pluginConfig[args[i].Name] = &args[i].Args + } + pluginsMap := make(map[string]Plugin) for name, factory := range r { - // initialize only needed plugins + // initialize only needed plugins. if _, ok := pg[name]; !ok { continue } - // find the config args of a plugin - state := pluginConfig[name] - - p, err := factory(state, f) + p, err := factory(pluginConfig[name], f) if err != nil { return nil, fmt.Errorf("error initializing plugin %q: %v", name, err) } pluginsMap[name] = p - // A weight of zero is not permitted, plugins can be disabled explicitly + // a weight of zero is not permitted, plugins can be disabled explicitly // when configured. f.pluginNameToWeightMap[name] = int(pg[name].Weight) if f.pluginNameToWeightMap[name] == 0 { @@ -103,50 +131,14 @@ func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfi } } - if err := updatePluginList(&f.preFilterPlugins, plugins.PreFilter, pluginsMap); err != nil { - return nil, err - } - - if err := updatePluginList(&f.filterPlugins, plugins.Filter, pluginsMap); err != nil { - return nil, err - } - - if err := updatePluginList(&f.reservePlugins, plugins.Reserve, pluginsMap); err != nil { - return nil, err - } - - if err := updatePluginList(&f.postFilterPlugins, plugins.PostFilter, pluginsMap); err != nil { - return nil, err - } - - if err := updatePluginList(&f.scorePlugins, plugins.Score, pluginsMap); err != nil { - return nil, err - } - - if err := updatePluginList(&f.preBindPlugins, plugins.PreBind, pluginsMap); err != nil { - return nil, err - } - - if err := updatePluginList(&f.bindPlugins, plugins.Bind, pluginsMap); err != nil { - return nil, err - } - - if err := updatePluginList(&f.postBindPlugins, plugins.PostBind, pluginsMap); err != nil { - return nil, err - } - - if err := updatePluginList(&f.unreservePlugins, plugins.Unreserve, pluginsMap); err != nil { - return nil, err - } - - if err := updatePluginList(&f.permitPlugins, plugins.Permit, pluginsMap); err != nil { - return nil, err - } - - if err := updatePluginList(&f.queueSortPlugins, plugins.QueueSort, pluginsMap); err != nil { - return nil, err + for _, e := range f.getExtensionPoints(plugins) { + if err := updatePluginList(e.slicePtr, e.plugins, pluginsMap); err != nil { + return nil, err + } } + // Verifying the score weights again since Plugin.Name() could return a different + // value from the one used in the configuration. for _, scorePlugin := range f.scorePlugins { if f.pluginNameToWeightMap[scorePlugin.Name()] == 0 { return nil, fmt.Errorf("score plugin %q is not configured with weight", scorePlugin.Name()) @@ -171,15 +163,15 @@ func updatePluginList(pluginList interface{}, pluginSet *config.PluginSet, plugi for _, ep := range pluginSet.Enabled { pg, ok := pluginsMap[ep.Name] if !ok { - return fmt.Errorf("%s %q does not exist", pluginType.String(), ep.Name) + return fmt.Errorf("%s %q does not exist", pluginType.Name(), ep.Name) } if !reflect.TypeOf(pg).Implements(pluginType) { - return fmt.Errorf("plugin %q does not extend %s plugin", ep.Name, pluginType.String()) + return fmt.Errorf("plugin %q does not extend %s plugin", ep.Name, pluginType.Name()) } if set.Has(ep.Name) { - return fmt.Errorf("plugin %q already registered as %q", ep.Name, pluginType.String()) + return fmt.Errorf("plugin %q already registered as %q", ep.Name, pluginType.Name()) } set.Insert(ep.Name) @@ -534,17 +526,33 @@ func (f *framework) GetWaitingPod(uid types.UID) WaitingPod { return f.waitingPods.get(uid) } -func pluginNameToConfig(args []config.PluginConfig) map[string]*runtime.Unknown { - state := make(map[string]*runtime.Unknown, 0) - for i := range args { - // This is needed because the type of PluginConfig.Args is not pointer type. - p := args[i] - state[p.Name] = &p.Args +// ListPlugins returns a map of extension point name to plugin names configured at each extension +// point. Returns nil if no plugins where configred. +func (f *framework) ListPlugins() map[string][]string { + m := make(map[string][]string) + + insert := func(ptr interface{}) { + plugins := reflect.ValueOf(ptr).Elem() + var names []string + for i := 0; i < plugins.Len(); i++ { + name := plugins.Index(i).Interface().(Plugin).Name() + names = append(names, name) + } + if len(names) > 0 { + extName := plugins.Type().Elem().Name() + m[extName] = names + } } - return state + for _, e := range f.getExtensionPoints(&config.Plugins{}) { + insert(e.slicePtr) + } + if len(m) > 0 { + return m + } + return nil } -func pluginsNeeded(plugins *config.Plugins) map[string]config.Plugin { +func (f *framework) pluginsNeeded(plugins *config.Plugins) map[string]config.Plugin { pgMap := make(map[string]config.Plugin, 0) if plugins == nil { @@ -559,17 +567,8 @@ func pluginsNeeded(plugins *config.Plugins) map[string]config.Plugin { pgMap[pg.Name] = pg } } - find(plugins.QueueSort) - find(plugins.PreFilter) - find(plugins.Filter) - find(plugins.PostFilter) - find(plugins.Score) - find(plugins.Reserve) - find(plugins.Permit) - find(plugins.PreBind) - find(plugins.Bind) - find(plugins.PostBind) - find(plugins.Unreserve) - + for _, e := range f.getExtensionPoints(plugins) { + find(e.plugins) + } return pgMap } diff --git a/pkg/scheduler/framework/v1alpha1/interface.go b/pkg/scheduler/framework/v1alpha1/interface.go index c8580b5d15a..1c72cbd5a1d 100644 --- a/pkg/scheduler/framework/v1alpha1/interface.go +++ b/pkg/scheduler/framework/v1alpha1/interface.go @@ -405,6 +405,10 @@ type Framework interface { // or "Success". If none of the plugins handled binding, RunBindPlugins returns // code=4("skip") status. RunBindPlugins(state *CycleState, pod *v1.Pod, nodeName string) *Status + + // ListPlugins returns a map of extension point name to plugin names + // configured at each extension point. + ListPlugins() map[string][]string } // FrameworkHandle provides data and some tools that plugins can use. It is diff --git a/pkg/scheduler/internal/queue/scheduling_queue_test.go b/pkg/scheduler/internal/queue/scheduling_queue_test.go index 3a47674f8c3..ac09364642f 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue_test.go +++ b/pkg/scheduler/internal/queue/scheduling_queue_test.go @@ -163,6 +163,10 @@ func (*fakeFramework) QueueSortFunc() framework.LessFunc { } } +func (f *fakeFramework) ListPlugins() map[string][]string { + return nil +} + func (*fakeFramework) NodeInfoSnapshot() *schedulernodeinfo.Snapshot { return nil } diff --git a/test/integration/scheduler/BUILD b/test/integration/scheduler/BUILD index 8ff23ec5989..3a2063a2011 100644 --- a/test/integration/scheduler/BUILD +++ b/test/integration/scheduler/BUILD @@ -66,6 +66,7 @@ go_test( "//test/integration/framework:go_default_library", "//test/utils:go_default_library", "//test/utils/image:go_default_library", + "//vendor/github.com/google/go-cmp/cmp:go_default_library", "//vendor/k8s.io/klog:go_default_library", ], ) diff --git a/test/integration/scheduler/framework_test.go b/test/integration/scheduler/framework_test.go index c1e85c38948..eaf50721907 100644 --- a/test/integration/scheduler/framework_test.go +++ b/test/integration/scheduler/framework_test.go @@ -899,7 +899,8 @@ func TestBindPlugin(t *testing.T) { context := initTestSchedulerWithOptions(t, testContext, false, nil, time.Second, scheduler.WithFrameworkPlugins(plugins), scheduler.WithFrameworkPluginConfig(pluginConfig), - scheduler.WithFrameworkRegistry(registry)) + scheduler.WithFrameworkRegistry(registry), + scheduler.WithFrameworkConfigProducerRegistry(nil)) defer cleanupTest(t, context) // Add a few nodes. diff --git a/test/integration/scheduler/scheduler_test.go b/test/integration/scheduler/scheduler_test.go index 2659178c37c..d21eef44545 100644 --- a/test/integration/scheduler/scheduler_test.go +++ b/test/integration/scheduler/scheduler_test.go @@ -23,6 +23,8 @@ import ( "testing" "time" + "github.com/google/go-cmp/cmp" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -93,6 +95,7 @@ func TestSchedulerCreationFromConfigMap(t *testing.T) { policy string expectedPredicates sets.String expectedPrioritizers sets.String + expectedPlugins map[string][]string }{ { policy: `{ @@ -136,7 +139,6 @@ func TestSchedulerCreationFromConfigMap(t *testing.T) { "MaxGCEPDVolumeCount", "NoDiskConflict", "NoVolumeZoneConflict", - "PodToleratesNodeTaints", ), expectedPrioritizers: sets.NewString( "BalancedResourceAllocation", @@ -148,6 +150,9 @@ func TestSchedulerCreationFromConfigMap(t *testing.T) { "TaintTolerationPriority", "ImageLocalityPriority", ), + expectedPlugins: map[string][]string{ + "FilterPlugin": {"TaintToleration"}, + }, }, { policy: `{ @@ -201,7 +206,6 @@ kind: Policy "MaxGCEPDVolumeCount", "NoDiskConflict", "NoVolumeZoneConflict", - "PodToleratesNodeTaints", ), expectedPrioritizers: sets.NewString( "BalancedResourceAllocation", @@ -213,6 +217,9 @@ kind: Policy "TaintTolerationPriority", "ImageLocalityPriority", ), + expectedPlugins: map[string][]string{ + "FilterPlugin": {"TaintToleration"}, + }, }, { policy: `apiVersion: v1 @@ -287,6 +294,10 @@ priorities: [] if !schedPrioritizers.Equal(test.expectedPrioritizers) { t.Errorf("Expected priority functions %v, got %v", test.expectedPrioritizers, schedPrioritizers) } + schedPlugins := sched.Framework.ListPlugins() + if diff := cmp.Diff(test.expectedPlugins, schedPlugins); diff != "" { + t.Errorf("unexpected predicates diff (-want, +got): %s", diff) + } } }