From cf3f0bd7780987a406435dd93ca191ab208b5555 Mon Sep 17 00:00:00 2001
From: Kensei Nakada
Date: Sun, 6 Aug 2023 09:06:20 +0000
Subject: [PATCH] fix: register the plugin that rejects Pods in WaitOnPermit to UnschedulablePlugins

Pods rejected in WaitOnPermit did not have the rejecting Permit plugin
recorded in UnschedulablePlugins, so the scheduling queue could not requeue
them based on that plugin's QueueingHint. Record the plugin by returning a
FitError from the binding cycle, and cover the behavior with an integration
test.

---
 pkg/scheduler/framework/types.go              |   1 +
 .../internal/queue/scheduling_queue.go        |   2 +-
 pkg/scheduler/schedule_one.go                 |  13 ++
 test/integration/scheduler/queue_test.go      | 136 ++++++++++++++++++
 4 files changed, 151 insertions(+), 1 deletion(-)

diff --git a/pkg/scheduler/framework/types.go b/pkg/scheduler/framework/types.go
index 64b83eb33a3..fab03f024c3 100644
--- a/pkg/scheduler/framework/types.go
+++ b/pkg/scheduler/framework/types.go
@@ -177,6 +177,7 @@ type QueuedPodInfo struct {
 	// latency for a pod.
 	InitialAttemptTimestamp *time.Time
 	// If a Pod failed in a scheduling cycle, record the plugin names it failed by.
+	// Plugins are registered here only when the Pod is rejected in PreFilter, Filter, Reserve, or Permit (WaitOnPermit).
 	UnschedulablePlugins sets.Set[string]
 	// Whether the Pod is scheduling gated (by PreEnqueuePlugins) or not.
 	Gated bool
diff --git a/pkg/scheduler/internal/queue/scheduling_queue.go b/pkg/scheduler/internal/queue/scheduling_queue.go
index ba3c00edd21..657ce6481c0 100644
--- a/pkg/scheduler/internal/queue/scheduling_queue.go
+++ b/pkg/scheduler/internal/queue/scheduling_queue.go
@@ -731,7 +731,7 @@ func (p *PriorityQueue) AddUnschedulableIfNotPresent(logger klog.Logger, pInfo *
 	// In this case, we try to requeue this Pod to activeQ/backoffQ.
 	queue := p.requeuePodViaQueueingHint(logger, pInfo, schedulingHint, ScheduleAttemptFailure)
-	logger.V(6).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pod), "event", ScheduleAttemptFailure, "queue", queue, "schedulingCycle", podSchedulingCycle)
+	logger.V(3).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pod), "event", ScheduleAttemptFailure, "queue", queue, "schedulingCycle", podSchedulingCycle, "hint", schedulingHint)
 	if queue == activeQ {
 		// When the Pod is moved to activeQ, need to let p.cond know so that the Pod will be pop()ed out.
 		p.cond.Broadcast()
 	}
diff --git a/pkg/scheduler/schedule_one.go b/pkg/scheduler/schedule_one.go
index 525e1af8632..bc22ad41032 100644
--- a/pkg/scheduler/schedule_one.go
+++ b/pkg/scheduler/schedule_one.go
@@ -269,6 +269,19 @@ func (sched *Scheduler) bindingCycle(
 
 	// Run "permit" plugins.
 	if status := fwk.WaitOnPermit(ctx, assumedPod); !status.IsSuccess() {
+		// If the Pod was rejected (rather than failing with an internal error), wrap the
+		// status in a FitError so the rejecting plugin lands in UnschedulablePlugins.
+		if status.IsUnschedulable() {
+			fitErr := &framework.FitError{
+				NumAllNodes: 1,
+				Pod:         assumedPodInfo.Pod,
+				Diagnosis: framework.Diagnosis{
+					NodeToStatusMap:      framework.NodeToStatusMap{scheduleResult.SuggestedHost: status},
+					UnschedulablePlugins: sets.New(status.FailedPlugin()),
+				},
+			}
+			return framework.NewStatus(status.Code()).WithError(fitErr)
+		}
 		return status
 	}
 
diff --git a/test/integration/scheduler/queue_test.go b/test/integration/scheduler/queue_test.go
index 0fcea4c339c..24603f487f5 100644
--- a/test/integration/scheduler/queue_test.go
+++ b/test/integration/scheduler/queue_test.go
@@ -532,3 +532,139 @@ func (p *firstFailBindPlugin) Bind(ctx context.Context, state *framework.CycleSt
 
 	return p.defaultBinderPlugin.Bind(ctx, state, pod, nodename)
 }
+
+// TestRequeueByPermitRejection verifies that Pods rejected by a Permit plugin in the binding
+// cycle are put back into the queue with the correct scheduling cycle number.
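+// The test makes pod-1 wait in WaitOnPermit, triggers a NodeUpdate event while it waits, then
+// rejects pod-1 and checks that the event requeues it to activeQ via the QueueingHint exactly once.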
+func TestRequeueByPermitRejection(t *testing.T) {
+	queueingHintCalledCounter := 0
+	fakePermit := &fakePermitPlugin{}
+	registry := frameworkruntime.Registry{
+		fakePermitPluginName: func(o runtime.Object, fh framework.Handle) (framework.Plugin, error) {
+			fakePermit = &fakePermitPlugin{
+				frameworkHandler: fh,
+				schedulingHint: func(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) framework.QueueingHint {
+					queueingHintCalledCounter++
+					return framework.QueueImmediately
+				},
+			}
+			return fakePermit, nil
+		},
+	}
+	cfg := configtesting.V1ToInternalWithDefaults(t, configv1.KubeSchedulerConfiguration{
+		Profiles: []configv1.KubeSchedulerProfile{{
+			SchedulerName: pointer.String(v1.DefaultSchedulerName),
+			Plugins: &configv1.Plugins{
+				MultiPoint: configv1.PluginSet{
+					Enabled: []configv1.Plugin{
+						{Name: fakePermitPluginName},
+					},
+				},
+			},
+		}}})
+
+	// Use zero backoff seconds to bypass backoffQ.
+	testCtx := testutils.InitTestSchedulerWithOptions(
+		t,
+		testutils.InitTestAPIServer(t, "core-res-enqueue", nil),
+		0,
+		scheduler.WithPodInitialBackoffSeconds(0),
+		scheduler.WithPodMaxBackoffSeconds(0),
+		scheduler.WithProfiles(cfg.Profiles...),
+		scheduler.WithFrameworkOutOfTreeRegistry(registry),
+	)
+	testutils.SyncSchedulerInformerFactory(testCtx)
+
+	go testCtx.Scheduler.Run(testCtx.Ctx)
+
+	cs, ns, ctx := testCtx.ClientSet, testCtx.NS.Name, testCtx.Ctx
+	node := st.MakeNode().Name("fake-node").Obj()
+	if _, err := cs.CoreV1().Nodes().Create(ctx, node, metav1.CreateOptions{}); err != nil {
+		t.Fatalf("Failed to create Node %q: %v", node.Name, err)
+	}
+	// Create pod-1, which will be stuck waiting in WaitOnPermit.
+	pod := st.MakePod().Namespace(ns).Name("pod-1").Container(imageutils.GetPauseImageName()).Obj()
+	if _, err := cs.CoreV1().Pods(ns).Create(ctx, pod, metav1.CreateOptions{}); err != nil {
+		t.Fatalf("Failed to create Pod %q: %v", pod.Name, err)
+	}
+
+	// Update a node label to cause a NodeUpdate event.
+	node.Labels = map[string]string{"updated": ""}
+	if _, err := cs.CoreV1().Nodes().Update(ctx, node, metav1.UpdateOptions{}); err != nil {
+		t.Fatalf("Failed to add labels to the node: %v", err)
+	}
+
+	// Create pod-2 to increment the scheduling cycle number in the scheduling queue.
+	// It lets us verify that the NodeUpdate event, which happened in a previous scheduling cycle, still gets pod-1 requeued to activeQ.
+	pod = st.MakePod().Namespace(ns).Name("pod-2").Container(imageutils.GetPauseImageName()).Obj()
+	if _, err := cs.CoreV1().Pods(ns).Create(ctx, pod, metav1.CreateOptions{}); err != nil {
+		t.Fatalf("Failed to create Pod %q: %v", pod.Name, err)
+	}
+
+	// Reject pod-1 to simulate a rejection from the Permit plugin.
+	// pod-1 should then be requeued to activeQ because the NodeUpdate event has happened.
+	fakePermit.frameworkHandler.IterateOverWaitingPods(func(wp framework.WaitingPod) {
+		if wp.GetPod().Name == "pod-1" {
+			wp.Reject(fakePermitPluginName, "fakePermitPlugin rejects the Pod")
+			return
+		}
+	})
+
+	// Wait for pod-2 to be scheduled.
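+	// pod-2 waits at the fake Permit plugin too, so keep allowing it inside the
+	// poll loop until the bind actually completes.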
+	err := wait.Poll(200*time.Millisecond, wait.ForeverTestTimeout, func() (done bool, err error) {
+		fakePermit.frameworkHandler.IterateOverWaitingPods(func(wp framework.WaitingPod) {
+			if wp.GetPod().Name == "pod-2" {
+				wp.Allow(fakePermitPluginName)
+			}
+		})
+
+		return testutils.PodScheduled(cs, ns, "pod-2")()
+	})
+	if err != nil {
+		t.Fatalf("Expected pod-2 to be scheduled: %v", err)
+	}
+
+	// Wait for pod-1 to be requeued and reach WaitOnPermit again.
+	err = wait.Poll(200*time.Millisecond, wait.ForeverTestTimeout, func() (done bool, err error) {
+		pod1Found := false
+		fakePermit.frameworkHandler.IterateOverWaitingPods(func(wp framework.WaitingPod) {
+			if wp.GetPod().Name == "pod-1" {
+				pod1Found = true
+				wp.Allow(fakePermitPluginName)
+			}
+		})
+		return pod1Found, nil
+	})
+	if err != nil {
+		t.Fatalf("Expected pod-1 to be requeued and reach WaitOnPermit again: %v", err)
+	}
+
+	if queueingHintCalledCounter != 1 {
+		t.Fatalf("Expected the scheduling hint to be called once, but it was called %v times", queueingHintCalledCounter)
+	}
+}
+
+// fakePermitPlugin makes every Pod wait at Permit until the test explicitly allows or rejects it.
+type fakePermitPlugin struct {
+	frameworkHandler framework.Handle
+	schedulingHint   framework.QueueingHintFn
+}
+
+const fakePermitPluginName = "fakePermitPlugin"
+
+func (p *fakePermitPlugin) Name() string {
+	return fakePermitPluginName
+}
+
+func (p *fakePermitPlugin) Permit(ctx context.Context, state *framework.CycleState, _ *v1.Pod, _ string) (*framework.Status, time.Duration) {
+	return framework.NewStatus(framework.Wait), wait.ForeverTestTimeout
+}
+
+func (p *fakePermitPlugin) EventsToRegister() []framework.ClusterEventWithHint {
+	return []framework.ClusterEventWithHint{
+		{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.UpdateNodeLabel}, QueueingHintFn: p.schedulingHint},
+	}
+}