Introduce PostFilter extension point

This commit is contained in:
Wei Huang 2020-06-05 13:02:45 -07:00
parent b8b4186a14
commit 69b9ba6012
No known key found for this signature in database
GPG Key ID: BE5E9752F8B6E005
5 changed files with 308 additions and 1 deletions

View File

@ -44,6 +44,7 @@ const (
preFilter = "PreFilter"
preFilterExtensionAddPod = "PreFilterExtensionAddPod"
preFilterExtensionRemovePod = "PreFilterExtensionRemovePod"
postFilter = "PostFilter"
preScore = "PreScore"
score = "Score"
scoreExtensionNormalize = "ScoreExtensionNormalize"
@ -67,6 +68,7 @@ type framework struct {
queueSortPlugins []QueueSortPlugin
preFilterPlugins []PreFilterPlugin
filterPlugins []FilterPlugin
postFilterPlugins []PostFilterPlugin
preScorePlugins []PreScorePlugin
scorePlugins []ScorePlugin
reservePlugins []ReservePlugin
@ -103,6 +105,7 @@ func (f *framework) getExtensionPoints(plugins *config.Plugins) []extensionPoint
return []extensionPoint{
{plugins.PreFilter, &f.preFilterPlugins},
{plugins.Filter, &f.filterPlugins},
{plugins.PostFilter, &f.postFilterPlugins},
{plugins.Reserve, &f.reservePlugins},
{plugins.PreScore, &f.preScorePlugins},
{plugins.Score, &f.scorePlugins},
@ -508,6 +511,33 @@ func (f *framework) runFilterPlugin(ctx context.Context, pl FilterPlugin, state
return status
}
// RunPostFilterPlugins runs the set of configured PostFilter plugins until the first
// Success or Error is met, otherwise continues to execute all plugins.
func (f *framework) RunPostFilterPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, filteredNodeStatusMap NodeToStatusMap) (*PostFilterResult, *Status) {
statuses := make(PluginToStatus)
for _, pl := range f.postFilterPlugins {
r, s := f.runPostFilterPlugin(ctx, pl, state, pod, filteredNodeStatusMap)
if s.IsSuccess() {
return r, s
} else if !s.IsUnschedulable() {
// Any status other than Success or Unschedulable is Error.
return nil, NewStatus(Error, s.Message())
}
statuses[pl.Name()] = s
}
return nil, statuses.Merge()
}
func (f *framework) runPostFilterPlugin(ctx context.Context, pl PostFilterPlugin, state *CycleState, pod *v1.Pod, filteredNodeStatusMap NodeToStatusMap) (*PostFilterResult, *Status) {
if !state.ShouldRecordPluginMetrics() {
return pl.PostFilter(ctx, state, pod, filteredNodeStatusMap)
}
startTime := time.Now()
r, s := pl.PostFilter(ctx, state, pod, filteredNodeStatusMap)
f.metricsRecorder.observePluginDurationAsync(postFilter, pl.Name(), s, metrics.SinceInSeconds(startTime))
return r, s
}
// RunPreScorePlugins runs the set of configured pre-score plugins. If any
// of these plugins returns any status other than "Success", the given pod is rejected.
func (f *framework) RunPreScorePlugins(
@ -957,3 +987,8 @@ func (f *framework) pluginsNeeded(plugins *config.Plugins) map[string]config.Plu
}
return pgMap
}
// PreemptHandle returns the internal preemptHandle object.
func (f *framework) PreemptHandle() PreemptHandle {
return f.preemptHandle
}

View File

@ -169,6 +169,10 @@ func (pl *TestPlugin) Filter(ctx context.Context, state *CycleState, pod *v1.Pod
return NewStatus(Code(pl.inj.FilterStatus), "injected filter status")
}
func (pl *TestPlugin) PostFilter(_ context.Context, _ *CycleState, _ *v1.Pod, _ NodeToStatusMap) (*PostFilterResult, *Status) {
return nil, NewStatus(Code(pl.inj.PostFilterStatus), "injected status")
}
func (pl *TestPlugin) PreScore(ctx context.Context, state *CycleState, pod *v1.Pod, nodes []*v1.Node) *Status {
return NewStatus(Code(pl.inj.PreScoreStatus), "injected status")
}
@ -1000,7 +1004,6 @@ func TestFilterPlugins(t *testing.T) {
name: "TestPlugin1",
inj: injectedResult{FilterStatus: int(UnschedulableAndUnresolvable)},
},
{
name: "TestPlugin2",
inj: injectedResult{FilterStatus: int(Unschedulable)},
@ -1051,6 +1054,84 @@ func TestFilterPlugins(t *testing.T) {
}
}
func TestPostFilterPlugins(t *testing.T) {
tests := []struct {
name string
plugins []*TestPlugin
wantStatus *Status
}{
{
name: "a single plugin makes a Pod schedulable",
plugins: []*TestPlugin{
{
name: "TestPlugin",
inj: injectedResult{PostFilterStatus: int(Success)},
},
},
wantStatus: NewStatus(Success, "injected status"),
},
{
name: "plugin1 failed to make a Pod schedulable, followed by plugin2 which makes the Pod schedulable",
plugins: []*TestPlugin{
{
name: "TestPlugin1",
inj: injectedResult{PostFilterStatus: int(Unschedulable)},
},
{
name: "TestPlugin2",
inj: injectedResult{PostFilterStatus: int(Success)},
},
},
wantStatus: NewStatus(Success, "injected status"),
},
{
name: "plugin1 makes a Pod schedulable, followed by plugin2 which cannot make the Pod schedulable",
plugins: []*TestPlugin{
{
name: "TestPlugin1",
inj: injectedResult{PostFilterStatus: int(Success)},
},
{
name: "TestPlugin2",
inj: injectedResult{PostFilterStatus: int(Unschedulable)},
},
},
wantStatus: NewStatus(Success, "injected status"),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
registry := Registry{}
cfgPls := &config.Plugins{PostFilter: &config.PluginSet{}}
for _, pl := range tt.plugins {
// register all plugins
tmpPl := pl
if err := registry.Register(pl.name,
func(_ runtime.Object, _ FrameworkHandle) (Plugin, error) {
return tmpPl, nil
}); err != nil {
t.Fatalf("fail to register postFilter plugin (%s)", pl.name)
}
// append plugins to filter pluginset
cfgPls.PostFilter.Enabled = append(
cfgPls.PostFilter.Enabled,
config.Plugin{Name: pl.name},
)
}
f, err := newFrameworkWithQueueSortAndBind(registry, cfgPls, emptyArgs)
if err != nil {
t.Fatalf("fail to create framework: %s", err)
}
_, gotStatus := f.RunPostFilterPlugins(context.TODO(), nil, pod, nil)
if !reflect.DeepEqual(gotStatus, tt.wantStatus) {
t.Errorf("Unexpected status. got: %v, want: %v", gotStatus, tt.wantStatus)
}
})
}
}
func TestPreBindPlugins(t *testing.T) {
tests := []struct {
name string
@ -1932,6 +2013,7 @@ type injectedResult struct {
PreFilterAddPodStatus int `json:"preFilterAddPodStatus,omitempty"`
PreFilterRemovePodStatus int `json:"preFilterRemovePodStatus,omitempty"`
FilterStatus int `json:"filterStatus,omitempty"`
PostFilterStatus int `json:"postFilterStatus,omitempty"`
PreScoreStatus int `json:"preScoreStatus,omitempty"`
ReserveStatus int `json:"reserveStatus,omitempty"`
PreBindStatus int `json:"preBindStatus,omitempty"`

View File

@ -273,6 +273,23 @@ type FilterPlugin interface {
Filter(ctx context.Context, state *CycleState, pod *v1.Pod, nodeInfo *NodeInfo) *Status
}
// PostFilterPlugin is an interface for PostFilter plugins. These plugins are called
// after a pod cannot be scheduled.
type PostFilterPlugin interface {
Plugin
// PostFilter is called by the scheduling framework.
// A PostFilter plugin should return one of the following statuses:
// - Unschedulable: the plugin gets executed successfully but the pod cannot be made schedulable.
// - Success: the plugin gets executed successfully and the pod can be made schedulable.
// - Error: the plugin aborts due to some internal error.
//
// Informational plugins should be configured ahead of other ones, and always return Unschedulable status.
// Optionally, a non-nil PostFilterResult may be returned along with a Success status. For example,
// a preemption plugin may choose to return nominatedNodeName, so that framework can reuse that to update the
// preemptor pod's .spec.status.nominatedNodeName field.
PostFilter(ctx context.Context, state *CycleState, pod *v1.Pod, filteredNodeStatusMap NodeToStatusMap) (*PostFilterResult, *Status)
}
// PreScorePlugin is an interface for Pre-score plugin. Pre-score is an
// informational extension point. Plugins will be called with a list of nodes
// that passed the filtering phase. A plugin may use this data to update internal
@ -398,6 +415,12 @@ type Framework interface {
// schedule the target pod.
RunFilterPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeInfo *NodeInfo) PluginToStatus
// RunPostFilterPlugins runs the set of configured PostFilter plugins.
// PostFilter plugins can either be informational, in which case should be configured
// to execute first and return Unschedulable status, or ones that try to change the
// cluster state to make the pod potentially schedulable in a future scheduling cycle.
RunPostFilterPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, filteredNodeStatusMap NodeToStatusMap) (*PostFilterResult, *Status)
// RunPreFilterExtensionAddPod calls the AddPod interface for the set of configured
// PreFilter plugins. It returns directly if any of the plugins return any
// status other than Success.
@ -490,6 +513,14 @@ type FrameworkHandle interface {
ClientSet() clientset.Interface
SharedInformerFactory() informers.SharedInformerFactory
// TODO: unroll the wrapped interfaces to FrameworkHandle.
PreemptHandle() PreemptHandle
}
// PostFilterResult wraps needed info for scheduler framework to act upon PostFilter phase.
type PostFilterResult struct {
NominatedNodeName string
}
// PreemptHandle incorporates all needed logic to run preemption logic.

View File

@ -545,10 +545,22 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
" No preemption is performed.")
} else {
preemptionStartTime := time.Now()
// TODO(Huang-Wei): implement the preemption logic as a PostFilter plugin.
nominatedNode, _ = sched.preempt(schedulingCycleCtx, prof, state, pod, fitError)
metrics.PreemptionAttempts.Inc()
metrics.SchedulingAlgorithmPreemptionEvaluationDuration.Observe(metrics.SinceInSeconds(preemptionStartTime))
metrics.DeprecatedSchedulingDuration.WithLabelValues(metrics.PreemptionEvaluation).Observe(metrics.SinceInSeconds(preemptionStartTime))
// Run PostFilter plugins to try to make the pod schedulable in a future scheduling cycle.
result, status := prof.RunPostFilterPlugins(ctx, state, pod, fitError.FilteredNodesStatuses)
if status.Code() == framework.Error {
klog.Errorf("Status after running PostFilter plugins for pod %v/%v: %v", pod.Namespace, pod.Name, status)
} else {
klog.V(5).Infof("Status after running PostFilter plugins for pod %v/%v: %v", pod.Namespace, pod.Name, status)
}
if status.IsSuccess() && result != nil {
nominatedNode = result.NominatedNodeName
}
}
// Pod did not fit anywhere, so it is counted as a failure. If preemption
// succeeds, the pod should get counted as a success the next time we try to

View File

@ -55,6 +55,14 @@ type ScoreWithNormalizePlugin struct {
type FilterPlugin struct {
numFilterCalled int
failFilter bool
rejectFilter bool
}
type PostFilterPlugin struct {
fh framework.FrameworkHandle
numPostFilterCalled int
failPostFilter bool
rejectPostFilter bool
}
type ReservePlugin struct {
@ -110,6 +118,7 @@ type PermitPlugin struct {
const (
prefilterPluginName = "prefilter-plugin"
postfilterPluginName = "postfilter-plugin"
scorePluginName = "score-plugin"
scoreWithNormalizePluginName = "score-with-normalize-plugin"
filterPluginName = "filter-plugin"
@ -122,6 +131,7 @@ const (
)
var _ framework.PreFilterPlugin = &PreFilterPlugin{}
var _ framework.PostFilterPlugin = &PostFilterPlugin{}
var _ framework.ScorePlugin = &ScorePlugin{}
var _ framework.FilterPlugin = &FilterPlugin{}
var _ framework.ScorePlugin = &ScorePlugin{}
@ -141,6 +151,14 @@ func newPlugin(plugin framework.Plugin) framework.PluginFactory {
}
}
// newPlugin returns a plugin factory with specified Plugin.
func newPostFilterPlugin(plugin *PostFilterPlugin) framework.PluginFactory {
return func(_ runtime.Object, fh framework.FrameworkHandle) (framework.Plugin, error) {
plugin.fh = fh
return plugin, nil
}
}
// Name returns name of the score plugin.
func (sp *ScorePlugin) Name() string {
return scorePluginName
@ -219,6 +237,9 @@ func (fp *FilterPlugin) Filter(ctx context.Context, state *framework.CycleState,
if fp.failFilter {
return framework.NewStatus(framework.Error, fmt.Sprintf("injecting failure for pod %v", pod.Name))
}
if fp.rejectFilter {
return framework.NewStatus(framework.Unschedulable, fmt.Sprintf("reject pod %v", pod.Name))
}
return nil
}
@ -365,6 +386,30 @@ func (pp *PreFilterPlugin) reset() {
pp.rejectPreFilter = false
}
// Name returns name of the plugin.
func (pp *PostFilterPlugin) Name() string {
return postfilterPluginName
}
func (pp *PostFilterPlugin) PostFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, _ framework.NodeToStatusMap) (*framework.PostFilterResult, *framework.Status) {
pp.numPostFilterCalled++
nodeInfos, err := pp.fh.SnapshotSharedLister().NodeInfos().List()
if err != nil {
return nil, framework.NewStatus(framework.Error, err.Error())
}
ph := pp.fh.PreemptHandle()
for _, nodeInfo := range nodeInfos {
ph.RunFilterPlugins(ctx, state, pod, nodeInfo)
}
if pp.failPostFilter {
return nil, framework.NewStatus(framework.Error, fmt.Sprintf("injecting failure for pod %v", pod.Name))
}
if pp.rejectPostFilter {
return nil, framework.NewStatus(framework.Unschedulable, fmt.Sprintf("reject pod %v", pod.Name))
}
return nil, framework.NewStatus(framework.Success, fmt.Sprintf("make room for pod %v to be schedulable", pod.Name))
}
// Name returns name of the plugin.
func (up *UnreservePlugin) Name() string {
return up.name
@ -531,6 +576,108 @@ func TestPreFilterPlugin(t *testing.T) {
}
}
// TestPostFilterPlugin tests invocation of postfilter plugins.
func TestPostFilterPlugin(t *testing.T) {
numNodes := 1
tests := []struct {
name string
rejectFilter bool
rejectPostFilter bool
expectFilterNumCalled int
expectPostFilterNumCalled int
}{
{
name: "Filter passed",
rejectFilter: false,
rejectPostFilter: false,
expectFilterNumCalled: numNodes,
expectPostFilterNumCalled: 0,
},
{
name: "Filter failed and PostFilter passed",
rejectFilter: true,
rejectPostFilter: false,
// TODO: change to <numNodes * 2> when the hard-coded preemption logic is removed.
expectFilterNumCalled: numNodes * 3,
expectPostFilterNumCalled: 1,
},
{
name: "Filter failed and PostFilter failed",
rejectFilter: true,
rejectPostFilter: true,
// TODO: change to <numNodes * 2> when the hard-coded preemption logic is removed.
expectFilterNumCalled: numNodes * 3,
expectPostFilterNumCalled: 1,
},
}
for i, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Create a plugin registry for testing. Register a combination of filter and postFilter plugin.
var (
filterPlugin = &FilterPlugin{}
postFilterPlugin = &PostFilterPlugin{}
)
filterPlugin.rejectFilter = tt.rejectFilter
postFilterPlugin.rejectPostFilter = tt.rejectPostFilter
registry := framework.Registry{
filterPluginName: newPlugin(filterPlugin),
postfilterPluginName: newPostFilterPlugin(postFilterPlugin),
}
// Setup plugins for testing.
prof := schedulerconfig.KubeSchedulerProfile{
SchedulerName: v1.DefaultSchedulerName,
Plugins: &schedulerconfig.Plugins{
Filter: &schedulerconfig.PluginSet{
Enabled: []schedulerconfig.Plugin{
{Name: filterPluginName},
},
},
PostFilter: &schedulerconfig.PluginSet{
Enabled: []schedulerconfig.Plugin{
{Name: postfilterPluginName},
},
},
},
}
// Create the master and the scheduler with the test plugin set.
testCtx := initTestSchedulerForFrameworkTest(
t,
testutils.InitTestMaster(t, fmt.Sprintf("postfilter%v-", i), nil),
numNodes,
scheduler.WithProfiles(prof),
scheduler.WithFrameworkOutOfTreeRegistry(registry),
)
defer testutils.CleanupTest(t, testCtx)
// Create a best effort pod.
pod, err := createPausePod(testCtx.ClientSet, initPausePod(&pausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name}))
if err != nil {
t.Errorf("Error while creating a test pod: %v", err)
}
if tt.rejectFilter {
if err = wait.Poll(10*time.Millisecond, 10*time.Second, podUnschedulable(testCtx.ClientSet, pod.Namespace, pod.Name)); err != nil {
t.Errorf("Didn't expect the pod to be scheduled.")
}
} else {
if err = testutils.WaitForPodToSchedule(testCtx.ClientSet, pod); err != nil {
t.Errorf("Expected the pod to be scheduled. error: %v", err)
}
}
if filterPlugin.numFilterCalled != tt.expectFilterNumCalled {
t.Errorf("Expected the filter plugin to be called %v times, but got %v.", tt.expectFilterNumCalled, filterPlugin.numFilterCalled)
}
if postFilterPlugin.numPostFilterCalled != tt.expectPostFilterNumCalled {
t.Errorf("Expected the postfilter plugin to be called %v times, but got %v.", tt.expectPostFilterNumCalled, postFilterPlugin.numPostFilterCalled)
}
})
}
}
// TestScorePlugin tests invocation of score plugins.
func TestScorePlugin(t *testing.T) {
// Create a plugin registry for testing. Register only a score plugin.