From c01fa8279d3af2896744b2d6e011786d2bd62e68 Mon Sep 17 00:00:00 2001 From: lianghao208 Date: Tue, 7 Feb 2023 21:37:50 +0800 Subject: [PATCH] Optimization on running prePreEnqueuePlugins before adding pods into activeQ --- .../internal/queue/scheduling_queue.go | 4 +- .../internal/queue/scheduling_queue_test.go | 44 +++++++++++++++++++ 2 files changed, 47 insertions(+), 1 deletion(-) diff --git a/pkg/scheduler/internal/queue/scheduling_queue.go b/pkg/scheduler/internal/queue/scheduling_queue.go index 2486792078e..e265878e6a0 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue.go +++ b/pkg/scheduler/internal/queue/scheduling_queue.go @@ -521,7 +521,9 @@ func (p *PriorityQueue) flushBackoffQCompleted() { klog.ErrorS(err, "Unable to pop pod from backoff queue despite backoff completion", "pod", klog.KObj(pod)) break } - if added, _ := p.addToActiveQ(pInfo); added { + if err := p.activeQ.Add(pInfo); err != nil { + klog.ErrorS(err, "Error adding pod to the active queue", "pod", klog.KObj(pInfo.Pod)) + } else { klog.V(5).InfoS("Pod moved to an internal scheduling queue", "pod", klog.KObj(pod), "event", BackoffComplete, "queue", activeQName) metrics.SchedulerQueueIncomingPods.WithLabelValues("active", BackoffComplete).Inc() activated = true diff --git a/pkg/scheduler/internal/queue/scheduling_queue_test.go b/pkg/scheduler/internal/queue/scheduling_queue_test.go index cd819118ac2..5327ac45064 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue_test.go +++ b/pkg/scheduler/internal/queue/scheduling_queue_test.go @@ -447,6 +447,8 @@ func TestPriorityQueue_Activate(t *testing.T) { } type preEnqueuePlugin struct { + // called counts the number of calling PreEnqueue() + called int allowlists []string } @@ -455,6 +457,7 @@ func (pl *preEnqueuePlugin) Name() string { } func (pl *preEnqueuePlugin) PreEnqueue(ctx context.Context, p *v1.Pod) *framework.Status { + pl.called++ for _, allowed := range pl.allowlists { for label := range p.Labels { if label == allowed { @@ -536,6 +539,47 @@ func TestPriorityQueue_addToActiveQ(t *testing.T) { } } +func TestPriorityQueue_flushBackoffQCompleted(t *testing.T) { + tests := []struct { + name string + plugin framework.PreEnqueuePlugin + pod *v1.Pod + operations []operation + wantPreEnqueuePluginCalled int + }{ + { + name: "preEnqueue plugin registered, not running preEnqueue plugin when backoff completed", + plugin: &preEnqueuePlugin{}, + pod: st.MakePod().Name("foo").Label("foo", "").Obj(), + operations: []operation{ + addPodBackoffQ, + flushBackoffQ, + }, + wantPreEnqueuePluginCalled: 0, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + m := map[string][]framework.PreEnqueuePlugin{"": {tt.plugin}} + c := testingclock.NewFakeClock(time.Now()) + q := NewTestQueueWithObjects(ctx, newDefaultQueueSort(), []runtime.Object{tt.pod}, WithPreEnqueuePluginMap(m), + WithPodInitialBackoffDuration(time.Second*1), WithPodMaxBackoffDuration(time.Second*60), WithClock(c)) + pInfo := newQueuedPodInfoForLookup(tt.pod) + pInfo.Gated = true + for _, op := range tt.operations { + op(q, pInfo) + } + if tt.wantPreEnqueuePluginCalled != tt.plugin.(*preEnqueuePlugin).called { + t.Errorf("Unexpected number of calling preEnqueue: want %v, but got %v", tt.wantPreEnqueuePluginCalled, tt.plugin.(*preEnqueuePlugin).called) + } + }) + } +} + func BenchmarkMoveAllToActiveOrBackoffQueue(b *testing.B) { tests := []struct { name string