From 17acc4a5eeb0e92d12d72ed95b8d312931227f5a Mon Sep 17 00:00:00 2001 From: Ania Borowiec Date: Thu, 20 Mar 2025 20:17:53 +0000 Subject: [PATCH] Move queue.Done() before Prebind, add tests --- .../backend/queue/scheduling_queue.go | 14 +- pkg/scheduler/schedule_one.go | 27 +- pkg/scheduler/schedule_one_test.go | 576 +++++++++++++++--- 3 files changed, 516 insertions(+), 101 deletions(-) diff --git a/pkg/scheduler/backend/queue/scheduling_queue.go b/pkg/scheduler/backend/queue/scheduling_queue.go index d3e3efba785..69eae17fd16 100644 --- a/pkg/scheduler/backend/queue/scheduling_queue.go +++ b/pkg/scheduler/backend/queue/scheduling_queue.go @@ -134,6 +134,7 @@ type SchedulingQueue interface { PodsInActiveQ() []*v1.Pod // PodsInBackoffQ returns all the Pods in the backoffQ. PodsInBackoffQ() []*v1.Pod + UnschedulablePods() []*v1.Pod } // NewSchedulingQueue initializes a priority queue as a new scheduling queue. @@ -1205,6 +1206,15 @@ func (p *PriorityQueue) PodsInBackoffQ() []*v1.Pod { return p.backoffQ.list() } +// UnschedulablePods returns all the pods in unschedulable state. +func (p *PriorityQueue) UnschedulablePods() []*v1.Pod { + var result []*v1.Pod + for _, pInfo := range p.unschedulablePods.podInfoMap { + result = append(result, pInfo.Pod) + } + return result +} + var pendingPodsSummary = "activeQ:%v; backoffQ:%v; unschedulablePods:%v" // GetPod searches for a pod in the activeQ, backoffQ, and unschedulablePods. @@ -1241,9 +1251,9 @@ func (p *PriorityQueue) GetPod(name, namespace string) (pInfo *framework.QueuedP func (p *PriorityQueue) PendingPods() ([]*v1.Pod, string) { p.lock.RLock() defer p.lock.RUnlock() - result := p.activeQ.list() + result := p.PodsInActiveQ() activeQLen := len(result) - backoffQPods := p.backoffQ.list() + backoffQPods := p.PodsInBackoffQ() backoffQLen := len(backoffQPods) result = append(result, backoffQPods...) for _, pInfo := range p.unschedulablePods.podInfoMap { diff --git a/pkg/scheduler/schedule_one.go b/pkg/scheduler/schedule_one.go index 5078470b974..35b78492255 100644 --- a/pkg/scheduler/schedule_one.go +++ b/pkg/scheduler/schedule_one.go @@ -292,30 +292,19 @@ func (sched *Scheduler) bindingCycle( return status } - // Run "prebind" plugins. - if status := fwk.RunPreBindPlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost); !status.IsSuccess() { - if status.IsRejected() { - fitErr := &framework.FitError{ - NumAllNodes: 1, - Pod: assumedPodInfo.Pod, - Diagnosis: framework.Diagnosis{ - NodeToStatus: framework.NewDefaultNodeToStatus(), - UnschedulablePlugins: sets.New(status.Plugin()), - }, - } - fitErr.Diagnosis.NodeToStatus.Set(scheduleResult.SuggestedHost, status) - return framework.NewStatus(status.Code()).WithError(fitErr) - } - return status - } - // Any failures after this point cannot lead to the Pod being considered unschedulable. - // We define the Pod as "unschedulable" only when Pods are rejected at specific extension points, and PreBind is the last one in the scheduling/binding cycle. + // We define the Pod as "unschedulable" only when Pods are rejected at specific extension points, and Permit is the last one in the scheduling/binding cycle. + // If a Pod fails on PreBind or Bind, it should be moved to BackoffQ for retry. // // We can call Done() here because - // we can free the cluster events stored in the scheduling queue sonner, which is worth for busy clusters memory consumption wise. + // we can free the cluster events stored in the scheduling queue sooner, which is worth for busy clusters memory consumption wise. sched.SchedulingQueue.Done(assumedPod.UID) + // Run "prebind" plugins. + if status := fwk.RunPreBindPlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost); !status.IsSuccess() { + return status + } + // Run "bind" plugins. if status := sched.bind(ctx, fwk, assumedPod, scheduleResult.SuggestedHost, state); !status.IsSuccess() { return status diff --git a/pkg/scheduler/schedule_one_test.go b/pkg/scheduler/schedule_one_test.go index 403fc88cc36..1d9943de3af 100644 --- a/pkg/scheduler/schedule_one_test.go +++ b/pkg/scheduler/schedule_one_test.go @@ -89,11 +89,6 @@ var ( } ) -type mockScheduleResult struct { - result ScheduleResult - err error -} - type fakeExtender struct { isBinder bool interestedPodName string @@ -670,95 +665,156 @@ func TestSchedulerScheduleOne(t *testing.T) { testNode := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node1", UID: types.UID("node1")}} client := clientsetfake.NewClientset(&testNode) eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()}) - errS := errors.New("scheduler") - errB := errors.New("binder") + + scheduleResultOk := ScheduleResult{SuggestedHost: testNode.Name, EvaluatedNodes: 1, FeasibleNodes: 1} + emptyScheduleResult := ScheduleResult{} + fakeBinding := &v1.Binding{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: types.UID("foo")}, Target: v1.ObjectReference{Kind: "Node", Name: testNode.Name}} + + reserveErr := errors.New("reserve error") + schedulingErr := errors.New("scheduler") + permitErr := errors.New("permit error") preBindErr := errors.New("on PreBind") + bindingErr := errors.New("binder") + + testPod := podWithID("foo", "") + assignedTestPod := podWithID("foo", testNode.Name) table := []struct { - name string - injectBindError error - sendPod *v1.Pod - registerPluginFuncs []tf.RegisterPluginFunc - expectErrorPod *v1.Pod - expectForgetPod *v1.Pod - expectAssumedPod *v1.Pod - expectError error - expectBind *v1.Binding - eventReason string - mockResult mockScheduleResult + name string + sendPod *v1.Pod + registerPluginFuncs []tf.RegisterPluginFunc + injectBindError error + injectSchedulingError error + mockScheduleResult ScheduleResult + expectErrorPod *v1.Pod + expectForgetPod *v1.Pod + expectAssumedPod *v1.Pod + expectPodInBackoffQ *v1.Pod + expectPodInUnschedulable *v1.Pod + expectError error + expectBind *v1.Binding + eventReason string }{ { - name: "error reserve pod", - sendPod: podWithID("foo", ""), - mockResult: mockScheduleResult{ScheduleResult{SuggestedHost: testNode.Name, EvaluatedNodes: 1, FeasibleNodes: 1}, nil}, - registerPluginFuncs: []tf.RegisterPluginFunc{ - tf.RegisterReservePlugin("FakeReserve", tf.NewFakeReservePlugin(framework.NewStatus(framework.Error, "reserve error"))), - }, - expectErrorPod: podWithID("foo", testNode.Name), - expectForgetPod: podWithID("foo", testNode.Name), - expectAssumedPod: podWithID("foo", testNode.Name), - expectError: fmt.Errorf(`running Reserve plugin "FakeReserve": %w`, errors.New("reserve error")), - eventReason: "FailedScheduling", + name: "schedule pod failed", + sendPod: testPod, + injectSchedulingError: schedulingErr, + mockScheduleResult: scheduleResultOk, + expectError: schedulingErr, + expectErrorPod: testPod, + expectPodInBackoffQ: testPod, + eventReason: "FailedScheduling", }, { - name: "error permit pod", - sendPod: podWithID("foo", ""), - mockResult: mockScheduleResult{ScheduleResult{SuggestedHost: testNode.Name, EvaluatedNodes: 1, FeasibleNodes: 1}, nil}, + name: "reserve failed with status code error", + sendPod: testPod, registerPluginFuncs: []tf.RegisterPluginFunc{ - tf.RegisterPermitPlugin("FakePermit", tf.NewFakePermitPlugin(framework.NewStatus(framework.Error, "permit error"), time.Minute)), + tf.RegisterReservePlugin("FakeReserve", tf.NewFakeReservePlugin(framework.AsStatus(reserveErr))), }, - expectErrorPod: podWithID("foo", testNode.Name), - expectForgetPod: podWithID("foo", testNode.Name), - expectAssumedPod: podWithID("foo", testNode.Name), - expectError: fmt.Errorf(`running Permit plugin "FakePermit": %w`, errors.New("permit error")), - eventReason: "FailedScheduling", + mockScheduleResult: scheduleResultOk, + expectErrorPod: assignedTestPod, + expectForgetPod: assignedTestPod, + expectAssumedPod: assignedTestPod, + expectPodInBackoffQ: testPod, + expectError: fmt.Errorf(`running Reserve plugin "FakeReserve": %w`, reserveErr), + eventReason: "FailedScheduling", }, { - name: "error prebind pod", - sendPod: podWithID("foo", ""), - mockResult: mockScheduleResult{ScheduleResult{SuggestedHost: testNode.Name, EvaluatedNodes: 1, FeasibleNodes: 1}, nil}, + name: "reserve failed with status code rejected", + sendPod: testPod, + registerPluginFuncs: []tf.RegisterPluginFunc{ + tf.RegisterReservePlugin("FakeReserve", tf.NewFakeReservePlugin(framework.NewStatus(framework.UnschedulableAndUnresolvable, "rejected on reserve"))), + }, + mockScheduleResult: scheduleResultOk, + expectErrorPod: assignedTestPod, + expectForgetPod: assignedTestPod, + expectAssumedPod: assignedTestPod, + expectPodInUnschedulable: testPod, + expectError: makePredicateError("1 rejected on reserve"), + eventReason: "FailedScheduling", + }, + { + name: "permit failed with status code error", + sendPod: testPod, + registerPluginFuncs: []tf.RegisterPluginFunc{ + tf.RegisterPermitPlugin("FakePermit", tf.NewFakePermitPlugin(framework.AsStatus(permitErr), time.Minute)), + }, + mockScheduleResult: scheduleResultOk, + expectErrorPod: assignedTestPod, + expectForgetPod: assignedTestPod, + expectAssumedPod: assignedTestPod, + expectPodInBackoffQ: testPod, + expectError: fmt.Errorf(`running Permit plugin "FakePermit": %w`, permitErr), + eventReason: "FailedScheduling", + }, + { + name: "permit failed with status code rejected", + sendPod: testPod, + registerPluginFuncs: []tf.RegisterPluginFunc{ + tf.RegisterPermitPlugin("FakePermit", tf.NewFakePermitPlugin(framework.NewStatus(framework.Unschedulable, "rejected on permit"), time.Minute)), + }, + mockScheduleResult: scheduleResultOk, + expectErrorPod: assignedTestPod, + expectForgetPod: assignedTestPod, + expectAssumedPod: assignedTestPod, + expectPodInUnschedulable: testPod, + expectError: makePredicateError("1 rejected on permit"), + eventReason: "FailedScheduling", + }, + { + name: "prebind failed with status code rejected", + sendPod: testPod, + registerPluginFuncs: []tf.RegisterPluginFunc{ + tf.RegisterPreBindPlugin("FakePreBind", tf.NewFakePreBindPlugin(framework.NewStatus(framework.Unschedulable, "rejected on prebind"))), + }, + mockScheduleResult: scheduleResultOk, + expectErrorPod: assignedTestPod, + expectForgetPod: assignedTestPod, + expectAssumedPod: assignedTestPod, + expectPodInBackoffQ: testPod, + expectError: fmt.Errorf("rejected on prebind"), + eventReason: "FailedScheduling", + }, + { + name: "prebind failed with status code error", + sendPod: testPod, registerPluginFuncs: []tf.RegisterPluginFunc{ tf.RegisterPreBindPlugin("FakePreBind", tf.NewFakePreBindPlugin(framework.AsStatus(preBindErr))), }, - expectErrorPod: podWithID("foo", testNode.Name), - expectForgetPod: podWithID("foo", testNode.Name), - expectAssumedPod: podWithID("foo", testNode.Name), - expectError: fmt.Errorf(`running PreBind plugin "FakePreBind": %w`, preBindErr), - eventReason: "FailedScheduling", + mockScheduleResult: scheduleResultOk, + expectErrorPod: assignedTestPod, + expectForgetPod: assignedTestPod, + expectAssumedPod: assignedTestPod, + expectPodInBackoffQ: testPod, + expectError: fmt.Errorf(`running PreBind plugin "FakePreBind": %w`, preBindErr), + eventReason: "FailedScheduling", }, { - name: "bind assumed pod scheduled", - sendPod: podWithID("foo", ""), - mockResult: mockScheduleResult{ScheduleResult{SuggestedHost: testNode.Name, EvaluatedNodes: 1, FeasibleNodes: 1}, nil}, - expectBind: &v1.Binding{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: types.UID("foo")}, Target: v1.ObjectReference{Kind: "Node", Name: testNode.Name}}, - expectAssumedPod: podWithID("foo", testNode.Name), - eventReason: "Scheduled", + name: "binding failed", + sendPod: testPod, + injectBindError: bindingErr, + mockScheduleResult: scheduleResultOk, + expectBind: fakeBinding, + expectAssumedPod: assignedTestPod, + expectError: fmt.Errorf("running Bind plugin %q: %w", "DefaultBinder", bindingErr), + expectErrorPod: assignedTestPod, + expectForgetPod: assignedTestPod, + expectPodInBackoffQ: testPod, + eventReason: "FailedScheduling", }, { - name: "error pod failed scheduling", - sendPod: podWithID("foo", ""), - mockResult: mockScheduleResult{ScheduleResult{SuggestedHost: testNode.Name, EvaluatedNodes: 1, FeasibleNodes: 1}, errS}, - expectError: errS, - expectErrorPod: podWithID("foo", ""), - eventReason: "FailedScheduling", + name: "bind assumed pod scheduled", + sendPod: testPod, + mockScheduleResult: scheduleResultOk, + expectBind: fakeBinding, + expectAssumedPod: assignedTestPod, + eventReason: "Scheduled", }, { - name: "error bind forget pod failed scheduling", - sendPod: podWithID("foo", ""), - mockResult: mockScheduleResult{ScheduleResult{SuggestedHost: testNode.Name, EvaluatedNodes: 1, FeasibleNodes: 1}, nil}, - expectBind: &v1.Binding{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: types.UID("foo")}, Target: v1.ObjectReference{Kind: "Node", Name: testNode.Name}}, - expectAssumedPod: podWithID("foo", testNode.Name), - injectBindError: errB, - expectError: fmt.Errorf("running Bind plugin %q: %w", "DefaultBinder", errors.New("binder")), - expectErrorPod: podWithID("foo", testNode.Name), - expectForgetPod: podWithID("foo", testNode.Name), - eventReason: "FailedScheduling", - }, - { - name: "deleting pod", - sendPod: deletingPod("foo"), - mockResult: mockScheduleResult{ScheduleResult{}, nil}, - eventReason: "FailedScheduling", + name: "deleting pod", + sendPod: deletingPod("foo"), + mockScheduleResult: emptyScheduleResult, + eventReason: "FailedScheduling", }, } @@ -794,6 +850,7 @@ func TestSchedulerScheduleOne(t *testing.T) { gotBinding = action.(clienttesting.CreateAction).GetObject().(*v1.Binding) return true, gotBinding, item.injectBindError }) + informerFactory := informers.NewSharedInformerFactory(client, 0) fwk, err := tf.NewFramework(ctx, append(item.registerPluginFuncs, @@ -804,12 +861,12 @@ func TestSchedulerScheduleOne(t *testing.T) { frameworkruntime.WithClientSet(client), frameworkruntime.WithEventRecorder(eventBroadcaster.NewRecorder(scheme.Scheme, testSchedulerName)), frameworkruntime.WithWaitingPods(frameworkruntime.NewWaitingPodsMap()), + frameworkruntime.WithInformerFactory(informerFactory), ) if err != nil { t.Fatal(err) } - informerFactory := informers.NewSharedInformerFactory(client, 0) ar := metrics.NewMetricsAsyncRecorder(10, 1*time.Second, ctx.Done()) queue := internalqueue.NewSchedulingQueue(nil, informerFactory, internalqueue.WithMetricsRecorder(*ar)) sched := &Scheduler{ @@ -822,15 +879,13 @@ func TestSchedulerScheduleOne(t *testing.T) { queue.Add(logger, item.sendPod) sched.SchedulePod = func(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (ScheduleResult, error) { - return item.mockResult.result, item.mockResult.err + return item.mockScheduleResult, item.injectSchedulingError } - sched.FailureHandler = func(_ context.Context, fwk framework.Framework, p *framework.QueuedPodInfo, status *framework.Status, _ *framework.NominatingInfo, _ time.Time) { + sched.FailureHandler = func(ctx context.Context, fwk framework.Framework, p *framework.QueuedPodInfo, status *framework.Status, ni *framework.NominatingInfo, start time.Time) { gotPod = p.Pod gotError = status.AsError() - msg := truncateMessage(gotError.Error()) - fwk.EventRecorder().Eventf(p.Pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", msg) - queue.Done(p.Pod.UID) + sched.handleSchedulingFailure(ctx, fwk, p, status, ni, start) } called := make(chan struct{}) stopFunc, err := eventBroadcaster.StartEventWatcher(func(obj runtime.Object) { @@ -843,6 +898,8 @@ func TestSchedulerScheduleOne(t *testing.T) { if err != nil { t.Fatal(err) } + informerFactory.Start(ctx.Done()) + informerFactory.WaitForCacheSync(ctx.Done()) sched.ScheduleOne(ctx) <-called if diff := cmp.Diff(item.expectAssumedPod, gotAssumedPod); diff != "" { @@ -864,6 +921,330 @@ func TestSchedulerScheduleOne(t *testing.T) { if diff := cmp.Diff(item.expectBind, gotBinding); diff != "" { t.Errorf("Unexpected binding (-want,+got):\n%s", diff) } + // We have to use wait here because the Pod goes to the binding cycle in some test cases + // and the inflight pods might not be empty immediately at this point in such case. + if err := wait.PollUntilContextTimeout(ctx, time.Millisecond*200, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) { + return len(queue.InFlightPods()) == 0, nil + }); err != nil { + t.Errorf("in-flight pods should be always empty after SchedulingOne. It has %v Pods", len(queue.InFlightPods())) + } + podsInBackoffQ := queue.PodsInBackoffQ() + if item.expectPodInBackoffQ != nil { + if !podListContainsPod(podsInBackoffQ, item.expectPodInBackoffQ) { + t.Errorf("Expected to find pod in backoffQ, but it's not there.\nWant: %v,\ngot: %v", item.expectPodInBackoffQ, podsInBackoffQ) + } + } else { + if len(podsInBackoffQ) > 0 { + t.Errorf("Expected backoffQ to be empty, but it's not.\nGot: %v", podsInBackoffQ) + } + } + unschedulablePods := queue.UnschedulablePods() + if item.expectPodInUnschedulable != nil { + if !podListContainsPod(unschedulablePods, item.expectPodInUnschedulable) { + t.Errorf("Expected to find pod in unschedulable, but it's not there.\nWant: %v,\ngot: %v", item.expectPodInUnschedulable, unschedulablePods) + } + } else { + if len(unschedulablePods) > 0 { + t.Errorf("Expected unschedulable pods to be empty, but it's not.\nGot: %v", unschedulablePods) + } + } + stopFunc() + }) + } + } +} + +// Tests the logic removing pods from inFlightPods after Permit (needed to fix issue https://github.com/kubernetes/kubernetes/issues/129967). +// This needs to be a separate test case, because it mocks the waitOnPermit and runPrebindPlugins functions. +func TestScheduleOneMarksPodAsProcessedBeforePreBind(t *testing.T) { + testNode := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node1", UID: types.UID("node1")}} + client := clientsetfake.NewClientset(&testNode) + eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()}) + + scheduleResultOk := ScheduleResult{SuggestedHost: testNode.Name, EvaluatedNodes: 1, FeasibleNodes: 1} + emptyScheduleResult := ScheduleResult{} + bindingOk := &v1.Binding{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: types.UID("foo")}, Target: v1.ObjectReference{Kind: "Node", Name: testNode.Name}} + schedulingErr := errors.New("scheduler") + bindingErr := errors.New("binder") + preBindErr := errors.New("on PreBind") + permitErr := errors.New("permit") + waitOnPermitErr := errors.New("wait on permit") + + testPod := podWithID("foo", "") + assignedTestPod := podWithID("foo", testNode.Name) + + table := []struct { + name string + sendPod *v1.Pod + registerPluginFuncs []tf.RegisterPluginFunc + injectSchedulingError error + injectBindError error + mockScheduleResult ScheduleResult + mockWaitOnPermitResult *framework.Status + mockRunPreBindPluginsResult *framework.Status + expectErrorPod *v1.Pod + expectAssumedPod *v1.Pod + expectError error + expectBind *v1.Binding + eventReason string + expectPodIsInFlightAtFailureHandler bool + expectPodIsInFlightAtWaitOnPermit bool + }{ + { + name: "error on permit", + sendPod: testPod, + mockScheduleResult: scheduleResultOk, + registerPluginFuncs: []tf.RegisterPluginFunc{ + tf.RegisterPermitPlugin("FakePermit", tf.NewFakePermitPlugin(framework.AsStatus(permitErr), time.Minute)), + }, + expectErrorPod: assignedTestPod, + expectAssumedPod: assignedTestPod, + expectError: fmt.Errorf(`running Permit plugin "FakePermit": %w`, permitErr), + eventReason: "FailedScheduling", + expectPodIsInFlightAtFailureHandler: true, + }, + { + name: "pod rejected on permit", + sendPod: testPod, + mockScheduleResult: scheduleResultOk, + registerPluginFuncs: []tf.RegisterPluginFunc{ + tf.RegisterPermitPlugin("FakePermit", tf.NewFakePermitPlugin(framework.NewStatus(framework.Unschedulable, "on permit"), time.Minute)), + }, + expectErrorPod: assignedTestPod, + expectAssumedPod: assignedTestPod, + expectError: makePredicateError("1 on permit"), + eventReason: "FailedScheduling", + expectPodIsInFlightAtFailureHandler: true, + }, + { + name: "error on wait on permit", + sendPod: testPod, + mockScheduleResult: scheduleResultOk, + registerPluginFuncs: []tf.RegisterPluginFunc{ + tf.RegisterPermitPlugin("FakePermit", tf.NewFakePermitPlugin(framework.NewStatus(framework.Wait), time.Minute)), + }, + mockWaitOnPermitResult: framework.AsStatus(waitOnPermitErr), + expectErrorPod: assignedTestPod, + expectAssumedPod: assignedTestPod, + expectError: waitOnPermitErr, + eventReason: "FailedScheduling", + expectPodIsInFlightAtFailureHandler: true, + expectPodIsInFlightAtWaitOnPermit: true, + }, + { + name: "pod rejected while wait on permit", + sendPod: testPod, + mockScheduleResult: scheduleResultOk, + registerPluginFuncs: []tf.RegisterPluginFunc{ + tf.RegisterPermitPlugin("FakePermit", tf.NewFakePermitPlugin(framework.NewStatus(framework.Wait), time.Minute)), + }, + mockWaitOnPermitResult: framework.NewStatus(framework.Unschedulable, "wait on permit"), + expectErrorPod: assignedTestPod, + expectAssumedPod: assignedTestPod, + expectError: makePredicateError("1 wait on permit"), + eventReason: "FailedScheduling", + expectPodIsInFlightAtFailureHandler: true, + expectPodIsInFlightAtWaitOnPermit: true, + }, + { + name: "error prebind pod", + sendPod: testPod, + mockScheduleResult: scheduleResultOk, + registerPluginFuncs: []tf.RegisterPluginFunc{ + tf.RegisterPreBindPlugin("FakePreBind", tf.NewFakePreBindPlugin(framework.NewStatus(framework.Unschedulable))), + }, + mockWaitOnPermitResult: framework.NewStatus(framework.Success), + mockRunPreBindPluginsResult: framework.NewStatus(framework.Unschedulable, preBindErr.Error()), + expectErrorPod: assignedTestPod, + expectAssumedPod: assignedTestPod, + expectError: preBindErr, + eventReason: "FailedScheduling", + expectPodIsInFlightAtFailureHandler: false, + expectPodIsInFlightAtWaitOnPermit: true, + }, + { + name: "bind assumed pod scheduled", + sendPod: testPod, + mockScheduleResult: scheduleResultOk, + expectBind: bindingOk, + expectAssumedPod: assignedTestPod, + mockWaitOnPermitResult: framework.NewStatus(framework.Success), + mockRunPreBindPluginsResult: framework.NewStatus(framework.Success), + eventReason: "Scheduled", + expectPodIsInFlightAtFailureHandler: false, + expectPodIsInFlightAtWaitOnPermit: true, + }, + { + name: "error pod failed scheduling", + sendPod: testPod, + mockScheduleResult: scheduleResultOk, + injectSchedulingError: schedulingErr, + expectError: schedulingErr, + expectErrorPod: testPod, + eventReason: "FailedScheduling", + expectPodIsInFlightAtFailureHandler: true, + }, + { + name: "error bind forget pod failed scheduling", + sendPod: testPod, + mockScheduleResult: scheduleResultOk, + mockWaitOnPermitResult: framework.NewStatus(framework.Success), + mockRunPreBindPluginsResult: framework.NewStatus(framework.Success), + expectBind: bindingOk, + expectAssumedPod: assignedTestPod, + injectBindError: bindingErr, + expectError: fmt.Errorf("running Bind plugin %q: %w", "DefaultBinder", bindingErr), + expectErrorPod: assignedTestPod, + eventReason: "FailedScheduling", + expectPodIsInFlightAtFailureHandler: false, + expectPodIsInFlightAtWaitOnPermit: true, + }, + { + name: "deleting pod", + sendPod: deletingPod("foo"), + mockScheduleResult: emptyScheduleResult, + eventReason: "FailedScheduling", + expectPodIsInFlightAtFailureHandler: false, + expectPodIsInFlightAtWaitOnPermit: false, + }, + } + + for _, qHintEnabled := range []bool{true, false} { + for _, item := range table { + t.Run(fmt.Sprintf("[QueueingHint: %v] %s", qHintEnabled, item.name), func(t *testing.T) { + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SchedulerQueueingHints, qHintEnabled) + logger, ctx := ktesting.NewTestContext(t) + var gotError error + var gotPod *v1.Pod + var gotAssumedPod *v1.Pod + var gotBinding *v1.Binding + var gotCallsToFailureHandler int + var gotPodIsInFlightAtFailureHandler bool + var gotPodIsInFlightAtWaitOnPermit bool + var gotPodIsInFlightAtRunPreBindPlugins bool + + cache := &fakecache.Cache{ + ForgetFunc: func(pod *v1.Pod) { + }, + AssumeFunc: func(pod *v1.Pod) { + gotAssumedPod = pod + }, + IsAssumedPodFunc: func(pod *v1.Pod) bool { + if pod == nil || gotAssumedPod == nil { + return false + } + return pod.UID == gotAssumedPod.UID + }, + } + client := clientsetfake.NewClientset(item.sendPod) + client.PrependReactor("create", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) { + if action.GetSubresource() != "binding" { + return false, nil, nil + } + gotBinding = action.(clienttesting.CreateAction).GetObject().(*v1.Binding) + return true, gotBinding, item.injectBindError + }) + + informerFactory := informers.NewSharedInformerFactory(client, 0) + ar := metrics.NewMetricsAsyncRecorder(10, 1*time.Second, ctx.Done()) + queue := internalqueue.NewSchedulingQueue(nil, informerFactory, internalqueue.WithMetricsRecorder(*ar)) + + fwk, err := NewFakeFramework( + ctx, + queue, + append(item.registerPluginFuncs, + tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), + tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), + ), + testSchedulerName, + frameworkruntime.WithClientSet(client), + frameworkruntime.WithEventRecorder(eventBroadcaster.NewRecorder(scheme.Scheme, testSchedulerName)), + frameworkruntime.WithWaitingPods(frameworkruntime.NewWaitingPodsMap()), + ) + if err != nil { + t.Fatal(err) + } + + fwk.waitOnPermitFn = func(_ context.Context, pod *v1.Pod) *framework.Status { + gotPodIsInFlightAtWaitOnPermit = podListContainsPod(fwk.queue.InFlightPods(), pod) + return item.mockWaitOnPermitResult + } + fwk.runPreBindPluginsFn = func(_ context.Context, _ *framework.CycleState, pod *v1.Pod, _ string) *framework.Status { + gotPodIsInFlightAtRunPreBindPlugins = podListContainsPod(fwk.queue.InFlightPods(), pod) + return item.mockRunPreBindPluginsResult + } + + sched := &Scheduler{ + Cache: cache, + client: client, + NextPod: queue.Pop, + SchedulingQueue: queue, + Profiles: profile.Map{testSchedulerName: fwk}, + } + queue.Add(logger, item.sendPod) + + sched.SchedulePod = func(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (ScheduleResult, error) { + return item.mockScheduleResult, item.injectSchedulingError + } + sched.FailureHandler = func(_ context.Context, fwk framework.Framework, p *framework.QueuedPodInfo, status *framework.Status, _ *framework.NominatingInfo, _ time.Time) { + gotCallsToFailureHandler++ + gotPodIsInFlightAtFailureHandler = podListContainsPod(queue.InFlightPods(), p.Pod) + + gotPod = p.Pod + gotError = status.AsError() + + msg := truncateMessage(gotError.Error()) + fwk.EventRecorder().Eventf(p.Pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", msg) + + queue.Done(p.Pod.UID) + } + called := make(chan struct{}) + stopFunc, err := eventBroadcaster.StartEventWatcher(func(obj runtime.Object) { + e, _ := obj.(*eventsv1.Event) + if e.Reason != item.eventReason { + t.Errorf("got event %v, want %v", e.Reason, item.eventReason) + } + close(called) + }) + if err != nil { + t.Fatal(err) + } + sched.ScheduleOne(ctx) + <-called + + if diff := cmp.Diff(item.expectAssumedPod, gotAssumedPod); diff != "" { + t.Errorf("Unexpected assumed pod (-want,+got):\n%s", diff) + } + if diff := cmp.Diff(item.expectErrorPod, gotPod); diff != "" { + t.Errorf("Unexpected error pod (-want,+got):\n%s", diff) + } + if item.expectError == nil || gotError == nil { + if !errors.Is(gotError, item.expectError) { + t.Errorf("Unexpected error. Wanted %v, got %v", item.expectError, gotError) + } + } else if item.expectError.Error() != gotError.Error() { + t.Errorf("Unexpected error. Wanted %v, got %v", item.expectError.Error(), gotError.Error()) + } + if diff := cmp.Diff(item.expectBind, gotBinding); diff != "" { + t.Errorf("Unexpected binding (-want,+got):\n%s", diff) + } + if item.expectError != nil && gotCallsToFailureHandler != 1 { + t.Errorf("expected 1 call to FailureHandlerFn, got %v", gotCallsToFailureHandler) + } + if item.expectError == nil && gotCallsToFailureHandler != 0 { + t.Errorf("expected 0 calls to FailureHandlerFn, got %v", gotCallsToFailureHandler) + } + if (item.expectPodIsInFlightAtFailureHandler && qHintEnabled) != gotPodIsInFlightAtFailureHandler { + t.Errorf("unexpected pod being in flight in FailureHandlerFn, expected %v but got %v.", + item.expectPodIsInFlightAtFailureHandler, gotPodIsInFlightAtFailureHandler) + } + if (item.expectPodIsInFlightAtWaitOnPermit && qHintEnabled) != gotPodIsInFlightAtWaitOnPermit { + t.Errorf("unexpected pod being in flight at start of WaitOnPermit, expected %v but got %v", + item.expectPodIsInFlightAtWaitOnPermit, gotPodIsInFlightAtWaitOnPermit) + } + if gotPodIsInFlightAtRunPreBindPlugins { + t.Errorf("unexpected pod being in flight at start of RunPreBindPlugins") + } // We have to use wait here // because the Pod goes to the binding cycle in some test cases and the inflight pods might not be empty immediately at this point in such case. if err := wait.PollUntilContextTimeout(ctx, time.Millisecond*200, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) { @@ -877,6 +1258,41 @@ func TestSchedulerScheduleOne(t *testing.T) { } } +// Fake Framework allows mocking calls to WaitOnPermit, RunPreBindPlugins and RunPostBindPlugins, to allow for +// simpler and more efficient testing of Scheduler's logic within the bindingCycle. +type FakeFramework struct { + framework.Framework + queue internalqueue.SchedulingQueue + waitOnPermitFn func(context.Context, *v1.Pod) *framework.Status + runPreBindPluginsFn func(context.Context, *framework.CycleState, *v1.Pod, string) *framework.Status +} + +func NewFakeFramework(ctx context.Context, schedQueue internalqueue.SchedulingQueue, fns []tf.RegisterPluginFunc, + profileName string, opts ...frameworkruntime.Option) (*FakeFramework, error) { + fwk, err := tf.NewFramework(ctx, fns, profileName, opts...) + return &FakeFramework{ + Framework: fwk, + queue: schedQueue}, + err +} + +func (ff *FakeFramework) WaitOnPermit(ctx context.Context, pod *v1.Pod) *framework.Status { + return ff.waitOnPermitFn(ctx, pod) +} + +func (ff *FakeFramework) RunPreBindPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status { + return ff.runPreBindPluginsFn(ctx, state, pod, nodeName) +} + +func podListContainsPod(list []*v1.Pod, pod *v1.Pod) bool { + for _, p := range list { + if p.UID == pod.UID { + return true + } + } + return false +} + func TestSchedulerNoPhantomPodAfterExpire(t *testing.T) { logger, ctx := ktesting.NewTestContext(t) ctx, cancel := context.WithCancel(ctx)