Merge pull request #83460 from ahg-g/ahg-first-predicate

[migration phase 1] Implement PodToleratesNodeTaint as a filter plugin
This commit is contained in:
Kubernetes Prow Robot 2019-10-05 14:07:11 -07:00 committed by GitHub
commit 1f81255070
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 461 additions and 160 deletions

View File

@ -82,6 +82,38 @@ var (
ErrFakePredicate = newPredicateFailureError("FakePredicateError", "Nodes failed the fake predicate") ErrFakePredicate = newPredicateFailureError("FakePredicateError", "Nodes failed the fake predicate")
) )
var unresolvablePredicateFailureErrors = map[PredicateFailureReason]struct{}{
ErrNodeSelectorNotMatch: {},
ErrPodAffinityRulesNotMatch: {},
ErrPodNotMatchHostName: {},
ErrTaintsTolerationsNotMatch: {},
ErrNodeLabelPresenceViolated: {},
// Node conditions won't change when scheduler simulates removal of preemption victims.
// So, it is pointless to try nodes that have not been able to host the pod due to node
// conditions. These include ErrNodeNotReady, ErrNodeUnderPIDPressure, ErrNodeUnderMemoryPressure, ....
ErrNodeNotReady: {},
ErrNodeNetworkUnavailable: {},
ErrNodeUnderDiskPressure: {},
ErrNodeUnderPIDPressure: {},
ErrNodeUnderMemoryPressure: {},
ErrNodeUnschedulable: {},
ErrNodeUnknownCondition: {},
ErrVolumeZoneConflict: {},
ErrVolumeNodeConflict: {},
ErrVolumeBindConflict: {},
}
// UnresolvablePredicateExists checks if there is at least one unresolvable predicate failure reason, if true
// returns the first one in the list.
func UnresolvablePredicateExists(reasons []PredicateFailureReason) PredicateFailureReason {
for _, r := range reasons {
if _, ok := unresolvablePredicateFailureErrors[r]; ok {
return r
}
}
return nil
}
// InsufficientResourceError is an error type that indicates what kind of resource limit is // InsufficientResourceError is an error type that indicates what kind of resource limit is
// hit and caused the unfitting failure. // hit and caused the unfitting failure.
type InsufficientResourceError struct { type InsufficientResourceError struct {

View File

@ -37,10 +37,11 @@ import (
func TestCompatibility_v1_Scheduler(t *testing.T) { func TestCompatibility_v1_Scheduler(t *testing.T) {
// Add serialized versions of scheduler config that exercise available options to ensure compatibility between releases // Add serialized versions of scheduler config that exercise available options to ensure compatibility between releases
schedulerFiles := map[string]struct { schedulerFiles := map[string]struct {
JSON string JSON string
wantPredicates sets.String wantPredicates sets.String
wantPrioritizers sets.String wantPrioritizers sets.String
wantExtenders []schedulerapi.ExtenderConfig wantFilterPlugins sets.String
wantExtenders []schedulerapi.ExtenderConfig
}{ }{
// Do not change this JSON after the corresponding release has been tagged. // Do not change this JSON after the corresponding release has been tagged.
// A failure indicates backwards compatibility with the specified release was broken. // A failure indicates backwards compatibility with the specified release was broken.
@ -214,7 +215,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
"HostName", "HostName",
"NoDiskConflict", "NoDiskConflict",
"NoVolumeZoneConflict", "NoVolumeZoneConflict",
"PodToleratesNodeTaints",
"CheckNodeMemoryPressure", "CheckNodeMemoryPressure",
"MaxEBSVolumeCount", "MaxEBSVolumeCount",
"MaxGCEPDVolumeCount", "MaxGCEPDVolumeCount",
@ -234,6 +234,9 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
"TaintTolerationPriority", "TaintTolerationPriority",
"InterPodAffinityPriority", "InterPodAffinityPriority",
), ),
wantFilterPlugins: sets.NewString(
"TaintToleration",
),
}, },
// Do not change this JSON after the corresponding release has been tagged. // Do not change this JSON after the corresponding release has been tagged.
@ -279,7 +282,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
"HostName", "HostName",
"NoDiskConflict", "NoDiskConflict",
"NoVolumeZoneConflict", "NoVolumeZoneConflict",
"PodToleratesNodeTaints",
"CheckNodeMemoryPressure", "CheckNodeMemoryPressure",
"CheckNodeDiskPressure", "CheckNodeDiskPressure",
"MaxEBSVolumeCount", "MaxEBSVolumeCount",
@ -302,6 +304,9 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
"InterPodAffinityPriority", "InterPodAffinityPriority",
"MostRequestedPriority", "MostRequestedPriority",
), ),
wantFilterPlugins: sets.NewString(
"TaintToleration",
),
}, },
// Do not change this JSON after the corresponding release has been tagged. // Do not change this JSON after the corresponding release has been tagged.
// A failure indicates backwards compatibility with the specified release was broken. // A failure indicates backwards compatibility with the specified release was broken.
@ -356,7 +361,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
"HostName", "HostName",
"NoDiskConflict", "NoDiskConflict",
"NoVolumeZoneConflict", "NoVolumeZoneConflict",
"PodToleratesNodeTaints",
"CheckNodeMemoryPressure", "CheckNodeMemoryPressure",
"CheckNodeDiskPressure", "CheckNodeDiskPressure",
"MaxEBSVolumeCount", "MaxEBSVolumeCount",
@ -379,6 +383,9 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
"InterPodAffinityPriority", "InterPodAffinityPriority",
"MostRequestedPriority", "MostRequestedPriority",
), ),
wantFilterPlugins: sets.NewString(
"TaintToleration",
),
wantExtenders: []schedulerapi.ExtenderConfig{{ wantExtenders: []schedulerapi.ExtenderConfig{{
URLPrefix: "/prefix", URLPrefix: "/prefix",
FilterVerb: "filter", FilterVerb: "filter",
@ -445,7 +452,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
"HostName", "HostName",
"NoDiskConflict", "NoDiskConflict",
"NoVolumeZoneConflict", "NoVolumeZoneConflict",
"PodToleratesNodeTaints",
"CheckNodeMemoryPressure", "CheckNodeMemoryPressure",
"CheckNodeDiskPressure", "CheckNodeDiskPressure",
"CheckNodeCondition", "CheckNodeCondition",
@ -469,6 +475,9 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
"InterPodAffinityPriority", "InterPodAffinityPriority",
"MostRequestedPriority", "MostRequestedPriority",
), ),
wantFilterPlugins: sets.NewString(
"TaintToleration",
),
wantExtenders: []schedulerapi.ExtenderConfig{{ wantExtenders: []schedulerapi.ExtenderConfig{{
URLPrefix: "/prefix", URLPrefix: "/prefix",
FilterVerb: "filter", FilterVerb: "filter",
@ -536,7 +545,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
"HostName", "HostName",
"NoDiskConflict", "NoDiskConflict",
"NoVolumeZoneConflict", "NoVolumeZoneConflict",
"PodToleratesNodeTaints",
"CheckNodeMemoryPressure", "CheckNodeMemoryPressure",
"CheckNodeDiskPressure", "CheckNodeDiskPressure",
"CheckNodeCondition", "CheckNodeCondition",
@ -561,6 +569,9 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
"InterPodAffinityPriority", "InterPodAffinityPriority",
"MostRequestedPriority", "MostRequestedPriority",
), ),
wantFilterPlugins: sets.NewString(
"TaintToleration",
),
wantExtenders: []schedulerapi.ExtenderConfig{{ wantExtenders: []schedulerapi.ExtenderConfig{{
URLPrefix: "/prefix", URLPrefix: "/prefix",
FilterVerb: "filter", FilterVerb: "filter",
@ -632,7 +643,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
"HostName", "HostName",
"NoDiskConflict", "NoDiskConflict",
"NoVolumeZoneConflict", "NoVolumeZoneConflict",
"PodToleratesNodeTaints",
"CheckNodeMemoryPressure", "CheckNodeMemoryPressure",
"CheckNodeDiskPressure", "CheckNodeDiskPressure",
"CheckNodePIDPressure", "CheckNodePIDPressure",
@ -658,6 +668,9 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
"InterPodAffinityPriority", "InterPodAffinityPriority",
"MostRequestedPriority", "MostRequestedPriority",
), ),
wantFilterPlugins: sets.NewString(
"TaintToleration",
),
wantExtenders: []schedulerapi.ExtenderConfig{{ wantExtenders: []schedulerapi.ExtenderConfig{{
URLPrefix: "/prefix", URLPrefix: "/prefix",
FilterVerb: "filter", FilterVerb: "filter",
@ -741,7 +754,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
"HostName", "HostName",
"NoDiskConflict", "NoDiskConflict",
"NoVolumeZoneConflict", "NoVolumeZoneConflict",
"PodToleratesNodeTaints",
"CheckNodeMemoryPressure", "CheckNodeMemoryPressure",
"CheckNodeDiskPressure", "CheckNodeDiskPressure",
"CheckNodePIDPressure", "CheckNodePIDPressure",
@ -768,6 +780,9 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
"MostRequestedPriority", "MostRequestedPriority",
"RequestedToCapacityRatioPriority", "RequestedToCapacityRatioPriority",
), ),
wantFilterPlugins: sets.NewString(
"TaintToleration",
),
wantExtenders: []schedulerapi.ExtenderConfig{{ wantExtenders: []schedulerapi.ExtenderConfig{{
URLPrefix: "/prefix", URLPrefix: "/prefix",
FilterVerb: "filter", FilterVerb: "filter",
@ -852,7 +867,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
"HostName", "HostName",
"NoDiskConflict", "NoDiskConflict",
"NoVolumeZoneConflict", "NoVolumeZoneConflict",
"PodToleratesNodeTaints",
"CheckNodeMemoryPressure", "CheckNodeMemoryPressure",
"CheckNodeDiskPressure", "CheckNodeDiskPressure",
"CheckNodePIDPressure", "CheckNodePIDPressure",
@ -880,6 +894,9 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
"MostRequestedPriority", "MostRequestedPriority",
"RequestedToCapacityRatioPriority", "RequestedToCapacityRatioPriority",
), ),
wantFilterPlugins: sets.NewString(
"TaintToleration",
),
wantExtenders: []schedulerapi.ExtenderConfig{{ wantExtenders: []schedulerapi.ExtenderConfig{{
URLPrefix: "/prefix", URLPrefix: "/prefix",
FilterVerb: "filter", FilterVerb: "filter",
@ -963,7 +980,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
"HostName", "HostName",
"NoDiskConflict", "NoDiskConflict",
"NoVolumeZoneConflict", "NoVolumeZoneConflict",
"PodToleratesNodeTaints",
"CheckNodeMemoryPressure", "CheckNodeMemoryPressure",
"CheckNodeDiskPressure", "CheckNodeDiskPressure",
"CheckNodePIDPressure", "CheckNodePIDPressure",
@ -992,6 +1008,9 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
"MostRequestedPriority", "MostRequestedPriority",
"RequestedToCapacityRatioPriority", "RequestedToCapacityRatioPriority",
), ),
wantFilterPlugins: sets.NewString(
"TaintToleration",
),
wantExtenders: []schedulerapi.ExtenderConfig{{ wantExtenders: []schedulerapi.ExtenderConfig{{
URLPrefix: "/prefix", URLPrefix: "/prefix",
FilterVerb: "filter", FilterVerb: "filter",
@ -1079,7 +1098,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
"HostName", "HostName",
"NoDiskConflict", "NoDiskConflict",
"NoVolumeZoneConflict", "NoVolumeZoneConflict",
"PodToleratesNodeTaints",
"CheckNodeMemoryPressure", "CheckNodeMemoryPressure",
"CheckNodeDiskPressure", "CheckNodeDiskPressure",
"CheckNodePIDPressure", "CheckNodePIDPressure",
@ -1108,6 +1126,9 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
"MostRequestedPriority", "MostRequestedPriority",
"RequestedToCapacityRatioPriority", "RequestedToCapacityRatioPriority",
), ),
wantFilterPlugins: sets.NewString(
"TaintToleration",
),
wantExtenders: []schedulerapi.ExtenderConfig{{ wantExtenders: []schedulerapi.ExtenderConfig{{
URLPrefix: "/prefix", URLPrefix: "/prefix",
FilterVerb: "filter", FilterVerb: "filter",
@ -1128,6 +1149,9 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
seenPredicates := sets.NewString() seenPredicates := sets.NewString()
seenPriorities := sets.NewString() seenPriorities := sets.NewString()
mandatoryPredicates := sets.NewString("CheckNodeCondition") mandatoryPredicates := sets.NewString("CheckNodeCondition")
filterToPredicateMap := map[string]string{
"TaintToleration": "PodToleratesNodeTaints",
}
for v, tc := range schedulerFiles { for v, tc := range schedulerFiles {
t.Run(v, func(t *testing.T) { t.Run(v, func(t *testing.T) {
@ -1163,26 +1187,39 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
algorithmSrc, algorithmSrc,
make(chan struct{}), make(chan struct{}),
) )
if err != nil { if err != nil {
t.Fatalf("%s: Error constructing: %v", v, err) t.Fatalf("%s: Error constructing: %v", v, err)
} }
schedPredicates := sets.NewString() gotPredicates := sets.NewString()
for p := range sched.Algorithm.Predicates() { for p := range sched.Algorithm.Predicates() {
schedPredicates.Insert(p) gotPredicates.Insert(p)
} }
wantPredicates := tc.wantPredicates.Union(mandatoryPredicates) wantPredicates := tc.wantPredicates.Union(mandatoryPredicates)
if !schedPredicates.Equal(wantPredicates) { if !gotPredicates.Equal(wantPredicates) {
t.Errorf("Got predicates %v, want %v", schedPredicates, wantPredicates) t.Errorf("Got predicates %v, want %v", gotPredicates, wantPredicates)
}
schedPrioritizers := sets.NewString()
for _, p := range sched.Algorithm.Prioritizers() {
schedPrioritizers.Insert(p.Name)
} }
if !schedPrioritizers.Equal(tc.wantPrioritizers) { gotPrioritizers := sets.NewString()
t.Errorf("Got prioritizers %v, want %v", schedPrioritizers, tc.wantPrioritizers) for _, p := range sched.Algorithm.Prioritizers() {
gotPrioritizers.Insert(p.Name)
} }
schedExtenders := sched.Algorithm.Extenders() if !gotPrioritizers.Equal(tc.wantPrioritizers) {
t.Errorf("Got prioritizers %v, want %v", gotPrioritizers, tc.wantPrioritizers)
}
gotFilterPlugins := sets.NewString()
plugins := sched.Framework.ListPlugins()
for _, p := range plugins["FilterPlugin"] {
gotFilterPlugins.Insert(p)
seenPredicates.Insert(filterToPredicateMap[p])
}
if !gotFilterPlugins.Equal(tc.wantFilterPlugins) {
t.Errorf("Got filter plugins %v, want %v", gotFilterPlugins, tc.wantFilterPlugins)
}
gotExtenders := sched.Algorithm.Extenders()
var wantExtenders []*core.HTTPExtender var wantExtenders []*core.HTTPExtender
for _, e := range tc.wantExtenders { for _, e := range tc.wantExtenders {
extender, err := core.NewHTTPExtender(&e) extender, err := core.NewHTTPExtender(&e)
@ -1191,13 +1228,14 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
} }
wantExtenders = append(wantExtenders, extender.(*core.HTTPExtender)) wantExtenders = append(wantExtenders, extender.(*core.HTTPExtender))
} }
for i := range schedExtenders { for i := range gotExtenders {
if !core.Equal(wantExtenders[i], schedExtenders[i].(*core.HTTPExtender)) { if !core.Equal(wantExtenders[i], gotExtenders[i].(*core.HTTPExtender)) {
t.Errorf("Got extender #%d %+v, want %+v", i, schedExtenders[i], wantExtenders[i]) t.Errorf("Got extender #%d %+v, want %+v", i, gotExtenders[i], wantExtenders[i])
} }
} }
seenPredicates = seenPredicates.Union(schedPredicates)
seenPriorities = seenPriorities.Union(schedPrioritizers) seenPredicates = seenPredicates.Union(gotPredicates)
seenPriorities = seenPriorities.Union(gotPrioritizers)
}) })
} }

View File

@ -65,27 +65,6 @@ const (
minFeasibleNodesPercentageToFind = 5 minFeasibleNodesPercentageToFind = 5
) )
var unresolvablePredicateFailureErrors = map[predicates.PredicateFailureReason]struct{}{
predicates.ErrNodeSelectorNotMatch: {},
predicates.ErrPodAffinityRulesNotMatch: {},
predicates.ErrPodNotMatchHostName: {},
predicates.ErrTaintsTolerationsNotMatch: {},
predicates.ErrNodeLabelPresenceViolated: {},
// Node conditions won't change when scheduler simulates removal of preemption victims.
// So, it is pointless to try nodes that have not been able to host the pod due to node
// conditions. These include ErrNodeNotReady, ErrNodeUnderPIDPressure, ErrNodeUnderMemoryPressure, ....
predicates.ErrNodeNotReady: {},
predicates.ErrNodeNetworkUnavailable: {},
predicates.ErrNodeUnderDiskPressure: {},
predicates.ErrNodeUnderPIDPressure: {},
predicates.ErrNodeUnderMemoryPressure: {},
predicates.ErrNodeUnschedulable: {},
predicates.ErrNodeUnknownCondition: {},
predicates.ErrVolumeZoneConflict: {},
predicates.ErrVolumeNodeConflict: {},
predicates.ErrVolumeBindConflict: {},
}
// FailedPredicateMap declares a map[string][]algorithm.PredicateFailureReason type. // FailedPredicateMap declares a map[string][]algorithm.PredicateFailureReason type.
type FailedPredicateMap map[string][]predicates.PredicateFailureReason type FailedPredicateMap map[string][]predicates.PredicateFailureReason
@ -1204,16 +1183,6 @@ func (g *genericScheduler) selectVictimsOnNode(
return victims, numViolatingVictim, true return victims, numViolatingVictim, true
} }
// unresolvablePredicateExists checks whether failedPredicates has unresolvable predicate.
func unresolvablePredicateExists(failedPredicates []predicates.PredicateFailureReason) bool {
for _, failedPredicate := range failedPredicates {
if _, ok := unresolvablePredicateFailureErrors[failedPredicate]; ok {
return true
}
}
return false
}
// nodesWherePreemptionMightHelp returns a list of nodes with failed predicates // nodesWherePreemptionMightHelp returns a list of nodes with failed predicates
// that may be satisfied by removing pods from the node. // that may be satisfied by removing pods from the node.
func nodesWherePreemptionMightHelp(nodes []*v1.Node, fitErr *FitError) []*v1.Node { func nodesWherePreemptionMightHelp(nodes []*v1.Node, fitErr *FitError) []*v1.Node {
@ -1228,7 +1197,7 @@ func nodesWherePreemptionMightHelp(nodes []*v1.Node, fitErr *FitError) []*v1.Nod
// to rely less on such assumptions in the code when checking does not impose // to rely less on such assumptions in the code when checking does not impose
// significant overhead. // significant overhead.
// Also, we currently assume all failures returned by extender as resolvable. // Also, we currently assume all failures returned by extender as resolvable.
if !unresolvablePredicateExists(failedPredicates) { if predicates.UnresolvablePredicateExists(failedPredicates) == nil {
klog.V(3).Infof("Node %v is a potential node for preemption.", node.Name) klog.V(3).Infof("Node %v is a potential node for preemption.", node.Name)
potentialNodes = append(potentialNodes, node) potentialNodes = append(potentialNodes, node)
} }

View File

@ -6,8 +6,9 @@ go_library(
importpath = "k8s.io/kubernetes/pkg/scheduler/framework/plugins", importpath = "k8s.io/kubernetes/pkg/scheduler/framework/plugins",
visibility = ["//visibility:public"], visibility = ["//visibility:public"],
deps = [ deps = [
"//pkg/scheduler/algorithm/predicates:go_default_library",
"//pkg/scheduler/apis/config:go_default_library", "//pkg/scheduler/apis/config:go_default_library",
"//pkg/scheduler/framework/plugins/noop:go_default_library", "//pkg/scheduler/framework/plugins/tainttoleration:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library", "//pkg/scheduler/framework/v1alpha1:go_default_library",
], ],
) )
@ -24,7 +25,9 @@ filegroup(
srcs = [ srcs = [
":package-srcs", ":package-srcs",
"//pkg/scheduler/framework/plugins/examples:all-srcs", "//pkg/scheduler/framework/plugins/examples:all-srcs",
"//pkg/scheduler/framework/plugins/migration:all-srcs",
"//pkg/scheduler/framework/plugins/noop:all-srcs", "//pkg/scheduler/framework/plugins/noop:all-srcs",
"//pkg/scheduler/framework/plugins/tainttoleration:all-srcs",
], ],
tags = ["automanaged"], tags = ["automanaged"],
visibility = ["//visibility:public"], visibility = ["//visibility:public"],

View File

@ -19,8 +19,9 @@ package plugins
import ( import (
"fmt" "fmt"
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
"k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/apis/config"
noop "k8s.io/kubernetes/pkg/scheduler/framework/plugins/noop" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/tainttoleration"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
) )
@ -29,9 +30,7 @@ import (
// runs custom plugins, can pass a different Registry when initializing the scheduler. // runs custom plugins, can pass a different Registry when initializing the scheduler.
func NewDefaultRegistry() framework.Registry { func NewDefaultRegistry() framework.Registry {
return framework.Registry{ return framework.Registry{
// This is just a test plugin to showcase the setup, it should be deleted once tainttoleration.Name: tainttoleration.New,
// we have at least one legitimate plugin here.
noop.Name: noop.New,
} }
} }
@ -55,10 +54,17 @@ type ConfigProducerRegistry struct {
// NewDefaultConfigProducerRegistry creates a new producer registry. // NewDefaultConfigProducerRegistry creates a new producer registry.
func NewDefaultConfigProducerRegistry() *ConfigProducerRegistry { func NewDefaultConfigProducerRegistry() *ConfigProducerRegistry {
return &ConfigProducerRegistry{ registry := &ConfigProducerRegistry{
PredicateToConfigProducer: make(map[string]ConfigProducer), PredicateToConfigProducer: make(map[string]ConfigProducer),
PriorityToConfigProducer: make(map[string]ConfigProducer), PriorityToConfigProducer: make(map[string]ConfigProducer),
} }
registry.RegisterPredicate(predicates.PodToleratesNodeTaintsPred,
func(_ ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Filter = appendToPluginSet(plugins.Filter, tainttoleration.Name, nil)
return
})
return registry
} }
func registerProducer(name string, producer ConfigProducer, producersMap map[string]ConfigProducer) error { func registerProducer(name string, producer ConfigProducer, producersMap map[string]ConfigProducer) error {
@ -78,3 +84,15 @@ func (f *ConfigProducerRegistry) RegisterPredicate(name string, producer ConfigP
func (f *ConfigProducerRegistry) RegisterPriority(name string, producer ConfigProducer) error { func (f *ConfigProducerRegistry) RegisterPriority(name string, producer ConfigProducer) error {
return registerProducer(name, producer, f.PriorityToConfigProducer) return registerProducer(name, producer, f.PriorityToConfigProducer)
} }
func appendToPluginSet(set *config.PluginSet, name string, weight *int32) *config.PluginSet {
if set == nil {
set = &config.PluginSet{}
}
cfg := config.Plugin{Name: name}
if weight != nil {
cfg.Weight = *weight
}
set.Enabled = append(set.Enabled, cfg)
return set
}

View File

@ -25,18 +25,6 @@ import (
"k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/apis/config"
) )
func appendToPluginSet(pluginSet *config.PluginSet, name string, weight *int32) *config.PluginSet {
if pluginSet == nil {
pluginSet = &config.PluginSet{}
}
config := config.Plugin{Name: name}
if weight != nil {
config.Weight = *weight
}
pluginSet.Enabled = append(pluginSet.Enabled, config)
return pluginSet
}
func produceConfig(keys []string, producersMap map[string]ConfigProducer, args ConfigProducerArgs) (*config.Plugins, []config.PluginConfig, error) { func produceConfig(keys []string, producersMap map[string]ConfigProducer, args ConfigProducerArgs) (*config.Plugins, []config.PluginConfig, error) {
var plugins config.Plugins var plugins config.Plugins
var pluginConfig []config.PluginConfig var pluginConfig []config.PluginConfig

View File

@ -0,0 +1,36 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = ["utils.go"],
importpath = "k8s.io/kubernetes/pkg/scheduler/framework/plugins/migration",
visibility = ["//visibility:public"],
deps = [
"//pkg/scheduler/algorithm/predicates:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)
go_test(
name = "go_default_test",
srcs = ["utils_test.go"],
embed = [":go_default_library"],
deps = [
"//pkg/scheduler/algorithm/predicates:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library",
],
)

View File

@ -0,0 +1,49 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package migration
import (
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
)
// PredicateResultToFrameworkStatus converts a predicate result (PredicateFailureReason + error)
// to a framework status.
func PredicateResultToFrameworkStatus(reasons []predicates.PredicateFailureReason, err error) *framework.Status {
if s := ErrorToFrameworkStatus(err); s != nil {
return s
}
if len(reasons) == 0 {
return nil
}
if r := predicates.UnresolvablePredicateExists(reasons); r != nil {
return framework.NewStatus(framework.UnschedulableAndUnresolvable, r.GetReason())
}
// We will just use the first reason.
return framework.NewStatus(framework.Unschedulable, reasons[0].GetReason())
}
// ErrorToFrameworkStatus converts an error to a framework status.
func ErrorToFrameworkStatus(err error) *framework.Status {
if err != nil {
return framework.NewStatus(framework.Error, err.Error())
}
return nil
}

View File

@ -0,0 +1,68 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package migration
import (
"errors"
"reflect"
"testing"
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
)
func TestPredicateResultToFrameworkStatus(t *testing.T) {
tests := []struct {
name string
err error
reasons []predicates.PredicateFailureReason
wantStatus *framework.Status
}{
{
name: "Success",
},
{
name: "Error",
err: errors.New("Failed with error"),
wantStatus: framework.NewStatus(framework.Error, "Failed with error"),
},
{
name: "Error with reason",
err: errors.New("Failed with error"),
reasons: []predicates.PredicateFailureReason{predicates.ErrDiskConflict},
wantStatus: framework.NewStatus(framework.Error, "Failed with error"),
},
{
name: "Unschedulable",
reasons: []predicates.PredicateFailureReason{predicates.ErrExistingPodsAntiAffinityRulesNotMatch},
wantStatus: framework.NewStatus(framework.Unschedulable, "node(s) didn't satisfy existing pods anti-affinity rules"),
},
{
name: "Unschedulable and Unresolvable",
reasons: []predicates.PredicateFailureReason{predicates.ErrDiskConflict, predicates.ErrNodeSelectorNotMatch},
wantStatus: framework.NewStatus(framework.UnschedulableAndUnresolvable, "node(s) didn't match node selector"),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
gotStatus := PredicateResultToFrameworkStatus(tt.reasons, tt.err)
if !reflect.DeepEqual(tt.wantStatus, gotStatus) {
t.Errorf("Got status %v, want %v", gotStatus, tt.wantStatus)
}
})
}
}

View File

@ -0,0 +1,30 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = ["taint_toleration.go"],
importpath = "k8s.io/kubernetes/pkg/scheduler/framework/plugins/tainttoleration",
visibility = ["//visibility:public"],
deps = [
"//pkg/scheduler/algorithm/predicates:go_default_library",
"//pkg/scheduler/framework/plugins/migration:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library",
"//pkg/scheduler/nodeinfo:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@ -0,0 +1,50 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package tainttoleration
import (
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/migration"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
"k8s.io/kubernetes/pkg/scheduler/nodeinfo"
)
// TaintToleration is a plugin that checks if a pod tolerates a node's taints.
type TaintToleration struct{}
var _ = framework.FilterPlugin(&TaintToleration{})
// Name is the name of the plugin used in the plugin registry and configurations.
const Name = "TaintToleration"
// Name returns name of the plugin. It is used in logs, etc.
func (pl *TaintToleration) Name() string {
return Name
}
// Filter invoked at the filter extension point.
func (pl *TaintToleration) Filter(_ *framework.CycleState, pod *v1.Pod, nodeInfo *nodeinfo.NodeInfo) *framework.Status {
_, reasons, err := predicates.PodToleratesNodeTaints(pod, nil, nodeInfo)
return migration.PredicateResultToFrameworkStatus(reasons, err)
}
// New initializes a new plugin and returns it.
func New(_ *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) {
return &TaintToleration{}, nil
}

View File

@ -33,6 +33,11 @@ import (
schedutil "k8s.io/kubernetes/pkg/scheduler/util" schedutil "k8s.io/kubernetes/pkg/scheduler/util"
) )
const (
// Specifies the maximum timeout a permit plugin can return.
maxTimeout time.Duration = 15 * time.Minute
)
// framework is the component responsible for initializing and running scheduler // framework is the component responsible for initializing and running scheduler
// plugins. // plugins.
type framework struct { type framework struct {
@ -53,10 +58,32 @@ type framework struct {
permitPlugins []PermitPlugin permitPlugins []PermitPlugin
} }
const ( // extensionPoint encapsulates desired and applied set of plugins at a specific extension
// Specifies the maximum timeout a permit plugin can return. // point. This is used to simplify iterating over all extension points supported by the
maxTimeout time.Duration = 15 * time.Minute // framework.
) type extensionPoint struct {
// the set of plugins to be configured at this extension point.
plugins *config.PluginSet
// a pointer to the slice storing plugins implementations that will run at this
// extenstion point.
slicePtr interface{}
}
func (f *framework) getExtensionPoints(plugins *config.Plugins) []extensionPoint {
return []extensionPoint{
{plugins.PreFilter, &f.preFilterPlugins},
{plugins.Filter, &f.filterPlugins},
{plugins.Reserve, &f.reservePlugins},
{plugins.PostFilter, &f.postFilterPlugins},
{plugins.Score, &f.scorePlugins},
{plugins.PreBind, &f.preBindPlugins},
{plugins.Bind, &f.bindPlugins},
{plugins.PostBind, &f.postBindPlugins},
{plugins.Unreserve, &f.unreservePlugins},
{plugins.Permit, &f.permitPlugins},
{plugins.QueueSort, &f.queueSortPlugins},
}
}
var _ = Framework(&framework{}) var _ = Framework(&framework{})
@ -73,29 +100,30 @@ func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfi
} }
// get needed plugins from config // get needed plugins from config
pg := pluginsNeeded(plugins) pg := f.pluginsNeeded(plugins)
if len(pg) == 0 { if len(pg) == 0 {
return f, nil return f, nil
} }
pluginConfig := pluginNameToConfig(args) pluginConfig := make(map[string]*runtime.Unknown, 0)
for i := range args {
pluginConfig[args[i].Name] = &args[i].Args
}
pluginsMap := make(map[string]Plugin) pluginsMap := make(map[string]Plugin)
for name, factory := range r { for name, factory := range r {
// initialize only needed plugins // initialize only needed plugins.
if _, ok := pg[name]; !ok { if _, ok := pg[name]; !ok {
continue continue
} }
// find the config args of a plugin p, err := factory(pluginConfig[name], f)
state := pluginConfig[name]
p, err := factory(state, f)
if err != nil { if err != nil {
return nil, fmt.Errorf("error initializing plugin %q: %v", name, err) return nil, fmt.Errorf("error initializing plugin %q: %v", name, err)
} }
pluginsMap[name] = p pluginsMap[name] = p
// A weight of zero is not permitted, plugins can be disabled explicitly // a weight of zero is not permitted, plugins can be disabled explicitly
// when configured. // when configured.
f.pluginNameToWeightMap[name] = int(pg[name].Weight) f.pluginNameToWeightMap[name] = int(pg[name].Weight)
if f.pluginNameToWeightMap[name] == 0 { if f.pluginNameToWeightMap[name] == 0 {
@ -103,50 +131,14 @@ func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfi
} }
} }
if err := updatePluginList(&f.preFilterPlugins, plugins.PreFilter, pluginsMap); err != nil { for _, e := range f.getExtensionPoints(plugins) {
return nil, err if err := updatePluginList(e.slicePtr, e.plugins, pluginsMap); err != nil {
} return nil, err
}
if err := updatePluginList(&f.filterPlugins, plugins.Filter, pluginsMap); err != nil {
return nil, err
}
if err := updatePluginList(&f.reservePlugins, plugins.Reserve, pluginsMap); err != nil {
return nil, err
}
if err := updatePluginList(&f.postFilterPlugins, plugins.PostFilter, pluginsMap); err != nil {
return nil, err
}
if err := updatePluginList(&f.scorePlugins, plugins.Score, pluginsMap); err != nil {
return nil, err
}
if err := updatePluginList(&f.preBindPlugins, plugins.PreBind, pluginsMap); err != nil {
return nil, err
}
if err := updatePluginList(&f.bindPlugins, plugins.Bind, pluginsMap); err != nil {
return nil, err
}
if err := updatePluginList(&f.postBindPlugins, plugins.PostBind, pluginsMap); err != nil {
return nil, err
}
if err := updatePluginList(&f.unreservePlugins, plugins.Unreserve, pluginsMap); err != nil {
return nil, err
}
if err := updatePluginList(&f.permitPlugins, plugins.Permit, pluginsMap); err != nil {
return nil, err
}
if err := updatePluginList(&f.queueSortPlugins, plugins.QueueSort, pluginsMap); err != nil {
return nil, err
} }
// Verifying the score weights again since Plugin.Name() could return a different
// value from the one used in the configuration.
for _, scorePlugin := range f.scorePlugins { for _, scorePlugin := range f.scorePlugins {
if f.pluginNameToWeightMap[scorePlugin.Name()] == 0 { if f.pluginNameToWeightMap[scorePlugin.Name()] == 0 {
return nil, fmt.Errorf("score plugin %q is not configured with weight", scorePlugin.Name()) return nil, fmt.Errorf("score plugin %q is not configured with weight", scorePlugin.Name())
@ -171,15 +163,15 @@ func updatePluginList(pluginList interface{}, pluginSet *config.PluginSet, plugi
for _, ep := range pluginSet.Enabled { for _, ep := range pluginSet.Enabled {
pg, ok := pluginsMap[ep.Name] pg, ok := pluginsMap[ep.Name]
if !ok { if !ok {
return fmt.Errorf("%s %q does not exist", pluginType.String(), ep.Name) return fmt.Errorf("%s %q does not exist", pluginType.Name(), ep.Name)
} }
if !reflect.TypeOf(pg).Implements(pluginType) { if !reflect.TypeOf(pg).Implements(pluginType) {
return fmt.Errorf("plugin %q does not extend %s plugin", ep.Name, pluginType.String()) return fmt.Errorf("plugin %q does not extend %s plugin", ep.Name, pluginType.Name())
} }
if set.Has(ep.Name) { if set.Has(ep.Name) {
return fmt.Errorf("plugin %q already registered as %q", ep.Name, pluginType.String()) return fmt.Errorf("plugin %q already registered as %q", ep.Name, pluginType.Name())
} }
set.Insert(ep.Name) set.Insert(ep.Name)
@ -534,17 +526,33 @@ func (f *framework) GetWaitingPod(uid types.UID) WaitingPod {
return f.waitingPods.get(uid) return f.waitingPods.get(uid)
} }
func pluginNameToConfig(args []config.PluginConfig) map[string]*runtime.Unknown { // ListPlugins returns a map of extension point name to plugin names configured at each extension
state := make(map[string]*runtime.Unknown, 0) // point. Returns nil if no plugins where configred.
for i := range args { func (f *framework) ListPlugins() map[string][]string {
// This is needed because the type of PluginConfig.Args is not pointer type. m := make(map[string][]string)
p := args[i]
state[p.Name] = &p.Args insert := func(ptr interface{}) {
plugins := reflect.ValueOf(ptr).Elem()
var names []string
for i := 0; i < plugins.Len(); i++ {
name := plugins.Index(i).Interface().(Plugin).Name()
names = append(names, name)
}
if len(names) > 0 {
extName := plugins.Type().Elem().Name()
m[extName] = names
}
} }
return state for _, e := range f.getExtensionPoints(&config.Plugins{}) {
insert(e.slicePtr)
}
if len(m) > 0 {
return m
}
return nil
} }
func pluginsNeeded(plugins *config.Plugins) map[string]config.Plugin { func (f *framework) pluginsNeeded(plugins *config.Plugins) map[string]config.Plugin {
pgMap := make(map[string]config.Plugin, 0) pgMap := make(map[string]config.Plugin, 0)
if plugins == nil { if plugins == nil {
@ -559,17 +567,8 @@ func pluginsNeeded(plugins *config.Plugins) map[string]config.Plugin {
pgMap[pg.Name] = pg pgMap[pg.Name] = pg
} }
} }
find(plugins.QueueSort) for _, e := range f.getExtensionPoints(plugins) {
find(plugins.PreFilter) find(e.plugins)
find(plugins.Filter) }
find(plugins.PostFilter)
find(plugins.Score)
find(plugins.Reserve)
find(plugins.Permit)
find(plugins.PreBind)
find(plugins.Bind)
find(plugins.PostBind)
find(plugins.Unreserve)
return pgMap return pgMap
} }

View File

@ -405,6 +405,10 @@ type Framework interface {
// or "Success". If none of the plugins handled binding, RunBindPlugins returns // or "Success". If none of the plugins handled binding, RunBindPlugins returns
// code=4("skip") status. // code=4("skip") status.
RunBindPlugins(state *CycleState, pod *v1.Pod, nodeName string) *Status RunBindPlugins(state *CycleState, pod *v1.Pod, nodeName string) *Status
// ListPlugins returns a map of extension point name to plugin names
// configured at each extension point.
ListPlugins() map[string][]string
} }
// FrameworkHandle provides data and some tools that plugins can use. It is // FrameworkHandle provides data and some tools that plugins can use. It is

View File

@ -163,6 +163,10 @@ func (*fakeFramework) QueueSortFunc() framework.LessFunc {
} }
} }
func (f *fakeFramework) ListPlugins() map[string][]string {
return nil
}
func (*fakeFramework) NodeInfoSnapshot() *schedulernodeinfo.Snapshot { func (*fakeFramework) NodeInfoSnapshot() *schedulernodeinfo.Snapshot {
return nil return nil
} }

View File

@ -66,6 +66,7 @@ go_test(
"//test/integration/framework:go_default_library", "//test/integration/framework:go_default_library",
"//test/utils:go_default_library", "//test/utils:go_default_library",
"//test/utils/image:go_default_library", "//test/utils/image:go_default_library",
"//vendor/github.com/google/go-cmp/cmp:go_default_library",
"//vendor/k8s.io/klog:go_default_library", "//vendor/k8s.io/klog:go_default_library",
], ],
) )

View File

@ -899,7 +899,8 @@ func TestBindPlugin(t *testing.T) {
context := initTestSchedulerWithOptions(t, testContext, false, nil, time.Second, context := initTestSchedulerWithOptions(t, testContext, false, nil, time.Second,
scheduler.WithFrameworkPlugins(plugins), scheduler.WithFrameworkPlugins(plugins),
scheduler.WithFrameworkPluginConfig(pluginConfig), scheduler.WithFrameworkPluginConfig(pluginConfig),
scheduler.WithFrameworkRegistry(registry)) scheduler.WithFrameworkRegistry(registry),
scheduler.WithFrameworkConfigProducerRegistry(nil))
defer cleanupTest(t, context) defer cleanupTest(t, context)
// Add a few nodes. // Add a few nodes.

View File

@ -23,6 +23,8 @@ import (
"testing" "testing"
"time" "time"
"github.com/google/go-cmp/cmp"
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -93,6 +95,7 @@ func TestSchedulerCreationFromConfigMap(t *testing.T) {
policy string policy string
expectedPredicates sets.String expectedPredicates sets.String
expectedPrioritizers sets.String expectedPrioritizers sets.String
expectedPlugins map[string][]string
}{ }{
{ {
policy: `{ policy: `{
@ -136,7 +139,6 @@ func TestSchedulerCreationFromConfigMap(t *testing.T) {
"MaxGCEPDVolumeCount", "MaxGCEPDVolumeCount",
"NoDiskConflict", "NoDiskConflict",
"NoVolumeZoneConflict", "NoVolumeZoneConflict",
"PodToleratesNodeTaints",
), ),
expectedPrioritizers: sets.NewString( expectedPrioritizers: sets.NewString(
"BalancedResourceAllocation", "BalancedResourceAllocation",
@ -148,6 +150,9 @@ func TestSchedulerCreationFromConfigMap(t *testing.T) {
"TaintTolerationPriority", "TaintTolerationPriority",
"ImageLocalityPriority", "ImageLocalityPriority",
), ),
expectedPlugins: map[string][]string{
"FilterPlugin": {"TaintToleration"},
},
}, },
{ {
policy: `{ policy: `{
@ -201,7 +206,6 @@ kind: Policy
"MaxGCEPDVolumeCount", "MaxGCEPDVolumeCount",
"NoDiskConflict", "NoDiskConflict",
"NoVolumeZoneConflict", "NoVolumeZoneConflict",
"PodToleratesNodeTaints",
), ),
expectedPrioritizers: sets.NewString( expectedPrioritizers: sets.NewString(
"BalancedResourceAllocation", "BalancedResourceAllocation",
@ -213,6 +217,9 @@ kind: Policy
"TaintTolerationPriority", "TaintTolerationPriority",
"ImageLocalityPriority", "ImageLocalityPriority",
), ),
expectedPlugins: map[string][]string{
"FilterPlugin": {"TaintToleration"},
},
}, },
{ {
policy: `apiVersion: v1 policy: `apiVersion: v1
@ -287,6 +294,10 @@ priorities: []
if !schedPrioritizers.Equal(test.expectedPrioritizers) { if !schedPrioritizers.Equal(test.expectedPrioritizers) {
t.Errorf("Expected priority functions %v, got %v", test.expectedPrioritizers, schedPrioritizers) t.Errorf("Expected priority functions %v, got %v", test.expectedPrioritizers, schedPrioritizers)
} }
schedPlugins := sched.Framework.ListPlugins()
if diff := cmp.Diff(test.expectedPlugins, schedPlugins); diff != "" {
t.Errorf("unexpected predicates diff (-want, +got): %s", diff)
}
} }
} }