Call PreEnqueue plugins before adding pod to backoffQ

This commit is contained in:
Maciej Skoczeń 2025-02-28 12:18:37 +00:00
parent 8d4eaa024d
commit 9df0f6b604
2 changed files with 208 additions and 42 deletions

View File

@ -189,6 +189,8 @@ type PriorityQueue struct {
// isSchedulingQueueHintEnabled indicates whether the feature gate for the scheduling queue is enabled. // isSchedulingQueueHintEnabled indicates whether the feature gate for the scheduling queue is enabled.
isSchedulingQueueHintEnabled bool isSchedulingQueueHintEnabled bool
// isPopFromBackoffQEnabled indicates whether the feature gate SchedulerPopFromBackoffQ is enabled.
isPopFromBackoffQEnabled bool
} }
// QueueingHintFunction is the wrapper of QueueingHintFn that has PluginName. // QueueingHintFunction is the wrapper of QueueingHintFn that has PluginName.
@ -325,6 +327,7 @@ func NewPriorityQueue(
} }
isSchedulingQueueHintEnabled := utilfeature.DefaultFeatureGate.Enabled(features.SchedulerQueueingHints) isSchedulingQueueHintEnabled := utilfeature.DefaultFeatureGate.Enabled(features.SchedulerQueueingHints)
isPopFromBackoffQEnabled := utilfeature.DefaultFeatureGate.Enabled(features.SchedulerPopFromBackoffQ)
pq := &PriorityQueue{ pq := &PriorityQueue{
clock: options.clock, clock: options.clock,
@ -339,6 +342,7 @@ func NewPriorityQueue(
pluginMetricsSamplePercent: options.pluginMetricsSamplePercent, pluginMetricsSamplePercent: options.pluginMetricsSamplePercent,
moveRequestCycle: -1, moveRequestCycle: -1,
isSchedulingQueueHintEnabled: isSchedulingQueueHintEnabled, isSchedulingQueueHintEnabled: isSchedulingQueueHintEnabled,
isPopFromBackoffQEnabled: isPopFromBackoffQEnabled,
} }
pq.nsLister = informerFactory.Core().V1().Namespaces().Lister() pq.nsLister = informerFactory.Core().V1().Namespaces().Lister()
pq.nominator = newPodNominator(options.podLister) pq.nominator = newPodNominator(options.podLister)
@ -545,13 +549,17 @@ func (p *PriorityQueue) runPreEnqueuePlugin(ctx context.Context, pl framework.Pr
return s return s
} }
// moveToActiveQ tries to add pod to active queue and remove it from unschedulable and backoff queues. // moveToActiveQ tries to add the pod to the active queue.
// It returns 2 parameters: // If the pod doesn't pass PreEnqueue plugins, it gets added to unschedulablePods instead.
// 1. a boolean flag to indicate whether the pod is added successfully. // It returns a boolean flag to indicate whether the pod is added successfully.
// 2. an error for the caller to act on.
func (p *PriorityQueue) moveToActiveQ(logger klog.Logger, pInfo *framework.QueuedPodInfo, event string) bool { func (p *PriorityQueue) moveToActiveQ(logger klog.Logger, pInfo *framework.QueuedPodInfo, event string) bool {
gatedBefore := pInfo.Gated gatedBefore := pInfo.Gated
pInfo.Gated = !p.runPreEnqueuePlugins(context.Background(), pInfo) // If SchedulerPopFromBackoffQ feature gate is enabled,
// PreEnqueue plugins were called when the pod was added to the backoffQ.
// Don't need to repeat it here when the pod is directly moved from the backoffQ.
if !p.isPopFromBackoffQEnabled || event != framework.BackoffComplete {
pInfo.Gated = !p.runPreEnqueuePlugins(context.Background(), pInfo)
}
added := false added := false
p.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) { p.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) {
@ -588,6 +596,28 @@ func (p *PriorityQueue) moveToActiveQ(logger klog.Logger, pInfo *framework.Queue
return added return added
} }
// moveToBackoffQ tries to add the pod to the backoff queue.
// If SchedulerPopFromBackoffQ feature gate is enabled and the pod doesn't pass PreEnqueue plugins, it gets added to unschedulablePods instead.
// It returns a boolean flag to indicate whether the pod is added successfully.
func (p *PriorityQueue) moveToBackoffQ(logger klog.Logger, pInfo *framework.QueuedPodInfo, event string) bool {
// If SchedulerPopFromBackoffQ feature gate is enabled,
// PreEnqueue plugins are called on inserting pods to the backoffQ,
// not to call them again on popping out.
if p.isPopFromBackoffQEnabled {
pInfo.Gated = !p.runPreEnqueuePlugins(context.Background(), pInfo)
if pInfo.Gated {
if p.unschedulablePods.get(pInfo.Pod) == nil {
p.unschedulablePods.addOrUpdate(pInfo, event)
logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", event, "queue", unschedulablePods)
}
return false
}
}
p.backoffQ.add(logger, pInfo, event)
logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", event, "queue", backoffQ)
return true
}
// Add adds a pod to the active queue. It should be called only when a new pod // Add adds a pod to the active queue. It should be called only when a new pod
// is added so there is no chance the pod is already in active/unschedulable/backoff queues // is added so there is no chance the pod is already in active/unschedulable/backoff queues
func (p *PriorityQueue) Add(logger klog.Logger, pod *v1.Pod) { func (p *PriorityQueue) Add(logger klog.Logger, pod *v1.Pod) {
@ -724,8 +754,7 @@ func (p *PriorityQueue) addUnschedulableWithoutQueueingHint(logger klog.Logger,
// - No unschedulable plugins are associated with this Pod, // - No unschedulable plugins are associated with this Pod,
// meaning something unusual (a temporal failure on kube-apiserver, etc) happened and this Pod gets moved back to the queue. // meaning something unusual (a temporal failure on kube-apiserver, etc) happened and this Pod gets moved back to the queue.
// In this case, we should retry scheduling it because this Pod may not be retried until the next flush. // In this case, we should retry scheduling it because this Pod may not be retried until the next flush.
p.backoffQ.add(logger, pInfo, framework.ScheduleAttemptFailure) _ = p.moveToBackoffQ(logger, pInfo, framework.ScheduleAttemptFailure)
logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pod), "event", framework.ScheduleAttemptFailure, "queue", backoffQ)
} else { } else {
p.unschedulablePods.addOrUpdate(pInfo, framework.ScheduleAttemptFailure) p.unschedulablePods.addOrUpdate(pInfo, framework.ScheduleAttemptFailure)
logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pod), "event", framework.ScheduleAttemptFailure, "queue", unschedulablePods) logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pod), "event", framework.ScheduleAttemptFailure, "queue", unschedulablePods)
@ -934,13 +963,13 @@ func (p *PriorityQueue) Update(logger klog.Logger, oldPod, newPod *v1.Pod) {
// Pod might have completed its backoff time while being in unschedulablePods, // Pod might have completed its backoff time while being in unschedulablePods,
// so we should check isPodBackingoff before moving the pod to backoffQ. // so we should check isPodBackingoff before moving the pod to backoffQ.
if p.backoffQ.isPodBackingoff(pInfo) { if p.backoffQ.isPodBackingoff(pInfo) {
p.backoffQ.add(logger, pInfo, framework.EventUnscheduledPodUpdate.Label()) if added := p.moveToBackoffQ(logger, pInfo, framework.EventUnscheduledPodUpdate.Label()); added {
p.unschedulablePods.delete(pInfo.Pod, gated) p.unschedulablePods.delete(pInfo.Pod, gated)
logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", framework.EventUnscheduledPodUpdate.Label(), "queue", backoffQ) }
return return
} }
if added := p.moveToActiveQ(logger, pInfo, framework.BackoffComplete); added { if added := p.moveToActiveQ(logger, pInfo, framework.EventUnscheduledPodUpdate.Label()); added {
p.activeQ.broadcast() p.activeQ.broadcast()
} }
return return
@ -1044,8 +1073,10 @@ func (p *PriorityQueue) requeuePodViaQueueingHint(logger klog.Logger, pInfo *fra
// Pod might have completed its backoff time while being in unschedulablePods, // Pod might have completed its backoff time while being in unschedulablePods,
// so we should check isPodBackingoff before moving the pod to backoffQ. // so we should check isPodBackingoff before moving the pod to backoffQ.
if strategy == queueAfterBackoff && p.backoffQ.isPodBackingoff(pInfo) { if strategy == queueAfterBackoff && p.backoffQ.isPodBackingoff(pInfo) {
p.backoffQ.add(logger, pInfo, event) if added := p.moveToBackoffQ(logger, pInfo, event); added {
return backoffQ return backoffQ
}
return unschedulablePods
} }
// Reach here if schedulingHint is QueueImmediately, or schedulingHint is Queue but the pod is not backing off. // Reach here if schedulingHint is QueueImmediately, or schedulingHint is Queue but the pod is not backing off.

View File

@ -1443,17 +1443,20 @@ func (pl *preEnqueuePlugin) PreEnqueue(ctx context.Context, p *v1.Pod) *framewor
return framework.NewStatus(framework.UnschedulableAndUnresolvable, "pod name not in allowlists") return framework.NewStatus(framework.UnschedulableAndUnresolvable, "pod name not in allowlists")
} }
func TestPriorityQueue_addToActiveQ(t *testing.T) { func TestPriorityQueue_moveToActiveQ(t *testing.T) {
tests := []struct { tests := []struct {
name string name string
plugins []framework.PreEnqueuePlugin plugins []framework.PreEnqueuePlugin
pod *v1.Pod pod *v1.Pod
wantUnschedulablePods int event string
wantSuccess bool popFromBackoffQEnabled []bool
wantUnschedulablePods int
wantSuccess bool
}{ }{
{ {
name: "no plugins registered", name: "no plugins registered",
pod: st.MakePod().Name("p").Label("p", "").Obj(), pod: st.MakePod().Name("p").Label("p", "").Obj(),
event: framework.EventUnscheduledPodAdd.Label(),
wantUnschedulablePods: 0, wantUnschedulablePods: 0,
wantSuccess: true, wantSuccess: true,
}, },
@ -1461,6 +1464,7 @@ func TestPriorityQueue_addToActiveQ(t *testing.T) {
name: "preEnqueue plugin registered, pod name not in allowlists", name: "preEnqueue plugin registered, pod name not in allowlists",
plugins: []framework.PreEnqueuePlugin{&preEnqueuePlugin{}, &preEnqueuePlugin{}}, plugins: []framework.PreEnqueuePlugin{&preEnqueuePlugin{}, &preEnqueuePlugin{}},
pod: st.MakePod().Name("p").Label("p", "").Obj(), pod: st.MakePod().Name("p").Label("p", "").Obj(),
event: framework.EventUnscheduledPodAdd.Label(),
wantUnschedulablePods: 1, wantUnschedulablePods: 1,
wantSuccess: false, wantSuccess: false,
}, },
@ -1471,9 +1475,36 @@ func TestPriorityQueue_addToActiveQ(t *testing.T) {
&preEnqueuePlugin{allowlists: []string{"foo"}}, &preEnqueuePlugin{allowlists: []string{"foo"}},
}, },
pod: st.MakePod().Name("bar").Label("bar", "").Obj(), pod: st.MakePod().Name("bar").Label("bar", "").Obj(),
event: framework.EventUnscheduledPodAdd.Label(),
wantUnschedulablePods: 1, wantUnschedulablePods: 1,
wantSuccess: false, wantSuccess: false,
}, },
{
// With SchedulerPopFromBackoffQ enabled, the queue assumes the pod has already passed PreEnqueue,
// and it doesn't run PreEnqueue again, always puts the pod to activeQ.
name: "preEnqueue plugin registered, preEnqueue plugin would reject the pod, but isn't run",
plugins: []framework.PreEnqueuePlugin{
&preEnqueuePlugin{allowlists: []string{"foo", "bar"}},
&preEnqueuePlugin{allowlists: []string{"foo"}},
},
pod: st.MakePod().Name("bar").Label("bar", "").Obj(),
event: framework.BackoffComplete,
popFromBackoffQEnabled: []bool{false},
wantUnschedulablePods: 1,
wantSuccess: false,
},
{
name: "preEnqueue plugin registered, pod would fail one preEnqueue plugin, but is after backoff",
plugins: []framework.PreEnqueuePlugin{
&preEnqueuePlugin{allowlists: []string{"foo", "bar"}},
&preEnqueuePlugin{allowlists: []string{"foo"}},
},
pod: st.MakePod().Name("bar").Label("bar", "").Obj(),
event: framework.BackoffComplete,
popFromBackoffQEnabled: []bool{true},
wantUnschedulablePods: 0,
wantSuccess: true,
},
{ {
name: "preEnqueue plugin registered, pod passed all preEnqueue plugins", name: "preEnqueue plugin registered, pod passed all preEnqueue plugins",
plugins: []framework.PreEnqueuePlugin{ plugins: []framework.PreEnqueuePlugin{
@ -1481,37 +1512,141 @@ func TestPriorityQueue_addToActiveQ(t *testing.T) {
&preEnqueuePlugin{allowlists: []string{"bar"}}, &preEnqueuePlugin{allowlists: []string{"bar"}},
}, },
pod: st.MakePod().Name("bar").Label("bar", "").Obj(), pod: st.MakePod().Name("bar").Label("bar", "").Obj(),
event: framework.EventUnscheduledPodAdd.Label(),
wantUnschedulablePods: 0, wantUnschedulablePods: 0,
wantSuccess: true, wantSuccess: true,
}, },
} }
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { if tt.popFromBackoffQEnabled == nil {
logger, ctx := ktesting.NewTestContext(t) tt.popFromBackoffQEnabled = []bool{true, false}
ctx, cancel := context.WithCancel(ctx) }
defer cancel() for _, popFromBackoffQEnabled := range tt.popFromBackoffQEnabled {
t.Run(fmt.Sprintf("%s popFromBackoffQEnabled(%v)", tt.name, popFromBackoffQEnabled), func(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SchedulerPopFromBackoffQ, popFromBackoffQEnabled)
logger, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
m := map[string][]framework.PreEnqueuePlugin{"": tt.plugins} m := map[string][]framework.PreEnqueuePlugin{"": tt.plugins}
q := NewTestQueueWithObjects(ctx, newDefaultQueueSort(), []runtime.Object{tt.pod}, WithPreEnqueuePluginMap(m), q := NewTestQueueWithObjects(ctx, newDefaultQueueSort(), []runtime.Object{tt.pod}, WithPreEnqueuePluginMap(m),
WithPodInitialBackoffDuration(time.Second*30), WithPodMaxBackoffDuration(time.Second*60)) WithPodInitialBackoffDuration(time.Second*30), WithPodMaxBackoffDuration(time.Second*60))
got := q.moveToActiveQ(logger, q.newQueuedPodInfo(tt.pod), framework.EventUnscheduledPodAdd.Label()) got := q.moveToActiveQ(logger, q.newQueuedPodInfo(tt.pod), tt.event)
if got != tt.wantSuccess { if got != tt.wantSuccess {
t.Errorf("Unexpected result: want %v, but got %v", tt.wantSuccess, got) t.Errorf("Unexpected result: want %v, but got %v", tt.wantSuccess, got)
} }
if tt.wantUnschedulablePods != len(q.unschedulablePods.podInfoMap) { if tt.wantUnschedulablePods != len(q.unschedulablePods.podInfoMap) {
t.Errorf("Unexpected unschedulablePods: want %v, but got %v", tt.wantUnschedulablePods, len(q.unschedulablePods.podInfoMap)) t.Errorf("Unexpected unschedulablePods: want %v, but got %v", tt.wantUnschedulablePods, len(q.unschedulablePods.podInfoMap))
} }
// Simulate an update event. // Simulate an update event.
clone := tt.pod.DeepCopy() clone := tt.pod.DeepCopy()
metav1.SetMetaDataAnnotation(&clone.ObjectMeta, "foo", "") metav1.SetMetaDataAnnotation(&clone.ObjectMeta, "foo", "")
q.Update(logger, tt.pod, clone) q.Update(logger, tt.pod, clone)
// Ensure the pod is still located in unschedulablePods. // Ensure the pod is still located in unschedulablePods.
if tt.wantUnschedulablePods != len(q.unschedulablePods.podInfoMap) { if tt.wantUnschedulablePods != len(q.unschedulablePods.podInfoMap) {
t.Errorf("Unexpected unschedulablePods: want %v, but got %v", tt.wantUnschedulablePods, len(q.unschedulablePods.podInfoMap)) t.Errorf("Unexpected unschedulablePods: want %v, but got %v", tt.wantUnschedulablePods, len(q.unschedulablePods.podInfoMap))
} }
}) })
}
}
}
func TestPriorityQueue_moveToBackoffQ(t *testing.T) {
tests := []struct {
name string
plugins []framework.PreEnqueuePlugin
pod *v1.Pod
popFromBackoffQEnabled []bool
wantSuccess bool
}{
{
name: "no plugins registered",
pod: st.MakePod().Name("p").Label("p", "").Obj(),
wantSuccess: true,
},
{
name: "preEnqueue plugin registered, pod name would not be in allowlists",
plugins: []framework.PreEnqueuePlugin{&preEnqueuePlugin{}, &preEnqueuePlugin{}},
pod: st.MakePod().Name("p").Label("p", "").Obj(),
popFromBackoffQEnabled: []bool{false},
wantSuccess: true,
},
{
name: "preEnqueue plugin registered, pod name not in allowlists",
plugins: []framework.PreEnqueuePlugin{&preEnqueuePlugin{}, &preEnqueuePlugin{}},
pod: st.MakePod().Name("p").Label("p", "").Obj(),
popFromBackoffQEnabled: []bool{true},
wantSuccess: false,
},
{
name: "preEnqueue plugin registered, preEnqueue plugin would reject the pod, but isn't run",
plugins: []framework.PreEnqueuePlugin{
&preEnqueuePlugin{allowlists: []string{"foo", "bar"}},
&preEnqueuePlugin{allowlists: []string{"foo"}},
},
pod: st.MakePod().Name("bar").Label("bar", "").Obj(),
popFromBackoffQEnabled: []bool{false},
wantSuccess: true,
},
{
name: "preEnqueue plugin registered, pod failed one preEnqueue plugin",
plugins: []framework.PreEnqueuePlugin{
&preEnqueuePlugin{allowlists: []string{"foo", "bar"}},
&preEnqueuePlugin{allowlists: []string{"foo"}},
},
pod: st.MakePod().Name("bar").Label("bar", "").Obj(),
popFromBackoffQEnabled: []bool{true},
wantSuccess: false,
},
{
name: "preEnqueue plugin registered, pod passed all preEnqueue plugins",
plugins: []framework.PreEnqueuePlugin{
&preEnqueuePlugin{allowlists: []string{"foo", "bar"}},
&preEnqueuePlugin{allowlists: []string{"bar"}},
},
pod: st.MakePod().Name("bar").Label("bar", "").Obj(),
wantSuccess: true,
},
}
for _, tt := range tests {
if tt.popFromBackoffQEnabled == nil {
tt.popFromBackoffQEnabled = []bool{true, false}
}
for _, popFromBackoffQEnabled := range tt.popFromBackoffQEnabled {
t.Run(fmt.Sprintf("%s popFromBackoffQEnabled(%v)", tt.name, popFromBackoffQEnabled), func(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SchedulerPopFromBackoffQ, popFromBackoffQEnabled)
logger, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
m := map[string][]framework.PreEnqueuePlugin{"": tt.plugins}
q := NewTestQueueWithObjects(ctx, newDefaultQueueSort(), []runtime.Object{tt.pod}, WithPreEnqueuePluginMap(m),
WithPodInitialBackoffDuration(time.Second*30), WithPodMaxBackoffDuration(time.Second*60))
pInfo := q.newQueuedPodInfo(tt.pod)
got := q.moveToBackoffQ(logger, pInfo, framework.EventUnscheduledPodAdd.Label())
if got != tt.wantSuccess {
t.Errorf("Unexpected result: want %v, but got %v", tt.wantSuccess, got)
}
if tt.wantSuccess {
if !q.backoffQ.has(pInfo) {
t.Errorf("Expected pod to be in backoffQ, but it isn't")
}
if q.unschedulablePods.get(pInfo.Pod) != nil {
t.Errorf("Expected pod not to be in unschedulablePods, but it is")
}
} else {
if q.backoffQ.has(pInfo) {
t.Errorf("Expected pod not to be in backoffQ, but it is")
}
if q.unschedulablePods.get(pInfo.Pod) == nil {
t.Errorf("Expected pod to be in unschedulablePods, but it isn't")
}
}
})
}
} }
} }