diff --git a/pkg/scheduler/core/generic_scheduler.go b/pkg/scheduler/core/generic_scheduler.go index 65326a908ad..491d6a3eaa5 100644 --- a/pkg/scheduler/core/generic_scheduler.go +++ b/pkg/scheduler/core/generic_scheduler.go @@ -154,13 +154,13 @@ func (g *genericScheduler) Schedule(ctx context.Context, prof *profile.Profile, } startPredicateEvalTime := time.Now() - filteredNodes, filteredNodesStatuses, err := g.findNodesThatFitPod(ctx, prof, state, pod) + feasibleNodes, filteredNodesStatuses, err := g.findNodesThatFitPod(ctx, prof, state, pod) if err != nil { return result, err } trace.Step("Computing predicates done") - if len(filteredNodes) == 0 { + if len(feasibleNodes) == 0 { return result, &FitError{ Pod: pod, NumAllNodes: g.nodeInfoSnapshot.NumNodes(), @@ -173,16 +173,16 @@ func (g *genericScheduler) Schedule(ctx context.Context, prof *profile.Profile, startPriorityEvalTime := time.Now() // When only one node after predicate, just use it. - if len(filteredNodes) == 1 { + if len(feasibleNodes) == 1 { metrics.DeprecatedSchedulingAlgorithmPriorityEvaluationSecondsDuration.Observe(metrics.SinceInSeconds(startPriorityEvalTime)) return ScheduleResult{ - SuggestedHost: filteredNodes[0].Name, + SuggestedHost: feasibleNodes[0].Name, EvaluatedNodes: 1 + len(filteredNodesStatuses), FeasibleNodes: 1, }, nil } - priorityList, err := g.prioritizeNodes(ctx, prof, state, pod, filteredNodes) + priorityList, err := g.prioritizeNodes(ctx, prof, state, pod, feasibleNodes) if err != nil { return result, err } @@ -195,8 +195,8 @@ func (g *genericScheduler) Schedule(ctx context.Context, prof *profile.Profile, return ScheduleResult{ SuggestedHost: host, - EvaluatedNodes: len(filteredNodes) + len(filteredNodesStatuses), - FeasibleNodes: len(filteredNodes), + EvaluatedNodes: len(feasibleNodes) + len(filteredNodesStatuses), + FeasibleNodes: len(feasibleNodes), }, err } @@ -256,23 +256,37 @@ func (g *genericScheduler) numFeasibleNodesToFind(numAllNodes int32) (numNodes i // Filters the nodes to find the ones that fit the pod based on the framework // filter plugins and filter extenders. func (g *genericScheduler) findNodesThatFitPod(ctx context.Context, prof *profile.Profile, state *framework.CycleState, pod *v1.Pod) ([]*v1.Node, framework.NodeToStatusMap, error) { + filteredNodesStatuses := make(framework.NodeToStatusMap) + // Run "prefilter" plugins. s := prof.RunPreFilterPlugins(ctx, state, pod) if !s.IsSuccess() { - return nil, nil, s.AsError() + if !s.IsUnschedulable() { + return nil, nil, s.AsError() + } + // All nodes will have the same status. Some non trivial refactoring is + // needed to avoid this copy. + allNodes, err := g.nodeInfoSnapshot.NodeInfos().List() + if err != nil { + return nil, nil, err + } + for _, n := range allNodes { + filteredNodesStatuses[n.Node().Name] = s + } + return nil, filteredNodesStatuses, nil + } - filteredNodesStatuses := make(framework.NodeToStatusMap) - filtered, err := g.findNodesThatPassFilters(ctx, prof, state, pod, filteredNodesStatuses) + feasibleNodes, err := g.findNodesThatPassFilters(ctx, prof, state, pod, filteredNodesStatuses) if err != nil { return nil, nil, err } - filtered, err = g.findNodesThatPassExtenders(pod, filtered, filteredNodesStatuses) + feasibleNodes, err = g.findNodesThatPassExtenders(pod, feasibleNodes, filteredNodesStatuses) if err != nil { return nil, nil, err } - return filtered, filteredNodesStatuses, nil + return feasibleNodes, filteredNodesStatuses, nil } // findNodesThatPassFilters finds the nodes that fit the filter plugins. @@ -284,22 +298,22 @@ func (g *genericScheduler) findNodesThatPassFilters(ctx context.Context, prof *p numNodesToFind := g.numFeasibleNodesToFind(int32(len(allNodes))) - // Create filtered list with enough space to avoid growing it + // Create feasible list with enough space to avoid growing it // and allow assigning. - filtered := make([]*v1.Node, numNodesToFind) + feasibleNodes := make([]*v1.Node, numNodesToFind) if !prof.HasFilterPlugins() { length := len(allNodes) - for i := range filtered { - filtered[i] = allNodes[(g.nextStartNodeIndex+i)%length].Node() + for i := range feasibleNodes { + feasibleNodes[i] = allNodes[(g.nextStartNodeIndex+i)%length].Node() } - g.nextStartNodeIndex = (g.nextStartNodeIndex + len(filtered)) % length - return filtered, nil + g.nextStartNodeIndex = (g.nextStartNodeIndex + len(feasibleNodes)) % length + return feasibleNodes, nil } errCh := parallelize.NewErrorChannel() var statusesLock sync.Mutex - var filteredLen int32 + var feasibleNodesLen int32 ctx, cancel := context.WithCancel(ctx) checkNode := func(i int) { // We check the nodes starting from where we left off in the previous scheduling cycle, @@ -311,12 +325,12 @@ func (g *genericScheduler) findNodesThatPassFilters(ctx context.Context, prof *p return } if fits { - length := atomic.AddInt32(&filteredLen, 1) + length := atomic.AddInt32(&feasibleNodesLen, 1) if length > numNodesToFind { cancel() - atomic.AddInt32(&filteredLen, -1) + atomic.AddInt32(&feasibleNodesLen, -1) } else { - filtered[length-1] = nodeInfo.Node() + feasibleNodes[length-1] = nodeInfo.Node() } } else { statusesLock.Lock() @@ -339,26 +353,26 @@ func (g *genericScheduler) findNodesThatPassFilters(ctx context.Context, prof *p // Stops searching for more nodes once the configured number of feasible nodes // are found. parallelize.Until(ctx, len(allNodes), checkNode) - processedNodes := int(filteredLen) + len(statuses) + processedNodes := int(feasibleNodesLen) + len(statuses) g.nextStartNodeIndex = (g.nextStartNodeIndex + processedNodes) % len(allNodes) - filtered = filtered[:filteredLen] + feasibleNodes = feasibleNodes[:feasibleNodesLen] if err := errCh.ReceiveError(); err != nil { statusCode = framework.Error return nil, err } - return filtered, nil + return feasibleNodes, nil } -func (g *genericScheduler) findNodesThatPassExtenders(pod *v1.Pod, filtered []*v1.Node, statuses framework.NodeToStatusMap) ([]*v1.Node, error) { +func (g *genericScheduler) findNodesThatPassExtenders(pod *v1.Pod, feasibleNodes []*v1.Node, statuses framework.NodeToStatusMap) ([]*v1.Node, error) { for _, extender := range g.extenders { - if len(filtered) == 0 { + if len(feasibleNodes) == 0 { break } if !extender.IsInterested(pod) { continue } - filteredList, failedMap, err := extender.Filter(pod, filtered) + feasibleList, failedMap, err := extender.Filter(pod, feasibleNodes) if err != nil { if extender.IsIgnorable() { klog.Warningf("Skipping extender %v as it returned error %v and has ignorable flag set", @@ -375,9 +389,9 @@ func (g *genericScheduler) findNodesThatPassExtenders(pod *v1.Pod, filtered []*v statuses[failedNodeName].AppendReason(failedMsg) } } - filtered = filteredList + feasibleNodes = feasibleList } - return filtered, nil + return feasibleNodes, nil } // addNominatedPods adds pods with equal or greater priority which are nominated diff --git a/pkg/scheduler/core/generic_scheduler_test.go b/pkg/scheduler/core/generic_scheduler_test.go index c65bf3d64d3..402b9be5e42 100644 --- a/pkg/scheduler/core/generic_scheduler_test.go +++ b/pkg/scheduler/core/generic_scheduler_test.go @@ -677,6 +677,43 @@ func TestGenericScheduler(t *testing.T) { expectedHosts: nil, wErr: nil, }, + { + name: "test prefilter plugin returning Unschedulable status", + registerPlugins: []st.RegisterPluginFunc{ + st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), + st.RegisterPreFilterPlugin( + "FakePreFilter", + st.NewFakePreFilterPlugin(framework.NewStatus(framework.UnschedulableAndUnresolvable, "injected unschedulable status")), + ), + st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), + }, + nodes: []string{"1", "2"}, + pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-prefilter", UID: types.UID("test-prefilter")}}, + expectedHosts: nil, + wErr: &FitError{ + Pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-prefilter", UID: types.UID("test-prefilter")}}, + NumAllNodes: 2, + FilteredNodesStatuses: framework.NodeToStatusMap{ + "1": framework.NewStatus(framework.UnschedulableAndUnresolvable, "injected unschedulable status"), + "2": framework.NewStatus(framework.UnschedulableAndUnresolvable, "injected unschedulable status"), + }, + }, + }, + { + name: "test prefilter plugin returning error status", + registerPlugins: []st.RegisterPluginFunc{ + st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), + st.RegisterPreFilterPlugin( + "FakePreFilter", + st.NewFakePreFilterPlugin(framework.NewStatus(framework.Error, "injected error status")), + ), + st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), + }, + nodes: []string{"1", "2"}, + pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-prefilter", UID: types.UID("test-prefilter")}}, + expectedHosts: nil, + wErr: fmt.Errorf(`prefilter plugin "FakePreFilter" failed for pod "test-prefilter": injected error status`), + }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { @@ -717,7 +754,7 @@ func TestGenericScheduler(t *testing.T) { schedulerapi.DefaultPercentageOfNodesToScore) result, err := scheduler.Schedule(context.Background(), prof, framework.NewCycleState(), test.pod) if !reflect.DeepEqual(err, test.wErr) { - t.Errorf("Unexpected error: %v, expected: %v", err.Error(), test.wErr) + t.Errorf("want: %v, got: %v", test.wErr, err) } if test.expectedHosts != nil && !test.expectedHosts.Has(result.SuggestedHost) { t.Errorf("Expected: %s, got: %s", test.expectedHosts, result.SuggestedHost) diff --git a/pkg/scheduler/framework/runtime/framework.go b/pkg/scheduler/framework/runtime/framework.go index 0cc09db631e..553b7e6da39 100644 --- a/pkg/scheduler/framework/runtime/framework.go +++ b/pkg/scheduler/framework/runtime/framework.go @@ -395,11 +395,9 @@ func (f *frameworkImpl) RunPreFilterPlugins(ctx context.Context, state *framewor status = f.runPreFilterPlugin(ctx, pl, state, pod) if !status.IsSuccess() { if status.IsUnschedulable() { - msg := fmt.Sprintf("rejected by %q at prefilter: %v", pl.Name(), status.Message()) - klog.V(4).Infof(msg) - return framework.NewStatus(status.Code(), msg) + return status } - msg := fmt.Sprintf("error while running %q prefilter plugin for pod %q: %v", pl.Name(), pod.Name, status.Message()) + msg := fmt.Sprintf("prefilter plugin %q failed for pod %q: %v", pl.Name(), pod.Name, status.Message()) klog.Error(msg) return framework.NewStatus(framework.Error, msg) } diff --git a/pkg/scheduler/testing/fake_plugins.go b/pkg/scheduler/testing/fake_plugins.go index cf0a26e5505..45be1500e4d 100644 --- a/pkg/scheduler/testing/fake_plugins.go +++ b/pkg/scheduler/testing/fake_plugins.go @@ -123,3 +123,32 @@ func (pl *MatchFilterPlugin) Filter(_ context.Context, _ *framework.CycleState, func NewMatchFilterPlugin(_ runtime.Object, _ framework.FrameworkHandle) (framework.Plugin, error) { return &MatchFilterPlugin{}, nil } + +// FakePreFilterPlugin is a test filter plugin. +type FakePreFilterPlugin struct { + Status *framework.Status +} + +// Name returns name of the plugin. +func (pl *FakePreFilterPlugin) Name() string { + return "FakePreFilter" +} + +// PreFilter invoked at the PreFilter extension point. +func (pl *FakePreFilterPlugin) PreFilter(_ context.Context, _ *framework.CycleState, pod *v1.Pod) *framework.Status { + return pl.Status +} + +// PreFilterExtensions no extensions implemented by this plugin. +func (pl *FakePreFilterPlugin) PreFilterExtensions() framework.PreFilterExtensions { + return nil +} + +// NewFakePreFilterPlugin initializes a fakePreFilterPlugin and returns it. +func NewFakePreFilterPlugin(status *framework.Status) frameworkruntime.PluginFactory { + return func(_ runtime.Object, _ framework.FrameworkHandle) (framework.Plugin, error) { + return &FakePreFilterPlugin{ + Status: status, + }, nil + } +} diff --git a/pkg/scheduler/testing/framework_helpers.go b/pkg/scheduler/testing/framework_helpers.go index 251becc17f4..b7e8c4dd933 100644 --- a/pkg/scheduler/testing/framework_helpers.go +++ b/pkg/scheduler/testing/framework_helpers.go @@ -42,6 +42,11 @@ func RegisterQueueSortPlugin(pluginName string, pluginNewFunc runtime.PluginFact return RegisterPluginAsExtensions(pluginName, pluginNewFunc, "QueueSort") } +// RegisterPreFilterPlugin returns a function to register a PreFilter Plugin to a given registry. +func RegisterPreFilterPlugin(pluginName string, pluginNewFunc runtime.PluginFactory) RegisterPluginFunc { + return RegisterPluginAsExtensions(pluginName, pluginNewFunc, "PreFilter") +} + // RegisterFilterPlugin returns a function to register a Filter Plugin to a given registry. func RegisterFilterPlugin(pluginName string, pluginNewFunc runtime.PluginFactory) RegisterPluginFunc { return RegisterPluginAsExtensions(pluginName, pluginNewFunc, "Filter")