diff --git a/pkg/scheduler/core/generic_scheduler.go b/pkg/scheduler/core/generic_scheduler.go index fe44f911dd2..05bf8489746 100644 --- a/pkg/scheduler/core/generic_scheduler.go +++ b/pkg/scheduler/core/generic_scheduler.go @@ -209,7 +209,7 @@ func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister trace.Step("Basic checks done") startPredicateEvalTime := time.Now() - filteredNodes, failedPredicateMap, err := g.findNodesThatFit(pod, nodes) + filteredNodes, failedPredicateMap, err := g.findNodesThatFit(pluginContext, pod, nodes) if err != nil { return result, err } @@ -460,7 +460,7 @@ func (g *genericScheduler) numFeasibleNodesToFind(numAllNodes int32) (numNodes i // Filters the nodes to find the ones that fit based on the given predicate functions // Each node is passed through the predicate functions to determine if it is a fit -func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v1.Node, FailedPredicateMap, error) { +func (g *genericScheduler) findNodesThatFit(pluginContext *framework.PluginContext, pod *v1.Pod, nodes []*v1.Node) ([]*v1.Node, FailedPredicateMap, error) { var filtered []*v1.Node failedPredicateMap := FailedPredicateMap{} @@ -486,6 +486,7 @@ func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v checkNode := func(i int) { nodeName := g.cache.NodeTree().Next() + fits, failedPredicates, err := podFitsOnNode( pod, meta, @@ -501,6 +502,19 @@ func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v return } if fits { + // Iterate each plugin to verify current node + status := g.framework.RunFilterPlugins(pluginContext, pod, nodeName) + if !status.IsSuccess() { + predicateResultLock.Lock() + failedPredicateMap[nodeName] = append(failedPredicateMap[nodeName], + predicates.NewFailureReason(status.Message())) + if status.Code() != framework.Unschedulable { + errs[status.Message()]++ + } + predicateResultLock.Unlock() + return + } + length := atomic.AddInt32(&filteredLen, 1) if length > numNodesToFind { cancel() diff --git a/pkg/scheduler/core/generic_scheduler_test.go b/pkg/scheduler/core/generic_scheduler_test.go index ff2b208f794..213a52acac9 100644 --- a/pkg/scheduler/core/generic_scheduler_test.go +++ b/pkg/scheduler/core/generic_scheduler_test.go @@ -506,7 +506,7 @@ func TestFindFitAllError(t *testing.T) { nodes := makeNodeList([]string{"3", "2", "1"}) scheduler := makeScheduler(predicates, nodes) - _, predicateMap, err := scheduler.findNodesThatFit(&v1.Pod{}, nodes) + _, predicateMap, err := scheduler.findNodesThatFit(nil, &v1.Pod{}, nodes) if err != nil { t.Errorf("unexpected error: %v", err) @@ -536,7 +536,7 @@ func TestFindFitSomeError(t *testing.T) { scheduler := makeScheduler(predicates, nodes) pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "1", UID: types.UID("1")}} - _, predicateMap, err := scheduler.findNodesThatFit(pod, nodes) + _, predicateMap, err := scheduler.findNodesThatFit(nil, pod, nodes) if err != nil { t.Errorf("unexpected error: %v", err) diff --git a/pkg/scheduler/framework/v1alpha1/framework.go b/pkg/scheduler/framework/v1alpha1/framework.go index 7dce3761a13..4484857ebce 100644 --- a/pkg/scheduler/framework/v1alpha1/framework.go +++ b/pkg/scheduler/framework/v1alpha1/framework.go @@ -40,6 +40,7 @@ type framework struct { pluginNameToWeightMap map[string]int queueSortPlugins []QueueSortPlugin prefilterPlugins []PrefilterPlugin + filterPlugins []FilterPlugin scorePlugins []ScorePlugin reservePlugins []ReservePlugin prebindPlugins []PrebindPlugin @@ -113,6 +114,20 @@ func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfi } } + if plugins.Filter != nil { + for _, r := range plugins.Filter.Enabled { + if pg, ok := pluginsMap[r.Name]; ok { + p, ok := pg.(FilterPlugin) + if !ok { + return nil, fmt.Errorf("plugin %v does not extend filter plugin", r.Name) + } + f.filterPlugins = append(f.filterPlugins, p) + } else { + return nil, fmt.Errorf("filter plugin %v does not exist", r.Name) + } + } + } + if plugins.Score != nil { for _, sc := range plugins.Score.Enabled { if pg, ok := pluginsMap[sc.Name]; ok { @@ -263,6 +278,29 @@ func (f *framework) RunPrefilterPlugins( return nil } +// RunFilterPlugins runs the set of configured Filter plugins for pod on +// the given node. If any of these plugins doesn't return "Success", the +// given node is not suitable for running pod. +// Meanwhile, the failure message and status are set for the given node. +func (f *framework) RunFilterPlugins(pc *PluginContext, + pod *v1.Pod, nodeName string) *Status { + + for _, p := range f.filterPlugins { + status := p.Filter(pc, pod, nodeName) + if !status.IsSuccess() { + if status.Code() != Unschedulable { + errMsg := fmt.Sprintf("RunFilterPlugins: error while running %s filter plugin for pod %s: %s", + p.Name(), pod.Name, status.Message()) + klog.Error(errMsg) + return NewStatus(Error, errMsg) + } + return status + } + } + + return nil +} + // RunScorePlugins runs the set of configured scoring plugins. It returns a map that // stores for each scoring plugin name the corresponding NodeScoreList(s). // It also returns *Status, which is set to non-success if any of the plugins returns diff --git a/pkg/scheduler/framework/v1alpha1/interface.go b/pkg/scheduler/framework/v1alpha1/interface.go index 1a8e647335a..31e4f887883 100644 --- a/pkg/scheduler/framework/v1alpha1/interface.go +++ b/pkg/scheduler/framework/v1alpha1/interface.go @@ -143,6 +143,23 @@ type PrefilterPlugin interface { Prefilter(pc *PluginContext, p *v1.Pod) *Status } +// FilterPlugin is an interface for Filter plugins. These plugins are called at the +// filter extension point for filtering out hosts that cannot run a pod. +// This concept used to be called 'predicate' in the original scheduler. +// These plugins should return "Success", "Unschedulable" or "Error" in Status.code. +// However, the scheduler accepts other valid codes as well. +// Anything other than "Success" will lead to exclusion of the given host from +// running the pod. +type FilterPlugin interface { + Plugin + // Filter is called by the scheduling framework. + // All FilterPlugins should return "Success" to declare that + // the given node fits the pod. If Filter doesn't return "Success", + // please refer scheduler/algorithm/predicates/error.go + // to set error message. + Filter(pc *PluginContext, pod *v1.Pod, nodeName string) *Status +} + // ScorePlugin is an interface that must be implemented by "score" plugins to rank // nodes that passed the filtering phase. type ScorePlugin interface { @@ -236,6 +253,11 @@ type Framework interface { // cycle is aborted. RunPrefilterPlugins(pc *PluginContext, pod *v1.Pod) *Status + // RunFilterPlugins runs the set of configured filter plugins for pod on the + // given host. If any of these plugins returns any status other than "Success", + // the given node is not suitable for running the pod. + RunFilterPlugins(pc *PluginContext, pod *v1.Pod, nodeName string) *Status + // RunScorePlugins runs the set of configured scoring plugins. It returns a map that // stores for each scoring plugin name the corresponding NodeScoreList(s). // It also returns *Status, which is set to non-success if any of the plugins returns diff --git a/pkg/scheduler/internal/queue/scheduling_queue_test.go b/pkg/scheduler/internal/queue/scheduling_queue_test.go index e46d4a26205..6740265e7b2 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue_test.go +++ b/pkg/scheduler/internal/queue/scheduling_queue_test.go @@ -171,6 +171,10 @@ func (*fakeFramework) RunPrefilterPlugins(pc *framework.PluginContext, pod *v1.P return nil } +func (*fakeFramework) RunFilterPlugins(pc *framework.PluginContext, pod *v1.Pod, nodeName string) *framework.Status { + return nil +} + func (*fakeFramework) RunScorePlugins(pc *framework.PluginContext, pod *v1.Pod, nodes []*v1.Node) (framework.PluginToNodeScoreMap, *framework.Status) { return nil, nil } diff --git a/test/integration/scheduler/framework_test.go b/test/integration/scheduler/framework_test.go index 910efafccb3..0dd08b6e9c2 100644 --- a/test/integration/scheduler/framework_test.go +++ b/test/integration/scheduler/framework_test.go @@ -42,6 +42,11 @@ type ScorePlugin struct { highScoreNode string } +type FilterPlugin struct { + numFilterCalled int + failFilter bool +} + type ReservePlugin struct { numReserveCalled int failReserve bool @@ -86,6 +91,7 @@ type PermitPlugin struct { const ( prefilterPluginName = "prefilter-plugin" scorePluginName = "score-plugin" + filterPluginName = "filter-plugin" reservePluginName = "reserve-plugin" prebindPluginName = "prebind-plugin" unreservePluginName = "unreserve-plugin" @@ -95,6 +101,7 @@ const ( var _ = framework.PrefilterPlugin(&PrefilterPlugin{}) var _ = framework.ScorePlugin(&ScorePlugin{}) +var _ = framework.FilterPlugin(&FilterPlugin{}) var _ = framework.ReservePlugin(&ReservePlugin{}) var _ = framework.PrebindPlugin(&PrebindPlugin{}) var _ = framework.BindPlugin(&BindPlugin{}) @@ -137,6 +144,36 @@ func NewScorePlugin(_ *runtime.Unknown, _ framework.FrameworkHandle) (framework. return scPlugin, nil } +var filterPlugin = &FilterPlugin{} + +// Name returns name of the plugin. +func (fp *FilterPlugin) Name() string { + return filterPluginName +} + +// reset is used to reset filter plugin. +func (fp *FilterPlugin) reset() { + fp.numFilterCalled = 0 + fp.failFilter = false +} + +// Filter is a test function that returns an error or nil, depending on the +// value of "failFilter". +func (fp *FilterPlugin) Filter(pc *framework.PluginContext, pod *v1.Pod, nodeName string) *framework.Status { + fp.numFilterCalled++ + + if fp.failFilter { + return framework.NewStatus(framework.Error, fmt.Sprintf("injecting failure for pod %v", pod.Name)) + } + + return nil +} + +// NewFilterPlugin is the factory for filtler plugin. +func NewFilterPlugin(_ *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) { + return filterPlugin, nil +} + // Name returns name of the plugin. func (rp *ReservePlugin) Name() string { return reservePluginName @@ -1286,3 +1323,62 @@ func TestCoSchedulingWithPermitPlugin(t *testing.T) { cleanupPods(cs, t, []*v1.Pod{waitingPod, signallingPod}) } } + +// TestFilterPlugin tests invocation of filter plugins. +func TestFilterPlugin(t *testing.T) { + // Create a plugin registry for testing. Register only a filter plugin. + registry := framework.Registry{filterPluginName: NewFilterPlugin} + + // Setup initial filter plugin for testing. + plugin := &schedulerconfig.Plugins{ + Filter: &schedulerconfig.PluginSet{ + Enabled: []schedulerconfig.Plugin{ + { + Name: filterPluginName, + }, + }, + }, + } + // Set empty plugin config for testing + emptyPluginConfig := []schedulerconfig.PluginConfig{} + + // Create the master and the scheduler with the test plugin set. + context := initTestSchedulerWithOptions(t, + initTestMaster(t, "filter-plugin", nil), + false, nil, registry, plugin, emptyPluginConfig, false, time.Second) + defer cleanupTest(t, context) + + cs := context.clientSet + // Add a few nodes. + _, err := createNodes(cs, "test-node", nil, 2) + if err != nil { + t.Fatalf("Cannot create nodes: %v", err) + } + + for _, fail := range []bool{false, true} { + filterPlugin.failFilter = fail + // Create a best effort pod. + pod, err := createPausePod(cs, + initPausePod(cs, &pausePodConfig{Name: "test-pod", Namespace: context.ns.Name})) + if err != nil { + t.Errorf("Error while creating a test pod: %v", err) + } + + if fail { + if err = wait.Poll(10*time.Millisecond, 30*time.Second, podSchedulingError(cs, pod.Namespace, pod.Name)); err == nil { + t.Errorf("Didn't expect the pod to be scheduled.") + } + } else { + if err = waitForPodToSchedule(cs, pod); err != nil { + t.Errorf("Expected the pod to be scheduled. error: %v", err) + } + } + + if filterPlugin.numFilterCalled == 0 { + t.Errorf("Expected the filter plugin to be called.") + } + + filterPlugin.reset() + cleanupPods(cs, t, []*v1.Pod{pod}) + } +}