Merge pull request #130492 from macsko/call_preenqueue_plugins_when_adding_pod_to_backoffq

Call PreEnqueue plugins before adding pod to backoffQ
This commit is contained in:
Kubernetes Prow Robot 2025-03-14 08:55:53 -07:00 committed by GitHub
commit 0446f6c146
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 225 additions and 42 deletions

View File

@ -632,6 +632,13 @@ const (
// which improves the scheduling latency when the preemption involves in.
SchedulerAsyncPreemption featuregate.Feature = "SchedulerAsyncPreemption"
// owner: @macsko
// kep: http://kep.k8s.io/5142
//
// Improves scheduling queue behavior by popping pods from the backoffQ when the activeQ is empty.
// This allows to process potentially schedulable pods ASAP, eliminating a penalty effect of the backoff queue.
SchedulerPopFromBackoffQ featuregate.Feature = "SchedulerPopFromBackoffQ"
// owner: @atosatto @yuanchen8911
// kep: http://kep.k8s.io/3902
//

View File

@ -660,6 +660,10 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate
{Version: version.MustParse("1.32"), Default: false, PreRelease: featuregate.Alpha},
},
SchedulerPopFromBackoffQ: {
{Version: version.MustParse("1.33"), Default: true, PreRelease: featuregate.Beta},
},
SchedulerQueueingHints: {
{Version: version.MustParse("1.28"), Default: false, PreRelease: featuregate.Beta},
{Version: version.MustParse("1.32"), Default: true, PreRelease: featuregate.Beta},

View File

@ -189,6 +189,8 @@ type PriorityQueue struct {
// isSchedulingQueueHintEnabled indicates whether the feature gate for the scheduling queue is enabled.
isSchedulingQueueHintEnabled bool
// isPopFromBackoffQEnabled indicates whether the feature gate SchedulerPopFromBackoffQ is enabled.
isPopFromBackoffQEnabled bool
}
// QueueingHintFunction is the wrapper of QueueingHintFn that has PluginName.
@ -325,6 +327,7 @@ func NewPriorityQueue(
}
isSchedulingQueueHintEnabled := utilfeature.DefaultFeatureGate.Enabled(features.SchedulerQueueingHints)
isPopFromBackoffQEnabled := utilfeature.DefaultFeatureGate.Enabled(features.SchedulerPopFromBackoffQ)
pq := &PriorityQueue{
clock: options.clock,
@ -339,6 +342,7 @@ func NewPriorityQueue(
pluginMetricsSamplePercent: options.pluginMetricsSamplePercent,
moveRequestCycle: -1,
isSchedulingQueueHintEnabled: isSchedulingQueueHintEnabled,
isPopFromBackoffQEnabled: isPopFromBackoffQEnabled,
}
pq.nsLister = informerFactory.Core().V1().Namespaces().Lister()
pq.nominator = newPodNominator(options.podLister)
@ -545,13 +549,17 @@ func (p *PriorityQueue) runPreEnqueuePlugin(ctx context.Context, pl framework.Pr
return s
}
// moveToActiveQ tries to add pod to active queue and remove it from unschedulable and backoff queues.
// It returns 2 parameters:
// 1. a boolean flag to indicate whether the pod is added successfully.
// 2. an error for the caller to act on.
// moveToActiveQ tries to add the pod to the active queue.
// If the pod doesn't pass PreEnqueue plugins, it gets added to unschedulablePods instead.
// It returns a boolean flag to indicate whether the pod is added successfully.
func (p *PriorityQueue) moveToActiveQ(logger klog.Logger, pInfo *framework.QueuedPodInfo, event string) bool {
gatedBefore := pInfo.Gated
pInfo.Gated = !p.runPreEnqueuePlugins(context.Background(), pInfo)
// If SchedulerPopFromBackoffQ feature gate is enabled,
// PreEnqueue plugins were called when the pod was added to the backoffQ.
// Don't need to repeat it here when the pod is directly moved from the backoffQ.
if !p.isPopFromBackoffQEnabled || event != framework.BackoffComplete {
pInfo.Gated = !p.runPreEnqueuePlugins(context.Background(), pInfo)
}
added := false
p.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) {
@ -588,6 +596,28 @@ func (p *PriorityQueue) moveToActiveQ(logger klog.Logger, pInfo *framework.Queue
return added
}
// moveToBackoffQ tries to add the pod to the backoff queue.
// If SchedulerPopFromBackoffQ feature gate is enabled and the pod doesn't pass PreEnqueue plugins, it gets added to unschedulablePods instead.
// It returns a boolean flag to indicate whether the pod is added successfully.
func (p *PriorityQueue) moveToBackoffQ(logger klog.Logger, pInfo *framework.QueuedPodInfo, event string) bool {
// If SchedulerPopFromBackoffQ feature gate is enabled,
// PreEnqueue plugins are called on inserting pods to the backoffQ,
// not to call them again on popping out.
if p.isPopFromBackoffQEnabled {
pInfo.Gated = !p.runPreEnqueuePlugins(context.Background(), pInfo)
if pInfo.Gated {
if p.unschedulablePods.get(pInfo.Pod) == nil {
p.unschedulablePods.addOrUpdate(pInfo, event)
logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", event, "queue", unschedulablePods)
}
return false
}
}
p.backoffQ.add(logger, pInfo, event)
logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", event, "queue", backoffQ)
return true
}
// Add adds a pod to the active queue. It should be called only when a new pod
// is added so there is no chance the pod is already in active/unschedulable/backoff queues
func (p *PriorityQueue) Add(logger klog.Logger, pod *v1.Pod) {
@ -724,8 +754,7 @@ func (p *PriorityQueue) addUnschedulableWithoutQueueingHint(logger klog.Logger,
// - No unschedulable plugins are associated with this Pod,
// meaning something unusual (a temporal failure on kube-apiserver, etc) happened and this Pod gets moved back to the queue.
// In this case, we should retry scheduling it because this Pod may not be retried until the next flush.
p.backoffQ.add(logger, pInfo, framework.ScheduleAttemptFailure)
logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pod), "event", framework.ScheduleAttemptFailure, "queue", backoffQ)
_ = p.moveToBackoffQ(logger, pInfo, framework.ScheduleAttemptFailure)
} else {
p.unschedulablePods.addOrUpdate(pInfo, framework.ScheduleAttemptFailure)
logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pod), "event", framework.ScheduleAttemptFailure, "queue", unschedulablePods)
@ -934,13 +963,13 @@ func (p *PriorityQueue) Update(logger klog.Logger, oldPod, newPod *v1.Pod) {
// Pod might have completed its backoff time while being in unschedulablePods,
// so we should check isPodBackingoff before moving the pod to backoffQ.
if p.backoffQ.isPodBackingoff(pInfo) {
p.backoffQ.add(logger, pInfo, framework.EventUnscheduledPodUpdate.Label())
p.unschedulablePods.delete(pInfo.Pod, gated)
logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", framework.EventUnscheduledPodUpdate.Label(), "queue", backoffQ)
if added := p.moveToBackoffQ(logger, pInfo, framework.EventUnscheduledPodUpdate.Label()); added {
p.unschedulablePods.delete(pInfo.Pod, gated)
}
return
}
if added := p.moveToActiveQ(logger, pInfo, framework.BackoffComplete); added {
if added := p.moveToActiveQ(logger, pInfo, framework.EventUnscheduledPodUpdate.Label()); added {
p.activeQ.broadcast()
}
return
@ -1044,8 +1073,10 @@ func (p *PriorityQueue) requeuePodViaQueueingHint(logger klog.Logger, pInfo *fra
// Pod might have completed its backoff time while being in unschedulablePods,
// so we should check isPodBackingoff before moving the pod to backoffQ.
if strategy == queueAfterBackoff && p.backoffQ.isPodBackingoff(pInfo) {
p.backoffQ.add(logger, pInfo, event)
return backoffQ
if added := p.moveToBackoffQ(logger, pInfo, event); added {
return backoffQ
}
return unschedulablePods
}
// Reach here if schedulingHint is QueueImmediately, or schedulingHint is Queue but the pod is not backing off.

View File

@ -1443,17 +1443,20 @@ func (pl *preEnqueuePlugin) PreEnqueue(ctx context.Context, p *v1.Pod) *framewor
return framework.NewStatus(framework.UnschedulableAndUnresolvable, "pod name not in allowlists")
}
func TestPriorityQueue_addToActiveQ(t *testing.T) {
func TestPriorityQueue_moveToActiveQ(t *testing.T) {
tests := []struct {
name string
plugins []framework.PreEnqueuePlugin
pod *v1.Pod
wantUnschedulablePods int
wantSuccess bool
name string
plugins []framework.PreEnqueuePlugin
pod *v1.Pod
event string
popFromBackoffQEnabled []bool
wantUnschedulablePods int
wantSuccess bool
}{
{
name: "no plugins registered",
pod: st.MakePod().Name("p").Label("p", "").Obj(),
event: framework.EventUnscheduledPodAdd.Label(),
wantUnschedulablePods: 0,
wantSuccess: true,
},
@ -1461,6 +1464,7 @@ func TestPriorityQueue_addToActiveQ(t *testing.T) {
name: "preEnqueue plugin registered, pod name not in allowlists",
plugins: []framework.PreEnqueuePlugin{&preEnqueuePlugin{}, &preEnqueuePlugin{}},
pod: st.MakePod().Name("p").Label("p", "").Obj(),
event: framework.EventUnscheduledPodAdd.Label(),
wantUnschedulablePods: 1,
wantSuccess: false,
},
@ -1471,9 +1475,36 @@ func TestPriorityQueue_addToActiveQ(t *testing.T) {
&preEnqueuePlugin{allowlists: []string{"foo"}},
},
pod: st.MakePod().Name("bar").Label("bar", "").Obj(),
event: framework.EventUnscheduledPodAdd.Label(),
wantUnschedulablePods: 1,
wantSuccess: false,
},
{
// With SchedulerPopFromBackoffQ enabled, the queue assumes the pod has already passed PreEnqueue,
// and it doesn't run PreEnqueue again, always puts the pod to activeQ.
name: "preEnqueue plugin registered, preEnqueue plugin would reject the pod, but isn't run",
plugins: []framework.PreEnqueuePlugin{
&preEnqueuePlugin{allowlists: []string{"foo", "bar"}},
&preEnqueuePlugin{allowlists: []string{"foo"}},
},
pod: st.MakePod().Name("bar").Label("bar", "").Obj(),
event: framework.BackoffComplete,
popFromBackoffQEnabled: []bool{false},
wantUnschedulablePods: 1,
wantSuccess: false,
},
{
name: "preEnqueue plugin registered, pod would fail one preEnqueue plugin, but is after backoff",
plugins: []framework.PreEnqueuePlugin{
&preEnqueuePlugin{allowlists: []string{"foo", "bar"}},
&preEnqueuePlugin{allowlists: []string{"foo"}},
},
pod: st.MakePod().Name("bar").Label("bar", "").Obj(),
event: framework.BackoffComplete,
popFromBackoffQEnabled: []bool{true},
wantUnschedulablePods: 0,
wantSuccess: true,
},
{
name: "preEnqueue plugin registered, pod passed all preEnqueue plugins",
plugins: []framework.PreEnqueuePlugin{
@ -1481,37 +1512,141 @@ func TestPriorityQueue_addToActiveQ(t *testing.T) {
&preEnqueuePlugin{allowlists: []string{"bar"}},
},
pod: st.MakePod().Name("bar").Label("bar", "").Obj(),
event: framework.EventUnscheduledPodAdd.Label(),
wantUnschedulablePods: 0,
wantSuccess: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
logger, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
if tt.popFromBackoffQEnabled == nil {
tt.popFromBackoffQEnabled = []bool{true, false}
}
for _, popFromBackoffQEnabled := range tt.popFromBackoffQEnabled {
t.Run(fmt.Sprintf("%s popFromBackoffQEnabled(%v)", tt.name, popFromBackoffQEnabled), func(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SchedulerPopFromBackoffQ, popFromBackoffQEnabled)
logger, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
m := map[string][]framework.PreEnqueuePlugin{"": tt.plugins}
q := NewTestQueueWithObjects(ctx, newDefaultQueueSort(), []runtime.Object{tt.pod}, WithPreEnqueuePluginMap(m),
WithPodInitialBackoffDuration(time.Second*30), WithPodMaxBackoffDuration(time.Second*60))
got := q.moveToActiveQ(logger, q.newQueuedPodInfo(tt.pod), framework.EventUnscheduledPodAdd.Label())
if got != tt.wantSuccess {
t.Errorf("Unexpected result: want %v, but got %v", tt.wantSuccess, got)
}
if tt.wantUnschedulablePods != len(q.unschedulablePods.podInfoMap) {
t.Errorf("Unexpected unschedulablePods: want %v, but got %v", tt.wantUnschedulablePods, len(q.unschedulablePods.podInfoMap))
}
m := map[string][]framework.PreEnqueuePlugin{"": tt.plugins}
q := NewTestQueueWithObjects(ctx, newDefaultQueueSort(), []runtime.Object{tt.pod}, WithPreEnqueuePluginMap(m),
WithPodInitialBackoffDuration(time.Second*30), WithPodMaxBackoffDuration(time.Second*60))
got := q.moveToActiveQ(logger, q.newQueuedPodInfo(tt.pod), tt.event)
if got != tt.wantSuccess {
t.Errorf("Unexpected result: want %v, but got %v", tt.wantSuccess, got)
}
if tt.wantUnschedulablePods != len(q.unschedulablePods.podInfoMap) {
t.Errorf("Unexpected unschedulablePods: want %v, but got %v", tt.wantUnschedulablePods, len(q.unschedulablePods.podInfoMap))
}
// Simulate an update event.
clone := tt.pod.DeepCopy()
metav1.SetMetaDataAnnotation(&clone.ObjectMeta, "foo", "")
q.Update(logger, tt.pod, clone)
// Ensure the pod is still located in unschedulablePods.
if tt.wantUnschedulablePods != len(q.unschedulablePods.podInfoMap) {
t.Errorf("Unexpected unschedulablePods: want %v, but got %v", tt.wantUnschedulablePods, len(q.unschedulablePods.podInfoMap))
}
})
// Simulate an update event.
clone := tt.pod.DeepCopy()
metav1.SetMetaDataAnnotation(&clone.ObjectMeta, "foo", "")
q.Update(logger, tt.pod, clone)
// Ensure the pod is still located in unschedulablePods.
if tt.wantUnschedulablePods != len(q.unschedulablePods.podInfoMap) {
t.Errorf("Unexpected unschedulablePods: want %v, but got %v", tt.wantUnschedulablePods, len(q.unschedulablePods.podInfoMap))
}
})
}
}
}
func TestPriorityQueue_moveToBackoffQ(t *testing.T) {
tests := []struct {
name string
plugins []framework.PreEnqueuePlugin
pod *v1.Pod
popFromBackoffQEnabled []bool
wantSuccess bool
}{
{
name: "no plugins registered",
pod: st.MakePod().Name("p").Label("p", "").Obj(),
wantSuccess: true,
},
{
name: "preEnqueue plugin registered, pod name would not be in allowlists",
plugins: []framework.PreEnqueuePlugin{&preEnqueuePlugin{}, &preEnqueuePlugin{}},
pod: st.MakePod().Name("p").Label("p", "").Obj(),
popFromBackoffQEnabled: []bool{false},
wantSuccess: true,
},
{
name: "preEnqueue plugin registered, pod name not in allowlists",
plugins: []framework.PreEnqueuePlugin{&preEnqueuePlugin{}, &preEnqueuePlugin{}},
pod: st.MakePod().Name("p").Label("p", "").Obj(),
popFromBackoffQEnabled: []bool{true},
wantSuccess: false,
},
{
name: "preEnqueue plugin registered, preEnqueue plugin would reject the pod, but isn't run",
plugins: []framework.PreEnqueuePlugin{
&preEnqueuePlugin{allowlists: []string{"foo", "bar"}},
&preEnqueuePlugin{allowlists: []string{"foo"}},
},
pod: st.MakePod().Name("bar").Label("bar", "").Obj(),
popFromBackoffQEnabled: []bool{false},
wantSuccess: true,
},
{
name: "preEnqueue plugin registered, pod failed one preEnqueue plugin",
plugins: []framework.PreEnqueuePlugin{
&preEnqueuePlugin{allowlists: []string{"foo", "bar"}},
&preEnqueuePlugin{allowlists: []string{"foo"}},
},
pod: st.MakePod().Name("bar").Label("bar", "").Obj(),
popFromBackoffQEnabled: []bool{true},
wantSuccess: false,
},
{
name: "preEnqueue plugin registered, pod passed all preEnqueue plugins",
plugins: []framework.PreEnqueuePlugin{
&preEnqueuePlugin{allowlists: []string{"foo", "bar"}},
&preEnqueuePlugin{allowlists: []string{"bar"}},
},
pod: st.MakePod().Name("bar").Label("bar", "").Obj(),
wantSuccess: true,
},
}
for _, tt := range tests {
if tt.popFromBackoffQEnabled == nil {
tt.popFromBackoffQEnabled = []bool{true, false}
}
for _, popFromBackoffQEnabled := range tt.popFromBackoffQEnabled {
t.Run(fmt.Sprintf("%s popFromBackoffQEnabled(%v)", tt.name, popFromBackoffQEnabled), func(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SchedulerPopFromBackoffQ, popFromBackoffQEnabled)
logger, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
m := map[string][]framework.PreEnqueuePlugin{"": tt.plugins}
q := NewTestQueueWithObjects(ctx, newDefaultQueueSort(), []runtime.Object{tt.pod}, WithPreEnqueuePluginMap(m),
WithPodInitialBackoffDuration(time.Second*30), WithPodMaxBackoffDuration(time.Second*60))
pInfo := q.newQueuedPodInfo(tt.pod)
got := q.moveToBackoffQ(logger, pInfo, framework.EventUnscheduledPodAdd.Label())
if got != tt.wantSuccess {
t.Errorf("Unexpected result: want %v, but got %v", tt.wantSuccess, got)
}
if tt.wantSuccess {
if !q.backoffQ.has(pInfo) {
t.Errorf("Expected pod to be in backoffQ, but it isn't")
}
if q.unschedulablePods.get(pInfo.Pod) != nil {
t.Errorf("Expected pod not to be in unschedulablePods, but it is")
}
} else {
if q.backoffQ.has(pInfo) {
t.Errorf("Expected pod not to be in backoffQ, but it is")
}
if q.unschedulablePods.get(pInfo.Pod) == nil {
t.Errorf("Expected pod to be in unschedulablePods, but it isn't")
}
}
})
}
}
}

View File

@ -1149,6 +1149,12 @@
lockToDefault: false
preRelease: Alpha
version: "1.32"
- name: SchedulerPopFromBackoffQ
versionedSpecs:
- default: true
lockToDefault: false
preRelease: Beta
version: "1.33"
- name: SchedulerQueueingHints
versionedSpecs:
- default: false