From 765f3916c27a871eb4542c903bf26216b6564322 Mon Sep 17 00:00:00 2001 From: Wei Huang Date: Sat, 5 Aug 2023 17:39:01 -0700 Subject: [PATCH] Fix a bug that PostFilter plugin may not function if previous PreFilter plugins return Skip --- pkg/scheduler/framework/runtime/framework.go | 4 +- .../framework/runtime/framework_test.go | 4 +- .../scheduler/preemption/preemption_test.go | 48 +++++++++++++++++-- 3 files changed, 49 insertions(+), 7 deletions(-) diff --git a/pkg/scheduler/framework/runtime/framework.go b/pkg/scheduler/framework/runtime/framework.go index f83b3d45472..d6d4d4eade5 100644 --- a/pkg/scheduler/framework/runtime/framework.go +++ b/pkg/scheduler/framework/runtime/framework.go @@ -632,12 +632,13 @@ func (f *frameworkImpl) QueueSortFunc() framework.LessFunc { // If a non-success status is returned, then the scheduling cycle is aborted. func (f *frameworkImpl) RunPreFilterPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (_ *framework.PreFilterResult, status *framework.Status) { startTime := time.Now() + skipPlugins := sets.New[string]() defer func() { + state.SkipFilterPlugins = skipPlugins metrics.FrameworkExtensionPointDuration.WithLabelValues(metrics.PreFilter, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime)) }() var result *framework.PreFilterResult var pluginsWithNodes []string - skipPlugins := sets.New[string]() logger := klog.FromContext(ctx) logger = klog.LoggerWithName(logger, "PreFilter") // TODO(knelasevero): Remove duplicated keys from log entry calls @@ -671,7 +672,6 @@ func (f *frameworkImpl) RunPreFilterPlugins(ctx context.Context, state *framewor return nil, framework.NewStatus(framework.Unschedulable, msg) } } - state.SkipFilterPlugins = skipPlugins return result, nil } diff --git a/pkg/scheduler/framework/runtime/framework_test.go b/pkg/scheduler/framework/runtime/framework_test.go index fd5b2c2272b..f09b55c5144 100644 --- a/pkg/scheduler/framework/runtime/framework_test.go +++ b/pkg/scheduler/framework/runtime/framework_test.go @@ -1509,8 +1509,8 @@ func TestRunPreFilterPlugins(t *testing.T) { inj: injectedResult{PreFilterStatus: int(framework.Error)}, }, }, - wantPreFilterResult: nil, - wantStatusCode: framework.Error, + wantSkippedPlugins: sets.New("skip"), + wantStatusCode: framework.Error, }, { name: "all PreFilter plugins returned skip", diff --git a/test/integration/scheduler/preemption/preemption_test.go b/test/integration/scheduler/preemption/preemption_test.go index 7db8533889d..d859a6152cd 100644 --- a/test/integration/scheduler/preemption/preemption_test.go +++ b/test/integration/scheduler/preemption/preemption_test.go @@ -99,9 +99,16 @@ func waitForNominatedNodeName(cs clientset.Interface, pod *v1.Pod) error { const tokenFilterName = "token-filter" +// tokenFilter is a fake plugin that implements PreFilter and Filter. +// `Token` simulates the allowed pods number a cluster can host. +// If `EnablePreFilter` is set to false or `Token` is positive, PreFilter passes; otherwise returns Unschedulable +// For each Filter() call, `Token` is decreased by one. When `Token` is positive, Filter passes; otherwise return +// Unschedulable or UnschedulableAndUnresolvable (when `Unresolvable` is set to true) +// AddPod()/RemovePod() adds/removes one token to the cluster to simulate the dryrun preemption type tokenFilter struct { - Tokens int - Unresolvable bool + Tokens int + Unresolvable bool + EnablePreFilter bool } // Name returns name of the plugin. @@ -123,7 +130,10 @@ func (fp *tokenFilter) Filter(ctx context.Context, state *framework.CycleState, } func (fp *tokenFilter) PreFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) { - return nil, nil + if !fp.EnablePreFilter || fp.Tokens > 0 { + return nil, nil + } + return nil, framework.NewStatus(framework.Unschedulable) } func (fp *tokenFilter) AddPod(ctx context.Context, state *framework.CycleState, podToSchedule *v1.Pod, @@ -194,6 +204,7 @@ func TestPreemption(t *testing.T) { existingPods []*v1.Pod pod *v1.Pod initTokens int + enablePreFilter bool unresolvable bool preemptedPodIndexes map[int]struct{} enablePodDisruptionConditions bool @@ -274,6 +285,36 @@ func TestPreemption(t *testing.T) { }), preemptedPodIndexes: map[int]struct{}{0: {}}, }, + // This is identical with previous subtest except for setting enablePreFilter to true. + // With this fake plugin returning Unschedulable in PreFilter, it's able to exercise the path + // that in-tree plugins return Skip in PreFilter and their AddPod/RemovePod functions are also + // skipped properly upon preemption. + { + name: "basic pod preemption with preFilter", + initTokens: 1, + enablePreFilter: true, + existingPods: []*v1.Pod{ + initPausePod(&testutils.PausePodConfig{ + Name: "victim-pod", + Namespace: testCtx.NS.Name, + Priority: &lowPriority, + Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(200, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI)}, + }, + }), + }, + pod: initPausePod(&testutils.PausePodConfig{ + Name: "preemptor-pod", + Namespace: testCtx.NS.Name, + Priority: &highPriority, + Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(200, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI)}, + }, + }), + preemptedPodIndexes: map[int]struct{}{0: {}}, + }, { // same as the previous test, but the filter is unresolvable. name: "basic pod preemption with unresolvable filter", @@ -445,6 +486,7 @@ func TestPreemption(t *testing.T) { t.Run(test.name, func(t *testing.T) { defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.PodDisruptionConditions, test.enablePodDisruptionConditions)() filter.Tokens = test.initTokens + filter.EnablePreFilter = test.enablePreFilter filter.Unresolvable = test.unresolvable pods := make([]*v1.Pod, len(test.existingPods)) // Create and run existingPods.