diff --git a/pkg/scheduler/core/BUILD b/pkg/scheduler/core/BUILD index 0f7b3dbef70..b4274a22ffe 100644 --- a/pkg/scheduler/core/BUILD +++ b/pkg/scheduler/core/BUILD @@ -59,6 +59,7 @@ go_test( "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", diff --git a/pkg/scheduler/core/generic_scheduler.go b/pkg/scheduler/core/generic_scheduler.go index 4cd6ade7570..ac261671857 100644 --- a/pkg/scheduler/core/generic_scheduler.go +++ b/pkg/scheduler/core/generic_scheduler.go @@ -29,7 +29,7 @@ import ( "k8s.io/klog" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" policy "k8s.io/api/policy/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" @@ -89,9 +89,10 @@ type FailedPredicateMap map[string][]predicates.PredicateFailureReason // FitError describes a fit error of a pod. type FitError struct { - Pod *v1.Pod - NumAllNodes int - FailedPredicates FailedPredicateMap + Pod *v1.Pod + NumAllNodes int + FailedPredicates FailedPredicateMap + FilteredNodesStatuses framework.NodeToStatusMap } // ErrNoNodesAvailable is used to describe the error that no nodes available to schedule pods. @@ -111,6 +112,10 @@ func (f *FitError) Error() string { } } + for _, status := range f.FilteredNodesStatuses { + reasons[status.Message()]++ + } + sortReasonsHistogram := func() []string { reasonStrings := []string{} for k, v := range reasons { @@ -206,16 +211,23 @@ func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister trace.Step("Basic checks done") startPredicateEvalTime := time.Now() - filteredNodes, failedPredicateMap, err := g.findNodesThatFit(pluginContext, pod, nodeLister) + filteredNodes, failedPredicateMap, filteredNodesStatuses, err := g.findNodesThatFit(pluginContext, pod, nodeLister) if err != nil { return result, err } + // Run "postfilter" plugins. + postfilterStatus := g.framework.RunPostFilterPlugins(pluginContext, pod, filteredNodes, filteredNodesStatuses) + if !postfilterStatus.IsSuccess() { + return result, postfilterStatus.AsError() + } + if len(filteredNodes) == 0 { return result, &FitError{ - Pod: pod, - NumAllNodes: numNodes, - FailedPredicates: failedPredicateMap, + Pod: pod, + NumAllNodes: numNodes, + FailedPredicates: failedPredicateMap, + FilteredNodesStatuses: filteredNodesStatuses, } } trace.Step("Computing predicates done") @@ -449,9 +461,10 @@ func (g *genericScheduler) numFeasibleNodesToFind(numAllNodes int32) (numNodes i // Filters the nodes to find the ones that fit based on the given predicate functions // Each node is passed through the predicate functions to determine if it is a fit -func (g *genericScheduler) findNodesThatFit(pluginContext *framework.PluginContext, pod *v1.Pod, nodeLister algorithm.NodeLister) ([]*v1.Node, FailedPredicateMap, error) { +func (g *genericScheduler) findNodesThatFit(pluginContext *framework.PluginContext, pod *v1.Pod, nodeLister algorithm.NodeLister) ([]*v1.Node, FailedPredicateMap, framework.NodeToStatusMap, error) { var filtered []*v1.Node failedPredicateMap := FailedPredicateMap{} + filteredNodesStatuses := framework.NodeToStatusMap{} if len(g.predicates) == 0 { filtered = nodeLister.ListNodes() @@ -495,8 +508,7 @@ func (g *genericScheduler) findNodesThatFit(pluginContext *framework.PluginConte status := g.framework.RunFilterPlugins(pluginContext, pod, nodeName) if !status.IsSuccess() { predicateResultLock.Lock() - failedPredicateMap[nodeName] = append(failedPredicateMap[nodeName], - predicates.NewFailureReason(status.Message())) + filteredNodesStatuses[nodeName] = status if status.Code() != framework.Unschedulable { errs[status.Message()]++ } @@ -524,7 +536,7 @@ func (g *genericScheduler) findNodesThatFit(pluginContext *framework.PluginConte filtered = filtered[:filteredLen] if len(errs) > 0 { - return []*v1.Node{}, FailedPredicateMap{}, errors.CreateAggregateFromMessageCountMap(errs) + return []*v1.Node{}, FailedPredicateMap{}, framework.NodeToStatusMap{}, errors.CreateAggregateFromMessageCountMap(errs) } } @@ -539,9 +551,9 @@ func (g *genericScheduler) findNodesThatFit(pluginContext *framework.PluginConte klog.Warningf("Skipping extender %v as it returned error %v and has ignorable flag set", extender, err) continue - } else { - return []*v1.Node{}, FailedPredicateMap{}, err } + + return []*v1.Node{}, FailedPredicateMap{}, framework.NodeToStatusMap{}, err } for failedNodeName, failedMsg := range failedMap { @@ -556,7 +568,7 @@ func (g *genericScheduler) findNodesThatFit(pluginContext *framework.PluginConte } } } - return filtered, failedPredicateMap, nil + return filtered, failedPredicateMap, filteredNodesStatuses, nil } // addNominatedPods adds pods with equal or greater priority which are nominated diff --git a/pkg/scheduler/core/generic_scheduler_test.go b/pkg/scheduler/core/generic_scheduler_test.go index f832face87b..916181e5909 100644 --- a/pkg/scheduler/core/generic_scheduler_test.go +++ b/pkg/scheduler/core/generic_scheduler_test.go @@ -26,9 +26,10 @@ import ( "time" apps "k8s.io/api/apps/v1" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apimachinery/pkg/util/sets" @@ -139,6 +140,42 @@ func getNodeReducePriority(pod *v1.Pod, meta interface{}, nodeNameToInfo map[str var EmptyPluginRegistry = framework.Registry{} var emptyFramework, _ = framework.NewFramework(EmptyPluginRegistry, nil, []schedulerconfig.PluginConfig{}) +// FakeFilterPlugin is a test filter plugin used by default scheduler. +type FakeFilterPlugin struct { + numFilterCalled int + failFilter bool +} + +var filterPlugin = &FakeFilterPlugin{} + +// Name returns name of the plugin. +func (fp *FakeFilterPlugin) Name() string { + return "fake-filter-plugin" +} + +// reset is used to reset filter plugin. +func (fp *FakeFilterPlugin) reset() { + fp.numFilterCalled = 0 + fp.failFilter = false +} + +// Filter is a test function that returns an error or nil, depending on the +// value of "failFilter". +func (fp *FakeFilterPlugin) Filter(pc *framework.PluginContext, pod *v1.Pod, nodeName string) *framework.Status { + fp.numFilterCalled++ + + if fp.failFilter { + return framework.NewStatus(framework.Unschedulable, fmt.Sprintf("injecting failure for pod %v", pod.Name)) + } + + return nil +} + +// NewFilterPlugin is the factory for filtler plugin. +func NewFilterPlugin(_ *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) { + return filterPlugin, nil +} + func makeNodeList(nodeNames []string) []*v1.Node { result := make([]*v1.Node, 0, len(nodeNames)) for _, nodeName := range nodeNames { @@ -219,6 +256,21 @@ func TestSelectHost(t *testing.T) { func TestGenericScheduler(t *testing.T) { defer algorithmpredicates.SetPredicatesOrderingDuringTest(order)() + + filterPluginRegistry := framework.Registry{filterPlugin.Name(): NewFilterPlugin} + filterFramework, err := framework.NewFramework(filterPluginRegistry, &schedulerconfig.Plugins{ + Filter: &schedulerconfig.PluginSet{ + Enabled: []schedulerconfig.Plugin{ + { + Name: filterPlugin.Name(), + }, + }, + }, + }, []schedulerconfig.PluginConfig{}) + if err != nil { + t.Errorf("Unexpected error when initialize scheduling framework, err :%v", err.Error()) + } + tests := []struct { name string predicates map[string]algorithmpredicates.FitPredicate @@ -229,6 +281,7 @@ func TestGenericScheduler(t *testing.T) { pod *v1.Pod pods []*v1.Pod buildPredMeta bool // build predicates metadata or not + failFilter bool expectedHosts sets.String expectsErr bool wErr error @@ -246,7 +299,9 @@ func TestGenericScheduler(t *testing.T) { FailedPredicates: FailedPredicateMap{ "machine1": []algorithmpredicates.PredicateFailureReason{algorithmpredicates.ErrFakePredicate}, "machine2": []algorithmpredicates.PredicateFailureReason{algorithmpredicates.ErrFakePredicate}, - }}, + }, + FilteredNodesStatuses: framework.NodeToStatusMap{}, + }, }, { predicates: map[string]algorithmpredicates.FitPredicate{"true": truePredicate}, @@ -309,6 +364,7 @@ func TestGenericScheduler(t *testing.T) { "2": []algorithmpredicates.PredicateFailureReason{algorithmpredicates.ErrFakePredicate}, "1": []algorithmpredicates.PredicateFailureReason{algorithmpredicates.ErrFakePredicate}, }, + FilteredNodesStatuses: framework.NodeToStatusMap{}, }, }, { @@ -339,6 +395,7 @@ func TestGenericScheduler(t *testing.T) { "1": []algorithmpredicates.PredicateFailureReason{algorithmpredicates.ErrFakePredicate}, "2": []algorithmpredicates.PredicateFailureReason{algorithmpredicates.ErrFakePredicate}, }, + FilteredNodesStatuses: framework.NodeToStatusMap{}, }, }, { @@ -426,6 +483,7 @@ func TestGenericScheduler(t *testing.T) { FailedPredicates: FailedPredicateMap{ "1": []algorithmpredicates.PredicateFailureReason{algorithmpredicates.ErrFakePredicate, algorithmpredicates.ErrFakePredicate}, }, + FilteredNodesStatuses: framework.NodeToStatusMap{}, }, }, { @@ -538,9 +596,29 @@ func TestGenericScheduler(t *testing.T) { expectedHosts: sets.NewString("machine2", "machine3"), wErr: nil, }, + { + name: "test with failed filter plugin", + predicates: map[string]algorithmpredicates.FitPredicate{"true": truePredicate}, + prioritizers: []priorities.PriorityConfig{{Function: numericPriority, Weight: 1}}, + nodes: []string{"3"}, + pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-filter", UID: types.UID("test-filter")}}, + expectedHosts: nil, + failFilter: true, + expectsErr: true, + wErr: &FitError{ + Pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-filter", UID: types.UID("test-filter")}}, + NumAllNodes: 1, + FailedPredicates: FailedPredicateMap{}, + FilteredNodesStatuses: framework.NodeToStatusMap{ + "3": framework.NewStatus(framework.Unschedulable, "injecting failure for pod test-filter"), + }, + }, + }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { + filterPlugin.failFilter = test.failFilter + cache := internalcache.New(time.Duration(0), wait.NeverStop) for _, pod := range test.pods { cache.AddPod(pod) @@ -564,7 +642,7 @@ func TestGenericScheduler(t *testing.T) { predMetaProducer, test.prioritizers, priorities.EmptyPriorityMetadataProducer, - emptyFramework, + filterFramework, []algorithm.SchedulerExtender{}, nil, pvcLister, @@ -575,11 +653,13 @@ func TestGenericScheduler(t *testing.T) { false) result, err := scheduler.Schedule(test.pod, schedulertesting.FakeNodeLister(makeNodeList(test.nodes)), framework.NewPluginContext()) if !reflect.DeepEqual(err, test.wErr) { - t.Errorf("Unexpected error: %v, expected: %v", err, test.wErr) + t.Errorf("Unexpected error: %v, expected: %v", err.Error(), test.wErr) } if test.expectedHosts != nil && !test.expectedHosts.Has(result.SuggestedHost) { t.Errorf("Expected: %s, got: %s", test.expectedHosts, result.SuggestedHost) } + + filterPlugin.reset() }) } } @@ -613,7 +693,7 @@ func TestFindFitAllError(t *testing.T) { nodes := makeNodeList([]string{"3", "2", "1"}) scheduler := makeScheduler(predicates, nodes) - _, predicateMap, err := scheduler.findNodesThatFit(nil, &v1.Pod{}, schedulertesting.FakeNodeLister(nodes)) + _, predicateMap, _, err := scheduler.findNodesThatFit(nil, &v1.Pod{}, schedulertesting.FakeNodeLister(nodes)) if err != nil { t.Errorf("unexpected error: %v", err) @@ -643,7 +723,7 @@ func TestFindFitSomeError(t *testing.T) { scheduler := makeScheduler(predicates, nodes) pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "1", UID: types.UID("1")}} - _, predicateMap, err := scheduler.findNodesThatFit(nil, pod, schedulertesting.FakeNodeLister(nodes)) + _, predicateMap, _, err := scheduler.findNodesThatFit(nil, pod, schedulertesting.FakeNodeLister(nodes)) if err != nil { t.Errorf("unexpected error: %v", err) diff --git a/pkg/scheduler/framework/v1alpha1/framework.go b/pkg/scheduler/framework/v1alpha1/framework.go index 16cd86e6e15..10606950c20 100644 --- a/pkg/scheduler/framework/v1alpha1/framework.go +++ b/pkg/scheduler/framework/v1alpha1/framework.go @@ -21,7 +21,7 @@ import ( "fmt" "time" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/util/workqueue" @@ -41,6 +41,7 @@ type framework struct { queueSortPlugins []QueueSortPlugin prefilterPlugins []PrefilterPlugin filterPlugins []FilterPlugin + postFilterPlugins []PostFilterPlugin scorePlugins []ScorePlugin scoreWithNormalizePlugins []ScoreWithNormalizePlugin reservePlugins []ReservePlugin @@ -168,6 +169,20 @@ func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfi } } + if plugins.PostFilter != nil { + for _, r := range plugins.PostFilter.Enabled { + if pg, ok := pluginsMap[r.Name]; ok { + p, ok := pg.(PostFilterPlugin) + if !ok { + return nil, fmt.Errorf("plugin %v does not extend post-filter plugin", r.Name) + } + f.postFilterPlugins = append(f.postFilterPlugins, p) + } else { + return nil, fmt.Errorf("post-filter plugin %v does not exist", r.Name) + } + } + } + if plugins.PreBind != nil { for _, pb := range plugins.PreBind.Enabled { if pg, ok := pluginsMap[pb.Name]; ok { @@ -287,6 +302,7 @@ func (f *framework) RunPrefilterPlugins( return NewStatus(Error, msg) } } + return nil } @@ -296,13 +312,12 @@ func (f *framework) RunPrefilterPlugins( // Meanwhile, the failure message and status are set for the given node. func (f *framework) RunFilterPlugins(pc *PluginContext, pod *v1.Pod, nodeName string) *Status { - - for _, p := range f.filterPlugins { - status := p.Filter(pc, pod, nodeName) + for _, pl := range f.filterPlugins { + status := pl.Filter(pc, pod, nodeName) if !status.IsSuccess() { if status.Code() != Unschedulable { - errMsg := fmt.Sprintf("RunFilterPlugins: error while running %s filter plugin for pod %s: %s", - p.Name(), pod.Name, status.Message()) + errMsg := fmt.Sprintf("RunFilterPlugins: error while running %v filter plugin for pod %v: %v", + pl.Name(), pod.Name, status.Message()) klog.Error(errMsg) return NewStatus(Error, errMsg) } @@ -313,6 +328,27 @@ func (f *framework) RunFilterPlugins(pc *PluginContext, return nil } +// RunPostFilterPlugins runs the set of configured post-filter plugins. If any +// of these plugins returns any status other than "Success", the given node is +// rejected. The filteredNodeStatuses is the set of filtered nodes and their statuses. +func (f *framework) RunPostFilterPlugins( + pc *PluginContext, + pod *v1.Pod, + nodes []*v1.Node, + filteredNodesStatuses NodeToStatusMap, +) *Status { + for _, pl := range f.postFilterPlugins { + status := pl.PostFilter(pc, pod, nodes, filteredNodesStatuses) + if !status.IsSuccess() { + msg := fmt.Sprintf("error while running %v postfilter plugin for pod %v: %v", pl.Name(), pod.Name, status.Message()) + klog.Error(msg) + return NewStatus(Error, msg) + } + } + + return nil +} + // RunScorePlugins runs the set of configured scoring plugins. It returns a map that // stores for each scoring plugin name the corresponding NodeScoreList(s). // It also returns *Status, which is set to non-success if any of the plugins returns diff --git a/pkg/scheduler/framework/v1alpha1/interface.go b/pkg/scheduler/framework/v1alpha1/interface.go index 2a8a0bea524..ea47529a231 100644 --- a/pkg/scheduler/framework/v1alpha1/interface.go +++ b/pkg/scheduler/framework/v1alpha1/interface.go @@ -22,7 +22,7 @@ import ( "errors" "time" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache" ) @@ -36,6 +36,9 @@ type NodeScoreList []int // PluginToNodeScoreMap declares a map from plugin name to its NodeScoreList. type PluginToNodeScoreMap map[string]NodeScoreList +// NodeToStatusMap declares map from node name to its status. +type NodeToStatusMap map[string]*Status + // These are predefined codes used in a Status. const ( // Success means that plugin ran correctly and found pod schedulable. @@ -160,6 +163,19 @@ type FilterPlugin interface { Filter(pc *PluginContext, pod *v1.Pod, nodeName string) *Status } +// PostFilterPlugin is an interface for Post-filter plugin. Post-filter is an +// informational extension point. Plugins will be called with a list of nodes +// that passed the filtering phase. A plugin may use this data to update internal +// state or to generate logs/metrics. +type PostFilterPlugin interface { + Plugin + // PostFilter is called by the scheduling framework after a list of nodes + // passed the filtering phase. All postfilter plugins must return success or + // the pod will be rejected. The filteredNodesStatuses is the set of filtered nodes + // and their filter status. + PostFilter(pc *PluginContext, pod *v1.Pod, nodes []*v1.Node, filteredNodesStatuses NodeToStatusMap) *Status +} + // ScorePlugin is an interface that must be implemented by "score" plugins to rank // nodes that passed the filtering phase. type ScorePlugin interface { @@ -269,6 +285,11 @@ type Framework interface { // the given node is not suitable for running the pod. RunFilterPlugins(pc *PluginContext, pod *v1.Pod, nodeName string) *Status + // RunPostFilterPlugins runs the set of configured post-filter plugins. If any + // of these plugins returns any status other than "Success", the given node is + // rejected. The filteredNodeStatuses is the set of filtered nodes and their statuses. + RunPostFilterPlugins(pc *PluginContext, pod *v1.Pod, nodes []*v1.Node, filteredNodesStatuses NodeToStatusMap) *Status + // RunScorePlugins runs the set of configured scoring plugins. It returns a map that // stores for each scoring plugin name the corresponding NodeScoreList(s). // It also returns *Status, which is set to non-success if any of the plugins returns diff --git a/pkg/scheduler/internal/queue/scheduling_queue_test.go b/pkg/scheduler/internal/queue/scheduling_queue_test.go index 0b48b3a5211..03eaee4b9c4 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue_test.go +++ b/pkg/scheduler/internal/queue/scheduling_queue_test.go @@ -24,7 +24,7 @@ import ( "time" dto "github.com/prometheus/client_model/go" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/clock" @@ -197,6 +197,10 @@ func (*fakeFramework) RunBindPlugins(pc *framework.PluginContext, pod *v1.Pod, n func (*fakeFramework) RunPostbindPlugins(pc *framework.PluginContext, pod *v1.Pod, nodeName string) {} +func (*fakeFramework) RunPostFilterPlugins(pc *framework.PluginContext, pod *v1.Pod, nodes []*v1.Node, filteredNodesStatuses framework.NodeToStatusMap) *framework.Status { + return nil +} + func (*fakeFramework) RunReservePlugins(pc *framework.PluginContext, pod *v1.Pod, nodeName string) *framework.Status { return nil } diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index 6acf45cc01f..6225d4d157e 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -26,7 +26,7 @@ import ( "testing" "time" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/api/events/v1beta1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -415,9 +415,12 @@ func TestSchedulerNoPhantomPodAfterDelete(t *testing.T) { select { case err := <-errChan: expectErr := &core.FitError{ - Pod: secondPod, - NumAllNodes: 1, - FailedPredicates: core.FailedPredicateMap{node.Name: []predicates.PredicateFailureReason{predicates.ErrPodNotFitsHostPorts}}, + Pod: secondPod, + NumAllNodes: 1, + FailedPredicates: core.FailedPredicateMap{ + node.Name: []predicates.PredicateFailureReason{predicates.ErrPodNotFitsHostPorts}, + }, + FilteredNodesStatuses: framework.NodeToStatusMap{}, } if !reflect.DeepEqual(expectErr, err) { t.Errorf("err want=%v, get=%v", expectErr, err) @@ -620,9 +623,10 @@ func TestSchedulerFailedSchedulingReasons(t *testing.T) { select { case err := <-errChan: expectErr := &core.FitError{ - Pod: podWithTooBigResourceRequests, - NumAllNodes: len(nodes), - FailedPredicates: failedPredicatesMap, + Pod: podWithTooBigResourceRequests, + NumAllNodes: len(nodes), + FailedPredicates: failedPredicatesMap, + FilteredNodesStatuses: framework.NodeToStatusMap{}, } if len(fmt.Sprint(expectErr)) > 150 { t.Errorf("message is too spammy ! %v ", len(fmt.Sprint(expectErr))) diff --git a/test/integration/scheduler/framework_test.go b/test/integration/scheduler/framework_test.go index befc4672d31..1befd028b3b 100644 --- a/test/integration/scheduler/framework_test.go +++ b/test/integration/scheduler/framework_test.go @@ -21,7 +21,7 @@ import ( "testing" "time" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -58,6 +58,11 @@ type ReservePlugin struct { failReserve bool } +type PostFilterPlugin struct { + numPostFilterCalled int + failPostFilter bool +} + type PrebindPlugin struct { numPrebindCalled int failPrebind bool @@ -100,6 +105,7 @@ const ( scorePluginName = "score-plugin" scoreWithNormalizePluginName = "score-with-normalize-plugin" filterPluginName = "filter-plugin" + postFilterPluginName = "postfilter-plugin" reservePluginName = "reserve-plugin" prebindPluginName = "prebind-plugin" unreservePluginName = "unreserve-plugin" @@ -113,6 +119,7 @@ var _ = framework.FilterPlugin(&FilterPlugin{}) var _ = framework.ScorePlugin(&ScorePlugin{}) var _ = framework.ScoreWithNormalizePlugin(&ScoreWithNormalizePlugin{}) var _ = framework.ReservePlugin(&ReservePlugin{}) +var _ = framework.PostFilterPlugin(&PostFilterPlugin{}) var _ = framework.PrebindPlugin(&PrebindPlugin{}) var _ = framework.BindPlugin(&BindPlugin{}) var _ = framework.PostbindPlugin(&PostbindPlugin{}) @@ -241,6 +248,34 @@ func NewReservePlugin(_ *runtime.Unknown, _ framework.FrameworkHandle) (framewor return resPlugin, nil } +// Name returns name of the plugin. +func (*PostFilterPlugin) Name() string { + return postFilterPluginName +} + +var postFilterPlugin = &PostFilterPlugin{} + +// PostFilter is a test function. +func (pfp *PostFilterPlugin) PostFilter(_ *framework.PluginContext, pod *v1.Pod, _ []*v1.Node, _ framework.NodeToStatusMap) *framework.Status { + pfp.numPostFilterCalled++ + if pfp.failPostFilter { + return framework.NewStatus(framework.Error, fmt.Sprintf("injecting failure for pod %v", pod.Name)) + } + + return nil +} + +// reset used to reset postfilter plugin. +func (pfp *PostFilterPlugin) reset() { + pfp.numPostFilterCalled = 0 + pfp.failPostFilter = false +} + +// NewPostFilterPlugin is the factory for post-filter plugin. +func NewPostFilterPlugin(_ *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) { + return postFilterPlugin, nil +} + var pbdPlugin = &PrebindPlugin{} // Name returns name of the plugin. @@ -1447,6 +1482,65 @@ func TestFilterPlugin(t *testing.T) { } } +// TestPostFilterPlugin tests invocation of post-filter plugins. +func TestPostFilterPlugin(t *testing.T) { + // Create a plugin registry for testing. Register only a post-filter plugin. + registry := framework.Registry{postFilterPluginName: NewPostFilterPlugin} + + // Setup initial post-filter plugin for testing. + pluginsConfig := &schedulerconfig.Plugins{ + PostFilter: &schedulerconfig.PluginSet{ + Enabled: []schedulerconfig.Plugin{ + { + Name: postFilterPluginName, + }, + }, + }, + } + // Set empty plugin config for testing + emptyPluginConfig := []schedulerconfig.PluginConfig{} + + // Create the master and the scheduler with the test plugin set. + context := initTestSchedulerWithOptions(t, + initTestMaster(t, "post-filter-plugin", nil), + false, nil, registry, pluginsConfig, emptyPluginConfig, false, time.Second) + defer cleanupTest(t, context) + + cs := context.clientSet + // Add a few nodes. + _, err := createNodes(cs, "test-node", nil, 2) + if err != nil { + t.Fatalf("Cannot create nodes: %v", err) + } + + for _, fail := range []bool{false, true} { + postFilterPlugin.failPostFilter = fail + // Create a best effort pod. + pod, err := createPausePod(cs, + initPausePod(cs, &pausePodConfig{Name: "test-pod", Namespace: context.ns.Name})) + if err != nil { + t.Errorf("Error while creating a test pod: %v", err) + } + + if fail { + if err = waitForPodUnschedulable(cs, pod); err != nil { + t.Errorf("Didn't expect the pod to be scheduled. error: %v", err) + } + } else { + if err = waitForPodToSchedule(cs, pod); err != nil { + t.Errorf("Expected the pod to be scheduled. error: %v", err) + } + } + + if postFilterPlugin.numPostFilterCalled == 0 { + t.Errorf("Expected the post-filter plugin to be called.") + } + + postFilterPlugin.reset() + cleanupPods(cs, t, []*v1.Pod{pod}) + } +} + // TestPreemptWithPermitPlugin tests preempt with permit plugins. func TestPreemptWithPermitPlugin(t *testing.T) { // Create a plugin registry for testing. Register only a permit plugin.