diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index 5b9e4bdc79d..0948b9becbd 100644 --- a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -632,6 +632,13 @@ const ( // which improves the scheduling latency when the preemption involves in. SchedulerAsyncPreemption featuregate.Feature = "SchedulerAsyncPreemption" + // owner: @macsko + // kep: http://kep.k8s.io/5142 + // + // Improves scheduling queue behavior by popping pods from the backoffQ when the activeQ is empty. + // This allows to process potentially schedulable pods ASAP, eliminating a penalty effect of the backoff queue. + SchedulerPopFromBackoffQ featuregate.Feature = "SchedulerPopFromBackoffQ" + // owner: @atosatto @yuanchen8911 // kep: http://kep.k8s.io/3902 // diff --git a/pkg/features/versioned_kube_features.go b/pkg/features/versioned_kube_features.go index 7ea7c0e9fa5..892a446ca20 100644 --- a/pkg/features/versioned_kube_features.go +++ b/pkg/features/versioned_kube_features.go @@ -660,6 +660,10 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate {Version: version.MustParse("1.32"), Default: false, PreRelease: featuregate.Alpha}, }, + SchedulerPopFromBackoffQ: { + {Version: version.MustParse("1.33"), Default: true, PreRelease: featuregate.Beta}, + }, + SchedulerQueueingHints: { {Version: version.MustParse("1.28"), Default: false, PreRelease: featuregate.Beta}, {Version: version.MustParse("1.32"), Default: true, PreRelease: featuregate.Beta}, diff --git a/pkg/scheduler/backend/queue/scheduling_queue.go b/pkg/scheduler/backend/queue/scheduling_queue.go index 3b85a296bce..9e3ae718579 100644 --- a/pkg/scheduler/backend/queue/scheduling_queue.go +++ b/pkg/scheduler/backend/queue/scheduling_queue.go @@ -189,6 +189,8 @@ type PriorityQueue struct { // isSchedulingQueueHintEnabled indicates whether the feature gate for the scheduling queue is enabled. isSchedulingQueueHintEnabled bool + // isPopFromBackoffQEnabled indicates whether the feature gate SchedulerPopFromBackoffQ is enabled. + isPopFromBackoffQEnabled bool } // QueueingHintFunction is the wrapper of QueueingHintFn that has PluginName. @@ -325,6 +327,7 @@ func NewPriorityQueue( } isSchedulingQueueHintEnabled := utilfeature.DefaultFeatureGate.Enabled(features.SchedulerQueueingHints) + isPopFromBackoffQEnabled := utilfeature.DefaultFeatureGate.Enabled(features.SchedulerPopFromBackoffQ) pq := &PriorityQueue{ clock: options.clock, @@ -339,6 +342,7 @@ func NewPriorityQueue( pluginMetricsSamplePercent: options.pluginMetricsSamplePercent, moveRequestCycle: -1, isSchedulingQueueHintEnabled: isSchedulingQueueHintEnabled, + isPopFromBackoffQEnabled: isPopFromBackoffQEnabled, } pq.nsLister = informerFactory.Core().V1().Namespaces().Lister() pq.nominator = newPodNominator(options.podLister) @@ -545,13 +549,17 @@ func (p *PriorityQueue) runPreEnqueuePlugin(ctx context.Context, pl framework.Pr return s } -// moveToActiveQ tries to add pod to active queue and remove it from unschedulable and backoff queues. -// It returns 2 parameters: -// 1. a boolean flag to indicate whether the pod is added successfully. -// 2. an error for the caller to act on. +// moveToActiveQ tries to add the pod to the active queue. +// If the pod doesn't pass PreEnqueue plugins, it gets added to unschedulablePods instead. +// It returns a boolean flag to indicate whether the pod is added successfully. func (p *PriorityQueue) moveToActiveQ(logger klog.Logger, pInfo *framework.QueuedPodInfo, event string) bool { gatedBefore := pInfo.Gated - pInfo.Gated = !p.runPreEnqueuePlugins(context.Background(), pInfo) + // If SchedulerPopFromBackoffQ feature gate is enabled, + // PreEnqueue plugins were called when the pod was added to the backoffQ. + // Don't need to repeat it here when the pod is directly moved from the backoffQ. + if !p.isPopFromBackoffQEnabled || event != framework.BackoffComplete { + pInfo.Gated = !p.runPreEnqueuePlugins(context.Background(), pInfo) + } added := false p.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) { @@ -588,6 +596,28 @@ func (p *PriorityQueue) moveToActiveQ(logger klog.Logger, pInfo *framework.Queue return added } +// moveToBackoffQ tries to add the pod to the backoff queue. +// If SchedulerPopFromBackoffQ feature gate is enabled and the pod doesn't pass PreEnqueue plugins, it gets added to unschedulablePods instead. +// It returns a boolean flag to indicate whether the pod is added successfully. +func (p *PriorityQueue) moveToBackoffQ(logger klog.Logger, pInfo *framework.QueuedPodInfo, event string) bool { + // If SchedulerPopFromBackoffQ feature gate is enabled, + // PreEnqueue plugins are called on inserting pods to the backoffQ, + // not to call them again on popping out. + if p.isPopFromBackoffQEnabled { + pInfo.Gated = !p.runPreEnqueuePlugins(context.Background(), pInfo) + if pInfo.Gated { + if p.unschedulablePods.get(pInfo.Pod) == nil { + p.unschedulablePods.addOrUpdate(pInfo, event) + logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", event, "queue", unschedulablePods) + } + return false + } + } + p.backoffQ.add(logger, pInfo, event) + logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", event, "queue", backoffQ) + return true +} + // Add adds a pod to the active queue. It should be called only when a new pod // is added so there is no chance the pod is already in active/unschedulable/backoff queues func (p *PriorityQueue) Add(logger klog.Logger, pod *v1.Pod) { @@ -724,8 +754,7 @@ func (p *PriorityQueue) addUnschedulableWithoutQueueingHint(logger klog.Logger, // - No unschedulable plugins are associated with this Pod, // meaning something unusual (a temporal failure on kube-apiserver, etc) happened and this Pod gets moved back to the queue. // In this case, we should retry scheduling it because this Pod may not be retried until the next flush. - p.backoffQ.add(logger, pInfo, framework.ScheduleAttemptFailure) - logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pod), "event", framework.ScheduleAttemptFailure, "queue", backoffQ) + _ = p.moveToBackoffQ(logger, pInfo, framework.ScheduleAttemptFailure) } else { p.unschedulablePods.addOrUpdate(pInfo, framework.ScheduleAttemptFailure) logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pod), "event", framework.ScheduleAttemptFailure, "queue", unschedulablePods) @@ -934,13 +963,13 @@ func (p *PriorityQueue) Update(logger klog.Logger, oldPod, newPod *v1.Pod) { // Pod might have completed its backoff time while being in unschedulablePods, // so we should check isPodBackingoff before moving the pod to backoffQ. if p.backoffQ.isPodBackingoff(pInfo) { - p.backoffQ.add(logger, pInfo, framework.EventUnscheduledPodUpdate.Label()) - p.unschedulablePods.delete(pInfo.Pod, gated) - logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", framework.EventUnscheduledPodUpdate.Label(), "queue", backoffQ) + if added := p.moveToBackoffQ(logger, pInfo, framework.EventUnscheduledPodUpdate.Label()); added { + p.unschedulablePods.delete(pInfo.Pod, gated) + } return } - if added := p.moveToActiveQ(logger, pInfo, framework.BackoffComplete); added { + if added := p.moveToActiveQ(logger, pInfo, framework.EventUnscheduledPodUpdate.Label()); added { p.activeQ.broadcast() } return @@ -1044,8 +1073,10 @@ func (p *PriorityQueue) requeuePodViaQueueingHint(logger klog.Logger, pInfo *fra // Pod might have completed its backoff time while being in unschedulablePods, // so we should check isPodBackingoff before moving the pod to backoffQ. if strategy == queueAfterBackoff && p.backoffQ.isPodBackingoff(pInfo) { - p.backoffQ.add(logger, pInfo, event) - return backoffQ + if added := p.moveToBackoffQ(logger, pInfo, event); added { + return backoffQ + } + return unschedulablePods } // Reach here if schedulingHint is QueueImmediately, or schedulingHint is Queue but the pod is not backing off. diff --git a/pkg/scheduler/backend/queue/scheduling_queue_test.go b/pkg/scheduler/backend/queue/scheduling_queue_test.go index f779b022eb3..c85fad42d34 100644 --- a/pkg/scheduler/backend/queue/scheduling_queue_test.go +++ b/pkg/scheduler/backend/queue/scheduling_queue_test.go @@ -1443,17 +1443,20 @@ func (pl *preEnqueuePlugin) PreEnqueue(ctx context.Context, p *v1.Pod) *framewor return framework.NewStatus(framework.UnschedulableAndUnresolvable, "pod name not in allowlists") } -func TestPriorityQueue_addToActiveQ(t *testing.T) { +func TestPriorityQueue_moveToActiveQ(t *testing.T) { tests := []struct { - name string - plugins []framework.PreEnqueuePlugin - pod *v1.Pod - wantUnschedulablePods int - wantSuccess bool + name string + plugins []framework.PreEnqueuePlugin + pod *v1.Pod + event string + popFromBackoffQEnabled []bool + wantUnschedulablePods int + wantSuccess bool }{ { name: "no plugins registered", pod: st.MakePod().Name("p").Label("p", "").Obj(), + event: framework.EventUnscheduledPodAdd.Label(), wantUnschedulablePods: 0, wantSuccess: true, }, @@ -1461,6 +1464,7 @@ func TestPriorityQueue_addToActiveQ(t *testing.T) { name: "preEnqueue plugin registered, pod name not in allowlists", plugins: []framework.PreEnqueuePlugin{&preEnqueuePlugin{}, &preEnqueuePlugin{}}, pod: st.MakePod().Name("p").Label("p", "").Obj(), + event: framework.EventUnscheduledPodAdd.Label(), wantUnschedulablePods: 1, wantSuccess: false, }, @@ -1471,9 +1475,36 @@ func TestPriorityQueue_addToActiveQ(t *testing.T) { &preEnqueuePlugin{allowlists: []string{"foo"}}, }, pod: st.MakePod().Name("bar").Label("bar", "").Obj(), + event: framework.EventUnscheduledPodAdd.Label(), wantUnschedulablePods: 1, wantSuccess: false, }, + { + // With SchedulerPopFromBackoffQ enabled, the queue assumes the pod has already passed PreEnqueue, + // and it doesn't run PreEnqueue again, always puts the pod to activeQ. + name: "preEnqueue plugin registered, preEnqueue plugin would reject the pod, but isn't run", + plugins: []framework.PreEnqueuePlugin{ + &preEnqueuePlugin{allowlists: []string{"foo", "bar"}}, + &preEnqueuePlugin{allowlists: []string{"foo"}}, + }, + pod: st.MakePod().Name("bar").Label("bar", "").Obj(), + event: framework.BackoffComplete, + popFromBackoffQEnabled: []bool{false}, + wantUnschedulablePods: 1, + wantSuccess: false, + }, + { + name: "preEnqueue plugin registered, pod would fail one preEnqueue plugin, but is after backoff", + plugins: []framework.PreEnqueuePlugin{ + &preEnqueuePlugin{allowlists: []string{"foo", "bar"}}, + &preEnqueuePlugin{allowlists: []string{"foo"}}, + }, + pod: st.MakePod().Name("bar").Label("bar", "").Obj(), + event: framework.BackoffComplete, + popFromBackoffQEnabled: []bool{true}, + wantUnschedulablePods: 0, + wantSuccess: true, + }, { name: "preEnqueue plugin registered, pod passed all preEnqueue plugins", plugins: []framework.PreEnqueuePlugin{ @@ -1481,37 +1512,141 @@ func TestPriorityQueue_addToActiveQ(t *testing.T) { &preEnqueuePlugin{allowlists: []string{"bar"}}, }, pod: st.MakePod().Name("bar").Label("bar", "").Obj(), + event: framework.EventUnscheduledPodAdd.Label(), wantUnschedulablePods: 0, wantSuccess: true, }, } for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - logger, ctx := ktesting.NewTestContext(t) - ctx, cancel := context.WithCancel(ctx) - defer cancel() + if tt.popFromBackoffQEnabled == nil { + tt.popFromBackoffQEnabled = []bool{true, false} + } + for _, popFromBackoffQEnabled := range tt.popFromBackoffQEnabled { + t.Run(fmt.Sprintf("%s popFromBackoffQEnabled(%v)", tt.name, popFromBackoffQEnabled), func(t *testing.T) { + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SchedulerPopFromBackoffQ, popFromBackoffQEnabled) + logger, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer cancel() - m := map[string][]framework.PreEnqueuePlugin{"": tt.plugins} - q := NewTestQueueWithObjects(ctx, newDefaultQueueSort(), []runtime.Object{tt.pod}, WithPreEnqueuePluginMap(m), - WithPodInitialBackoffDuration(time.Second*30), WithPodMaxBackoffDuration(time.Second*60)) - got := q.moveToActiveQ(logger, q.newQueuedPodInfo(tt.pod), framework.EventUnscheduledPodAdd.Label()) - if got != tt.wantSuccess { - t.Errorf("Unexpected result: want %v, but got %v", tt.wantSuccess, got) - } - if tt.wantUnschedulablePods != len(q.unschedulablePods.podInfoMap) { - t.Errorf("Unexpected unschedulablePods: want %v, but got %v", tt.wantUnschedulablePods, len(q.unschedulablePods.podInfoMap)) - } + m := map[string][]framework.PreEnqueuePlugin{"": tt.plugins} + q := NewTestQueueWithObjects(ctx, newDefaultQueueSort(), []runtime.Object{tt.pod}, WithPreEnqueuePluginMap(m), + WithPodInitialBackoffDuration(time.Second*30), WithPodMaxBackoffDuration(time.Second*60)) + got := q.moveToActiveQ(logger, q.newQueuedPodInfo(tt.pod), tt.event) + if got != tt.wantSuccess { + t.Errorf("Unexpected result: want %v, but got %v", tt.wantSuccess, got) + } + if tt.wantUnschedulablePods != len(q.unschedulablePods.podInfoMap) { + t.Errorf("Unexpected unschedulablePods: want %v, but got %v", tt.wantUnschedulablePods, len(q.unschedulablePods.podInfoMap)) + } - // Simulate an update event. - clone := tt.pod.DeepCopy() - metav1.SetMetaDataAnnotation(&clone.ObjectMeta, "foo", "") - q.Update(logger, tt.pod, clone) - // Ensure the pod is still located in unschedulablePods. - if tt.wantUnschedulablePods != len(q.unschedulablePods.podInfoMap) { - t.Errorf("Unexpected unschedulablePods: want %v, but got %v", tt.wantUnschedulablePods, len(q.unschedulablePods.podInfoMap)) - } - }) + // Simulate an update event. + clone := tt.pod.DeepCopy() + metav1.SetMetaDataAnnotation(&clone.ObjectMeta, "foo", "") + q.Update(logger, tt.pod, clone) + // Ensure the pod is still located in unschedulablePods. + if tt.wantUnschedulablePods != len(q.unschedulablePods.podInfoMap) { + t.Errorf("Unexpected unschedulablePods: want %v, but got %v", tt.wantUnschedulablePods, len(q.unschedulablePods.podInfoMap)) + } + }) + } + } +} + +func TestPriorityQueue_moveToBackoffQ(t *testing.T) { + tests := []struct { + name string + plugins []framework.PreEnqueuePlugin + pod *v1.Pod + popFromBackoffQEnabled []bool + wantSuccess bool + }{ + { + name: "no plugins registered", + pod: st.MakePod().Name("p").Label("p", "").Obj(), + wantSuccess: true, + }, + { + name: "preEnqueue plugin registered, pod name would not be in allowlists", + plugins: []framework.PreEnqueuePlugin{&preEnqueuePlugin{}, &preEnqueuePlugin{}}, + pod: st.MakePod().Name("p").Label("p", "").Obj(), + popFromBackoffQEnabled: []bool{false}, + wantSuccess: true, + }, + { + name: "preEnqueue plugin registered, pod name not in allowlists", + plugins: []framework.PreEnqueuePlugin{&preEnqueuePlugin{}, &preEnqueuePlugin{}}, + pod: st.MakePod().Name("p").Label("p", "").Obj(), + popFromBackoffQEnabled: []bool{true}, + wantSuccess: false, + }, + { + name: "preEnqueue plugin registered, preEnqueue plugin would reject the pod, but isn't run", + plugins: []framework.PreEnqueuePlugin{ + &preEnqueuePlugin{allowlists: []string{"foo", "bar"}}, + &preEnqueuePlugin{allowlists: []string{"foo"}}, + }, + pod: st.MakePod().Name("bar").Label("bar", "").Obj(), + popFromBackoffQEnabled: []bool{false}, + wantSuccess: true, + }, + { + name: "preEnqueue plugin registered, pod failed one preEnqueue plugin", + plugins: []framework.PreEnqueuePlugin{ + &preEnqueuePlugin{allowlists: []string{"foo", "bar"}}, + &preEnqueuePlugin{allowlists: []string{"foo"}}, + }, + pod: st.MakePod().Name("bar").Label("bar", "").Obj(), + popFromBackoffQEnabled: []bool{true}, + wantSuccess: false, + }, + { + name: "preEnqueue plugin registered, pod passed all preEnqueue plugins", + plugins: []framework.PreEnqueuePlugin{ + &preEnqueuePlugin{allowlists: []string{"foo", "bar"}}, + &preEnqueuePlugin{allowlists: []string{"bar"}}, + }, + pod: st.MakePod().Name("bar").Label("bar", "").Obj(), + wantSuccess: true, + }, + } + + for _, tt := range tests { + if tt.popFromBackoffQEnabled == nil { + tt.popFromBackoffQEnabled = []bool{true, false} + } + for _, popFromBackoffQEnabled := range tt.popFromBackoffQEnabled { + t.Run(fmt.Sprintf("%s popFromBackoffQEnabled(%v)", tt.name, popFromBackoffQEnabled), func(t *testing.T) { + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SchedulerPopFromBackoffQ, popFromBackoffQEnabled) + logger, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + m := map[string][]framework.PreEnqueuePlugin{"": tt.plugins} + q := NewTestQueueWithObjects(ctx, newDefaultQueueSort(), []runtime.Object{tt.pod}, WithPreEnqueuePluginMap(m), + WithPodInitialBackoffDuration(time.Second*30), WithPodMaxBackoffDuration(time.Second*60)) + pInfo := q.newQueuedPodInfo(tt.pod) + got := q.moveToBackoffQ(logger, pInfo, framework.EventUnscheduledPodAdd.Label()) + if got != tt.wantSuccess { + t.Errorf("Unexpected result: want %v, but got %v", tt.wantSuccess, got) + } + if tt.wantSuccess { + if !q.backoffQ.has(pInfo) { + t.Errorf("Expected pod to be in backoffQ, but it isn't") + } + if q.unschedulablePods.get(pInfo.Pod) != nil { + t.Errorf("Expected pod not to be in unschedulablePods, but it is") + } + } else { + if q.backoffQ.has(pInfo) { + t.Errorf("Expected pod not to be in backoffQ, but it is") + } + if q.unschedulablePods.get(pInfo.Pod) == nil { + t.Errorf("Expected pod to be in unschedulablePods, but it isn't") + } + } + }) + } } } diff --git a/test/compatibility_lifecycle/reference/versioned_feature_list.yaml b/test/compatibility_lifecycle/reference/versioned_feature_list.yaml index 0c9e20cc924..ee51176a235 100644 --- a/test/compatibility_lifecycle/reference/versioned_feature_list.yaml +++ b/test/compatibility_lifecycle/reference/versioned_feature_list.yaml @@ -1149,6 +1149,12 @@ lockToDefault: false preRelease: Alpha version: "1.32" +- name: SchedulerPopFromBackoffQ + versionedSpecs: + - default: true + lockToDefault: false + preRelease: Beta + version: "1.33" - name: SchedulerQueueingHints versionedSpecs: - default: false