From c0eb0caf4ac2ea64432916a79c12e4cd1c8e17c2 Mon Sep 17 00:00:00 2001 From: kerthcet Date: Fri, 7 Jul 2023 10:51:30 +0800 Subject: [PATCH] Support fine-gained rescheduling in ReservePlugin Signed-off-by: kerthcet --- pkg/scheduler/framework/runtime/framework.go | 5 ++ .../framework/runtime/framework_test.go | 6 +- pkg/scheduler/framework/types_test.go | 11 +++ pkg/scheduler/schedule_one.go | 11 +++ .../scheduler/rescheduling_test.go | 79 +++++++++++++++++++ 5 files changed, 109 insertions(+), 3 deletions(-) diff --git a/pkg/scheduler/framework/runtime/framework.go b/pkg/scheduler/framework/runtime/framework.go index 7e1cd020e99..679bdb995cf 100644 --- a/pkg/scheduler/framework/runtime/framework.go +++ b/pkg/scheduler/framework/runtime/framework.go @@ -1265,6 +1265,11 @@ func (f *frameworkImpl) RunReservePluginsReserve(ctx context.Context, state *fra ctx := klog.NewContext(ctx, logger) status = f.runReservePluginReserve(ctx, pl, state, pod, nodeName) if !status.IsSuccess() { + if status.IsUnschedulable() { + logger.V(4).Info("Pod rejected by plugin", "pod", klog.KObj(pod), "plugin", pl.Name(), "status", status.Message()) + status.SetFailedPlugin(pl.Name()) + return status + } err := status.AsError() logger.Error(err, "Plugin failed", "plugin", pl.Name(), "pod", klog.KObj(pod)) return framework.AsStatus(fmt.Errorf("running Reserve plugin %q: %w", pl.Name(), err)) diff --git a/pkg/scheduler/framework/runtime/framework_test.go b/pkg/scheduler/framework/runtime/framework_test.go index f2c7966a671..fd5b2c2272b 100644 --- a/pkg/scheduler/framework/runtime/framework_test.go +++ b/pkg/scheduler/framework/runtime/framework_test.go @@ -2422,7 +2422,7 @@ func TestReservePlugins(t *testing.T) { inj: injectedResult{ReserveStatus: int(framework.Unschedulable)}, }, }, - wantStatus: framework.AsStatus(fmt.Errorf(`running Reserve plugin "TestPlugin": %w`, errInjectedStatus)), + wantStatus: framework.NewStatus(framework.Unschedulable, injectReason).WithFailedPlugin("TestPlugin"), }, { name: "ErrorReservePlugin", @@ -2442,7 +2442,7 @@ func TestReservePlugins(t *testing.T) { inj: injectedResult{ReserveStatus: int(framework.UnschedulableAndUnresolvable)}, }, }, - wantStatus: framework.AsStatus(fmt.Errorf(`running Reserve plugin "TestPlugin": %w`, errInjectedStatus)), + wantStatus: framework.NewStatus(framework.UnschedulableAndUnresolvable, injectReason).WithFailedPlugin("TestPlugin"), }, { name: "SuccessSuccessReservePlugins", @@ -2512,7 +2512,7 @@ func TestReservePlugins(t *testing.T) { inj: injectedResult{ReserveStatus: int(framework.Success)}, }, }, - wantStatus: framework.AsStatus(fmt.Errorf(`running Reserve plugin "TestPlugin": %w`, errInjectedStatus)), + wantStatus: framework.NewStatus(framework.Unschedulable, injectReason).WithFailedPlugin("TestPlugin"), }, } diff --git a/pkg/scheduler/framework/types_test.go b/pkg/scheduler/framework/types_test.go index 47fab9d8dfb..8c67de70c39 100644 --- a/pkg/scheduler/framework/types_test.go +++ b/pkg/scheduler/framework/types_test.go @@ -1458,6 +1458,17 @@ func TestFitError_Error(t *testing.T) { }, wantReasonMsg: "0/1 nodes are available: 1 Node failed Permit plugin Permit-1.", }, + { + name: "failed to Reserve on node", + numAllNodes: 1, + diagnosis: Diagnosis{ + NodeToStatusMap: NodeToStatusMap{ + // There should be only one node here. + "node1": NewStatus(Unschedulable, "Node failed Reserve plugin Reserve-1"), + }, + }, + wantReasonMsg: "0/1 nodes are available: 1 Node failed Reserve plugin Reserve-1.", + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/pkg/scheduler/schedule_one.go b/pkg/scheduler/schedule_one.go index 1fcb36f2a9f..3b0610b2543 100644 --- a/pkg/scheduler/schedule_one.go +++ b/pkg/scheduler/schedule_one.go @@ -195,6 +195,17 @@ func (sched *Scheduler) schedulingCycle( logger.Error(forgetErr, "Scheduler cache ForgetPod failed") } + if sts.IsUnschedulable() { + fitErr := &framework.FitError{ + NumAllNodes: 1, + Pod: pod, + Diagnosis: framework.Diagnosis{ + NodeToStatusMap: framework.NodeToStatusMap{scheduleResult.SuggestedHost: sts}, + UnschedulablePlugins: sets.New(sts.FailedPlugin()), + }, + } + return ScheduleResult{nominatingInfo: clearNominatedNode}, assumedPodInfo, framework.NewStatus(sts.Code()).WithError(fitErr) + } return ScheduleResult{nominatingInfo: clearNominatedNode}, assumedPodInfo, sts } diff --git a/test/integration/scheduler/rescheduling_test.go b/test/integration/scheduler/rescheduling_test.go index 5c48ce00a05..a22752ec756 100644 --- a/test/integration/scheduler/rescheduling_test.go +++ b/test/integration/scheduler/rescheduling_test.go @@ -31,6 +31,50 @@ import ( var _ framework.PermitPlugin = &PermitPlugin{} var _ framework.EnqueueExtensions = &PermitPlugin{} +var _ framework.ReservePlugin = &ReservePlugin{} +var _ framework.EnqueueExtensions = &ReservePlugin{} + +type ReservePlugin struct { + name string + statusCode framework.Code + numReserveCalled int + numUnreserveCalled int +} + +func (rp *ReservePlugin) Name() string { + return rp.name +} + +func (rp *ReservePlugin) Reserve(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) *framework.Status { + rp.numReserveCalled += 1 + + if rp.statusCode == framework.Error { + return framework.NewStatus(framework.Error, "failed to reserve") + } + + if rp.statusCode == framework.Unschedulable { + if rp.numReserveCalled <= 1 { + return framework.NewStatus(framework.Unschedulable, "reject to reserve") + } + } + + return nil +} + +func (rp *ReservePlugin) Unreserve(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) { + rp.numUnreserveCalled += 1 +} + +func (rp *ReservePlugin) EventsToRegister() []framework.ClusterEventWithHint { + return []framework.ClusterEventWithHint{ + { + Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add}, + QueueingHintFn: func(pod *v1.Pod, oldObj, newObj interface{}) framework.QueueingHint { + return framework.QueueImmediately + }, + }, + } +} type PermitPlugin struct { name string @@ -118,6 +162,41 @@ func TestReScheduling(t *testing.T) { wantFirstSchedulingError: true, wantError: true, }, + { + name: "Rescheduling pod rejected by Reserve Plugin", + plugins: []framework.Plugin{ + &ReservePlugin{name: "reserve", statusCode: framework.Unschedulable}, + }, + action: func() error { + _, err := createNode(testContext.ClientSet, st.MakeNode().Name("fake-node").Obj()) + return err + }, + wantScheduled: true, + }, + { + name: "Rescheduling pod rejected by Reserve Plugin with unrelated event", + plugins: []framework.Plugin{ + &ReservePlugin{name: "reserve", statusCode: framework.Unschedulable}, + }, + action: func() error { + _, err := createPausePod(testContext.ClientSet, + initPausePod(&testutils.PausePodConfig{Name: "test-pod-2", Namespace: testContext.NS.Name})) + return err + }, + wantScheduled: false, + }, + { + name: "Rescheduling pod failed by Reserve Plugin", + plugins: []framework.Plugin{ + &ReservePlugin{name: "reserve", statusCode: framework.Error}, + }, + action: func() error { + _, err := createNode(testContext.ClientSet, st.MakeNode().Name("fake-node").Obj()) + return err + }, + wantFirstSchedulingError: true, + wantError: true, + }, } for _, test := range tests {