diff --git a/pkg/scheduler/framework/v1alpha1/framework.go b/pkg/scheduler/framework/v1alpha1/framework.go index 679f9310e19..343bcfb39bd 100644 --- a/pkg/scheduler/framework/v1alpha1/framework.go +++ b/pkg/scheduler/framework/v1alpha1/framework.go @@ -44,6 +44,7 @@ const ( preFilter = "PreFilter" preFilterExtensionAddPod = "PreFilterExtensionAddPod" preFilterExtensionRemovePod = "PreFilterExtensionRemovePod" + postFilter = "PostFilter" preScore = "PreScore" score = "Score" scoreExtensionNormalize = "ScoreExtensionNormalize" @@ -67,6 +68,7 @@ type framework struct { queueSortPlugins []QueueSortPlugin preFilterPlugins []PreFilterPlugin filterPlugins []FilterPlugin + postFilterPlugins []PostFilterPlugin preScorePlugins []PreScorePlugin scorePlugins []ScorePlugin reservePlugins []ReservePlugin @@ -103,6 +105,7 @@ func (f *framework) getExtensionPoints(plugins *config.Plugins) []extensionPoint return []extensionPoint{ {plugins.PreFilter, &f.preFilterPlugins}, {plugins.Filter, &f.filterPlugins}, + {plugins.PostFilter, &f.postFilterPlugins}, {plugins.Reserve, &f.reservePlugins}, {plugins.PreScore, &f.preScorePlugins}, {plugins.Score, &f.scorePlugins}, @@ -508,6 +511,33 @@ func (f *framework) runFilterPlugin(ctx context.Context, pl FilterPlugin, state return status } +// RunPostFilterPlugins runs the set of configured PostFilter plugins until the first +// Success or Error is met, otherwise continues to execute all plugins. +func (f *framework) RunPostFilterPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, filteredNodeStatusMap NodeToStatusMap) (*PostFilterResult, *Status) { + statuses := make(PluginToStatus) + for _, pl := range f.postFilterPlugins { + r, s := f.runPostFilterPlugin(ctx, pl, state, pod, filteredNodeStatusMap) + if s.IsSuccess() { + return r, s + } else if !s.IsUnschedulable() { + // Any status other than Success or Unschedulable is Error. 
+ return nil, NewStatus(Error, s.Message()) + } + statuses[pl.Name()] = s + } + return nil, statuses.Merge() +} + +func (f *framework) runPostFilterPlugin(ctx context.Context, pl PostFilterPlugin, state *CycleState, pod *v1.Pod, filteredNodeStatusMap NodeToStatusMap) (*PostFilterResult, *Status) { + if !state.ShouldRecordPluginMetrics() { + return pl.PostFilter(ctx, state, pod, filteredNodeStatusMap) + } + startTime := time.Now() + r, s := pl.PostFilter(ctx, state, pod, filteredNodeStatusMap) + f.metricsRecorder.observePluginDurationAsync(postFilter, pl.Name(), s, metrics.SinceInSeconds(startTime)) + return r, s +} + // RunPreScorePlugins runs the set of configured pre-score plugins. If any // of these plugins returns any status other than "Success", the given pod is rejected. func (f *framework) RunPreScorePlugins( @@ -957,3 +987,8 @@ func (f *framework) pluginsNeeded(plugins *config.Plugins) map[string]config.Plu } return pgMap } + +// PreemptHandle returns the internal preemptHandle object. 
+func (f *framework) PreemptHandle() PreemptHandle { + return f.preemptHandle +} diff --git a/pkg/scheduler/framework/v1alpha1/framework_test.go b/pkg/scheduler/framework/v1alpha1/framework_test.go index b7045a8e3bb..120b419ae18 100644 --- a/pkg/scheduler/framework/v1alpha1/framework_test.go +++ b/pkg/scheduler/framework/v1alpha1/framework_test.go @@ -169,6 +169,10 @@ func (pl *TestPlugin) Filter(ctx context.Context, state *CycleState, pod *v1.Pod return NewStatus(Code(pl.inj.FilterStatus), "injected filter status") } +func (pl *TestPlugin) PostFilter(_ context.Context, _ *CycleState, _ *v1.Pod, _ NodeToStatusMap) (*PostFilterResult, *Status) { + return nil, NewStatus(Code(pl.inj.PostFilterStatus), "injected status") +} + func (pl *TestPlugin) PreScore(ctx context.Context, state *CycleState, pod *v1.Pod, nodes []*v1.Node) *Status { return NewStatus(Code(pl.inj.PreScoreStatus), "injected status") } @@ -1000,7 +1004,6 @@ func TestFilterPlugins(t *testing.T) { name: "TestPlugin1", inj: injectedResult{FilterStatus: int(UnschedulableAndUnresolvable)}, }, - { name: "TestPlugin2", inj: injectedResult{FilterStatus: int(Unschedulable)}, @@ -1051,6 +1054,84 @@ func TestFilterPlugins(t *testing.T) { } } +func TestPostFilterPlugins(t *testing.T) { + tests := []struct { + name string + plugins []*TestPlugin + wantStatus *Status + }{ + { + name: "a single plugin makes a Pod schedulable", + plugins: []*TestPlugin{ + { + name: "TestPlugin", + inj: injectedResult{PostFilterStatus: int(Success)}, + }, + }, + wantStatus: NewStatus(Success, "injected status"), + }, + { + name: "plugin1 failed to make a Pod schedulable, followed by plugin2 which makes the Pod schedulable", + plugins: []*TestPlugin{ + { + name: "TestPlugin1", + inj: injectedResult{PostFilterStatus: int(Unschedulable)}, + }, + { + name: "TestPlugin2", + inj: injectedResult{PostFilterStatus: int(Success)}, + }, + }, + wantStatus: NewStatus(Success, "injected status"), + }, + { + name: "plugin1 makes a Pod schedulable, 
followed by plugin2 which cannot make the Pod schedulable", + plugins: []*TestPlugin{ + { + name: "TestPlugin1", + inj: injectedResult{PostFilterStatus: int(Success)}, + }, + { + name: "TestPlugin2", + inj: injectedResult{PostFilterStatus: int(Unschedulable)}, + }, + }, + wantStatus: NewStatus(Success, "injected status"), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + registry := Registry{} + cfgPls := &config.Plugins{PostFilter: &config.PluginSet{}} + for _, pl := range tt.plugins { + // register all plugins + tmpPl := pl + if err := registry.Register(pl.name, + func(_ runtime.Object, _ FrameworkHandle) (Plugin, error) { + return tmpPl, nil + }); err != nil { + t.Fatalf("fail to register postFilter plugin (%s)", pl.name) + } + // append plugins to the postFilter pluginset + cfgPls.PostFilter.Enabled = append( + cfgPls.PostFilter.Enabled, + config.Plugin{Name: pl.name}, + ) + } + + f, err := newFrameworkWithQueueSortAndBind(registry, cfgPls, emptyArgs) + if err != nil { + t.Fatalf("fail to create framework: %s", err) + } + _, gotStatus := f.RunPostFilterPlugins(context.TODO(), nil, pod, nil) + if !reflect.DeepEqual(gotStatus, tt.wantStatus) { + t.Errorf("Unexpected status. 
got: %v, want: %v", gotStatus, tt.wantStatus) + } + }) + } +} + func TestPreBindPlugins(t *testing.T) { tests := []struct { name string @@ -1932,6 +2013,7 @@ type injectedResult struct { PreFilterAddPodStatus int `json:"preFilterAddPodStatus,omitempty"` PreFilterRemovePodStatus int `json:"preFilterRemovePodStatus,omitempty"` FilterStatus int `json:"filterStatus,omitempty"` + PostFilterStatus int `json:"postFilterStatus,omitempty"` PreScoreStatus int `json:"preScoreStatus,omitempty"` ReserveStatus int `json:"reserveStatus,omitempty"` PreBindStatus int `json:"preBindStatus,omitempty"` diff --git a/pkg/scheduler/framework/v1alpha1/interface.go b/pkg/scheduler/framework/v1alpha1/interface.go index a81086cf9c4..b1b97dbc1be 100644 --- a/pkg/scheduler/framework/v1alpha1/interface.go +++ b/pkg/scheduler/framework/v1alpha1/interface.go @@ -273,6 +273,23 @@ type FilterPlugin interface { Filter(ctx context.Context, state *CycleState, pod *v1.Pod, nodeInfo *NodeInfo) *Status } +// PostFilterPlugin is an interface for PostFilter plugins. These plugins are called +// after a pod cannot be scheduled. +type PostFilterPlugin interface { + Plugin + // PostFilter is called by the scheduling framework. + // A PostFilter plugin should return one of the following statuses: + // - Unschedulable: the plugin gets executed successfully but the pod cannot be made schedulable. + // - Success: the plugin gets executed successfully and the pod can be made schedulable. + // - Error: the plugin aborts due to some internal error. + // + // Informational plugins should be configured ahead of other ones, and always return Unschedulable status. + // Optionally, a non-nil PostFilterResult may be returned along with a Success status. For example, + // a preemption plugin may choose to return nominatedNodeName, so that framework can reuse that to update the + // preemptor pod's .status.nominatedNodeName field. 
+ PostFilter(ctx context.Context, state *CycleState, pod *v1.Pod, filteredNodeStatusMap NodeToStatusMap) (*PostFilterResult, *Status) +} + // PreScorePlugin is an interface for Pre-score plugin. Pre-score is an // informational extension point. Plugins will be called with a list of nodes // that passed the filtering phase. A plugin may use this data to update internal @@ -398,6 +415,12 @@ type Framework interface { // schedule the target pod. RunFilterPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeInfo *NodeInfo) PluginToStatus + // RunPostFilterPlugins runs the set of configured PostFilter plugins. + // PostFilter plugins can either be informational, in which case they should be configured + // to execute first and return Unschedulable status, or ones that try to change the + // cluster state to make the pod potentially schedulable in a future scheduling cycle. + RunPostFilterPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, filteredNodeStatusMap NodeToStatusMap) (*PostFilterResult, *Status) + // RunPreFilterExtensionAddPod calls the AddPod interface for the set of configured // PreFilter plugins. It returns directly if any of the plugins return any // status other than Success. @@ -490,6 +513,14 @@ type FrameworkHandle interface { ClientSet() clientset.Interface SharedInformerFactory() informers.SharedInformerFactory + + // TODO: unroll the wrapped interfaces to FrameworkHandle. + PreemptHandle() PreemptHandle +} + +// PostFilterResult wraps needed info for scheduler framework to act upon PostFilter phase. +type PostFilterResult struct { + NominatedNodeName string } // PreemptHandle incorporates all needed logic to run preemption logic. 
diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 23bd69ea5d1..e2df50b4c67 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -545,10 +545,22 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) { " No preemption is performed.") } else { preemptionStartTime := time.Now() + // TODO(Huang-Wei): implement the preemption logic as a PostFilter plugin. nominatedNode, _ = sched.preempt(schedulingCycleCtx, prof, state, pod, fitError) metrics.PreemptionAttempts.Inc() metrics.SchedulingAlgorithmPreemptionEvaluationDuration.Observe(metrics.SinceInSeconds(preemptionStartTime)) metrics.DeprecatedSchedulingDuration.WithLabelValues(metrics.PreemptionEvaluation).Observe(metrics.SinceInSeconds(preemptionStartTime)) + + // Run PostFilter plugins to try to make the pod schedulable in a future scheduling cycle. + result, status := prof.RunPostFilterPlugins(ctx, state, pod, fitError.FilteredNodesStatuses) + if status.Code() == framework.Error { + klog.Errorf("Status after running PostFilter plugins for pod %v/%v: %v", pod.Namespace, pod.Name, status) + } else { + klog.V(5).Infof("Status after running PostFilter plugins for pod %v/%v: %v", pod.Namespace, pod.Name, status) + } + if status.IsSuccess() && result != nil { + nominatedNode = result.NominatedNodeName + } } // Pod did not fit anywhere, so it is counted as a failure. 
If preemption // succeeds, the pod should get counted as a success the next time we try to diff --git a/test/integration/scheduler/framework_test.go b/test/integration/scheduler/framework_test.go index f9f95f27523..4176cc65e1f 100644 --- a/test/integration/scheduler/framework_test.go +++ b/test/integration/scheduler/framework_test.go @@ -55,6 +55,14 @@ type ScoreWithNormalizePlugin struct { type FilterPlugin struct { numFilterCalled int failFilter bool + rejectFilter bool +} + +type PostFilterPlugin struct { + fh framework.FrameworkHandle + numPostFilterCalled int + failPostFilter bool + rejectPostFilter bool } type ReservePlugin struct { @@ -110,6 +118,7 @@ type PermitPlugin struct { const ( prefilterPluginName = "prefilter-plugin" + postfilterPluginName = "postfilter-plugin" scorePluginName = "score-plugin" scoreWithNormalizePluginName = "score-with-normalize-plugin" filterPluginName = "filter-plugin" @@ -122,6 +131,7 @@ const ( ) var _ framework.PreFilterPlugin = &PreFilterPlugin{} +var _ framework.PostFilterPlugin = &PostFilterPlugin{} var _ framework.ScorePlugin = &ScorePlugin{} var _ framework.FilterPlugin = &FilterPlugin{} var _ framework.ScorePlugin = &ScorePlugin{} @@ -141,6 +151,14 @@ func newPlugin(plugin framework.Plugin) framework.PluginFactory { } } +// newPostFilterPlugin returns a plugin factory that stores the FrameworkHandle it receives on the specified PostFilterPlugin and returns that plugin. +func newPostFilterPlugin(plugin *PostFilterPlugin) framework.PluginFactory { + return func(_ runtime.Object, fh framework.FrameworkHandle) (framework.Plugin, error) { + plugin.fh = fh + return plugin, nil + } +} + // Name returns name of the score plugin. 
func (sp *ScorePlugin) Name() string { return scorePluginName @@ -219,6 +237,9 @@ func (fp *FilterPlugin) Filter(ctx context.Context, state *framework.CycleState, if fp.failFilter { return framework.NewStatus(framework.Error, fmt.Sprintf("injecting failure for pod %v", pod.Name)) } + if fp.rejectFilter { + return framework.NewStatus(framework.Unschedulable, fmt.Sprintf("reject pod %v", pod.Name)) + } return nil } @@ -365,6 +386,30 @@ func (pp *PreFilterPlugin) reset() { pp.rejectPreFilter = false } +// Name returns name of the plugin. +func (pp *PostFilterPlugin) Name() string { + return postfilterPluginName +} + +func (pp *PostFilterPlugin) PostFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, _ framework.NodeToStatusMap) (*framework.PostFilterResult, *framework.Status) { + pp.numPostFilterCalled++ + nodeInfos, err := pp.fh.SnapshotSharedLister().NodeInfos().List() + if err != nil { + return nil, framework.NewStatus(framework.Error, err.Error()) + } + ph := pp.fh.PreemptHandle() + for _, nodeInfo := range nodeInfos { + ph.RunFilterPlugins(ctx, state, pod, nodeInfo) + } + if pp.failPostFilter { + return nil, framework.NewStatus(framework.Error, fmt.Sprintf("injecting failure for pod %v", pod.Name)) + } + if pp.rejectPostFilter { + return nil, framework.NewStatus(framework.Unschedulable, fmt.Sprintf("reject pod %v", pod.Name)) + } + return nil, framework.NewStatus(framework.Success, fmt.Sprintf("make room for pod %v to be schedulable", pod.Name)) +} + // Name returns name of the plugin. func (up *UnreservePlugin) Name() string { return up.name @@ -531,6 +576,108 @@ func TestPreFilterPlugin(t *testing.T) { } } +// TestPostFilterPlugin tests invocation of postfilter plugins. 
+func TestPostFilterPlugin(t *testing.T) { + numNodes := 1 + tests := []struct { + name string + rejectFilter bool + rejectPostFilter bool + expectFilterNumCalled int + expectPostFilterNumCalled int + }{ + { + name: "Filter passed", + rejectFilter: false, + rejectPostFilter: false, + expectFilterNumCalled: numNodes, + expectPostFilterNumCalled: 0, + }, + { + name: "Filter failed and PostFilter passed", + rejectFilter: true, + rejectPostFilter: false, + // TODO: change to numNodes * 2 when the hard-coded preemption logic is removed. + expectFilterNumCalled: numNodes * 3, + expectPostFilterNumCalled: 1, + }, + { + name: "Filter failed and PostFilter failed", + rejectFilter: true, + rejectPostFilter: true, + // TODO: change to numNodes * 2 when the hard-coded preemption logic is removed. + expectFilterNumCalled: numNodes * 3, + expectPostFilterNumCalled: 1, + }, + } + + for i, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Create a plugin registry for testing. Register a combination of filter and postFilter plugin. + var ( + filterPlugin = &FilterPlugin{} + postFilterPlugin = &PostFilterPlugin{} + ) + filterPlugin.rejectFilter = tt.rejectFilter + postFilterPlugin.rejectPostFilter = tt.rejectPostFilter + registry := framework.Registry{ + filterPluginName: newPlugin(filterPlugin), + postfilterPluginName: newPostFilterPlugin(postFilterPlugin), + } + + // Setup plugins for testing. + prof := schedulerconfig.KubeSchedulerProfile{ + SchedulerName: v1.DefaultSchedulerName, + Plugins: &schedulerconfig.Plugins{ + Filter: &schedulerconfig.PluginSet{ + Enabled: []schedulerconfig.Plugin{ + {Name: filterPluginName}, + }, + }, + PostFilter: &schedulerconfig.PluginSet{ + Enabled: []schedulerconfig.Plugin{ + {Name: postfilterPluginName}, + }, + }, + }, + } + + // Create the master and the scheduler with the test plugin set. 
+ testCtx := initTestSchedulerForFrameworkTest( + t, + testutils.InitTestMaster(t, fmt.Sprintf("postfilter%v-", i), nil), + numNodes, + scheduler.WithProfiles(prof), + scheduler.WithFrameworkOutOfTreeRegistry(registry), + ) + defer testutils.CleanupTest(t, testCtx) + + // Create a best effort pod. + pod, err := createPausePod(testCtx.ClientSet, initPausePod(&pausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name})) + if err != nil { + t.Errorf("Error while creating a test pod: %v", err) + } + + if tt.rejectFilter { + if err = wait.Poll(10*time.Millisecond, 10*time.Second, podUnschedulable(testCtx.ClientSet, pod.Namespace, pod.Name)); err != nil { + t.Errorf("Didn't expect the pod to be scheduled.") + } + } else { + if err = testutils.WaitForPodToSchedule(testCtx.ClientSet, pod); err != nil { + t.Errorf("Expected the pod to be scheduled. error: %v", err) + } + } + + if filterPlugin.numFilterCalled != tt.expectFilterNumCalled { + t.Errorf("Expected the filter plugin to be called %v times, but got %v.", tt.expectFilterNumCalled, filterPlugin.numFilterCalled) + } + if postFilterPlugin.numPostFilterCalled != tt.expectPostFilterNumCalled { + t.Errorf("Expected the postfilter plugin to be called %v times, but got %v.", tt.expectPostFilterNumCalled, postFilterPlugin.numPostFilterCalled) + } + }) + } +} + // TestScorePlugin tests invocation of score plugins. func TestScorePlugin(t *testing.T) { // Create a plugin registry for testing. Register only a score plugin.