From 468e2dac81d197860943b93eeb7df0a32844d7ef Mon Sep 17 00:00:00 2001 From: Kensei Nakada Date: Thu, 23 Nov 2023 12:17:47 +0000 Subject: [PATCH] fix: requeue pods rejected by Extenders properly --- pkg/scheduler/framework/runtime/framework.go | 8 ++++ pkg/scheduler/framework/types.go | 3 ++ pkg/scheduler/schedule_one.go | 19 ++++++++- pkg/scheduler/schedule_one_test.go | 41 +++++++++++++++++++- 4 files changed, 68 insertions(+), 3 deletions(-) diff --git a/pkg/scheduler/framework/runtime/framework.go b/pkg/scheduler/framework/runtime/framework.go index 8caa8ec1cb4..b06ecb30c33 100644 --- a/pkg/scheduler/framework/runtime/framework.go +++ b/pkg/scheduler/framework/runtime/framework.go @@ -260,6 +260,14 @@ func NewFramework(ctx context.Context, r Registry, profile *config.KubeScheduler logger: logger, } + if len(f.extenders) > 0 { + // Extender doesn't support any kind of requeueing feature like EnqueueExtensions in the scheduling framework. + // We register a defaultEnqueueExtension to framework.ExtenderName here. + // And, in the scheduling cycle, when Extenders reject some Nodes and the pod ends up being unschedulable, + // we put framework.ExtenderName to pInfo.UnschedulablePlugins. + f.enqueueExtensions = []framework.EnqueueExtensions{&defaultEnqueueExtension{pluginName: framework.ExtenderName}} + } + if profile == nil { return f, nil } diff --git a/pkg/scheduler/framework/types.go b/pkg/scheduler/framework/types.go index 696ad9b41ac..45306d33be4 100644 --- a/pkg/scheduler/framework/types.go +++ b/pkg/scheduler/framework/types.go @@ -279,6 +279,9 @@ type WeightedAffinityTerm struct { Weight int32 } +// ExtenderName is a fake plugin name put in UnschedulablePlugins when Extender rejected some Nodes. +const ExtenderName = "Extender" + // Diagnosis records the details to diagnose a scheduling failure. type Diagnosis struct { NodeToStatusMap NodeToStatusMap diff --git a/pkg/scheduler/schedule_one.go b/pkg/scheduler/schedule_one.go index b9ded88732c..9c84ee883a1 100644 --- a/pkg/scheduler/schedule_one.go +++ b/pkg/scheduler/schedule_one.go @@ -500,11 +500,26 @@ func (sched *Scheduler) findNodesThatFitPod(ctx context.Context, fwk framework.F return nil, diagnosis, err } - feasibleNodes, err = findNodesThatPassExtenders(ctx, sched.Extenders, pod, feasibleNodes, diagnosis.NodeToStatusMap) + feasibleNodesAfterExtender, err := findNodesThatPassExtenders(ctx, sched.Extenders, pod, feasibleNodes, diagnosis.NodeToStatusMap) if err != nil { return nil, diagnosis, err } - return feasibleNodes, diagnosis, nil + if len(feasibleNodesAfterExtender) != len(feasibleNodes) { + // Extenders filtered out some nodes. + // + // Extender doesn't support any kind of requeueing feature like EnqueueExtensions in the scheduling framework. + // When Extenders reject some Nodes and the pod ends up being unschedulable, + // we put framework.ExtenderName to pInfo.UnschedulablePlugins. + // This Pod will be requeued from unschedulable pod pool to activeQ/backoffQ + // by any kind of cluster events. + // https://github.com/kubernetes/kubernetes/issues/122019 + if diagnosis.UnschedulablePlugins == nil { + diagnosis.UnschedulablePlugins = sets.New[string]() + } + diagnosis.UnschedulablePlugins.Insert(framework.ExtenderName) + } + + return feasibleNodesAfterExtender, diagnosis, nil } func (sched *Scheduler) evaluateNominatedNode(ctx context.Context, pod *v1.Pod, fwk framework.Framework, state *framework.CycleState, diagnosis framework.Diagnosis) ([]*v1.Node, error) { diff --git a/pkg/scheduler/schedule_one_test.go b/pkg/scheduler/schedule_one_test.go index 9a70a62921a..bc5e9b19f55 100644 --- a/pkg/scheduler/schedule_one_test.go +++ b/pkg/scheduler/schedule_one_test.go @@ -1771,6 +1771,7 @@ func TestSchedulerSchedulePod(t *testing.T) { tests := []struct { name string registerPlugins []tf.RegisterPluginFunc + extenders []tf.FakeExtender nodes []string pvcs []v1.PersistentVolumeClaim pod *v1.Pod @@ -2078,6 +2079,39 @@ func TestSchedulerSchedulePod(t *testing.T) { }, }, }, + { + name: "test with extender which filters out some Nodes", + registerPlugins: []tf.RegisterPluginFunc{ + tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), + tf.RegisterFilterPlugin( + "FakeFilter", + tf.NewFakeFilterPlugin(map[string]framework.Code{"3": framework.Unschedulable}), + ), + tf.RegisterScorePlugin("NumericMap", newNumericMapPlugin(), 1), + tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), + }, + extenders: []tf.FakeExtender{ + { + ExtenderName: "FakeExtender1", + Predicates: []tf.FitPredicate{tf.FalsePredicateExtender}, + }, + }, + nodes: []string{"1", "2", "3"}, + pod: st.MakePod().Name("test-filter").UID("test-filter").Obj(), + wantNodes: nil, + wErr: &framework.FitError{ + Pod: st.MakePod().Name("test-filter").UID("test-filter").Obj(), + NumAllNodes: 3, + Diagnosis: framework.Diagnosis{ + NodeToStatusMap: framework.NodeToStatusMap{ + "1": framework.NewStatus(framework.Unschedulable, `FakeExtender: node "1" failed`), + "2": framework.NewStatus(framework.Unschedulable, `FakeExtender: node "2" failed`), + "3": framework.NewStatus(framework.Unschedulable, "injecting failure for pod test-filter").WithPlugin("FakeFilter"), + }, + UnschedulablePlugins: sets.New("FakeFilter", framework.ExtenderName), + }, + }, + }, { name: "test with filter plugin returning UnschedulableAndUnresolvable status", registerPlugins: []tf.RegisterPluginFunc{ @@ -2336,10 +2370,15 @@ func TestSchedulerSchedulePod(t *testing.T) { t.Fatal(err) } + var extenders []framework.Extender + for ii := range test.extenders { + extenders = append(extenders, &test.extenders[ii]) + } sched := &Scheduler{ Cache: cache, nodeInfoSnapshot: snapshot, percentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore, + Extenders: extenders, } sched.applyDefaultHandlers() @@ -2351,7 +2390,7 @@ func TestSchedulerSchedulePod(t *testing.T) { gotFitErr, gotOK := err.(*framework.FitError) wantFitErr, wantOK := test.wErr.(*framework.FitError) if gotOK != wantOK { - t.Errorf("Expected err to be FitError: %v, but got %v", wantOK, gotOK) + t.Errorf("Expected err to be FitError: %v, but got %v (error: %v)", wantOK, gotOK, err) } else if gotOK { if diff := cmp.Diff(gotFitErr, wantFitErr); diff != "" { t.Errorf("Unexpected fitErr: (-want, +got): %s", diff)