From 26dcab114670bc5ecf2003b59b8a0bcc756eb45f Mon Sep 17 00:00:00 2001 From: nayihz Date: Wed, 19 Jun 2024 11:25:32 +0800 Subject: [PATCH] skip update pod that exist in scheduling cycle --- .../internal/queue/scheduling_queue.go | 19 +++++ .../internal/queue/scheduling_queue_test.go | 85 +++++++++++++++++++ 2 files changed, 104 insertions(+) diff --git a/pkg/scheduler/internal/queue/scheduling_queue.go b/pkg/scheduler/internal/queue/scheduling_queue.go index 292e72916f6..7aa52f8b3a7 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue.go +++ b/pkg/scheduler/internal/queue/scheduling_queue.go @@ -986,6 +986,25 @@ func (p *PriorityQueue) Update(logger klog.Logger, oldPod, newPod *v1.Pod) error p.lock.Lock() defer p.lock.Unlock() + if p.isSchedulingQueueHintEnabled { + // the inflight pod will be requeued using the latest version from the informer cache, which matches what the event delivers. + if _, ok := p.inFlightPods[newPod.UID]; ok { + logger.V(6).Info("The pod doesn't be queued for now because it's being scheduled and will be queued back if necessary", "pod", klog.KObj(newPod)) + + // Record this update as Pod/Update because + // this update may make the Pod schedulable in case it gets rejected and comes back to the queue. + // We can clean it up once we change updatePodInSchedulingQueue to call MoveAllToActiveOrBackoffQueue. + // See https://github.com/kubernetes/kubernetes/pull/125578#discussion_r1648338033 for more context. + p.inFlightEvents.PushBack(&clusterEvent{ + event: UnscheduledPodUpdate, + oldObj: oldPod, + newObj: newPod, + }) + + return nil + } + } + if oldPod != nil { oldPodInfo := newQueuedPodInfoForLookup(oldPod) // If the pod is already in the active queue, just update it there. diff --git a/pkg/scheduler/internal/queue/scheduling_queue_test.go b/pkg/scheduler/internal/queue/scheduling_queue_test.go index c2ad3d8ce67..134db25a5b6 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue_test.go +++ b/pkg/scheduler/internal/queue/scheduling_queue_test.go @@ -993,6 +993,7 @@ func TestPriorityQueue_Update(t *testing.T) { }, } + notInAnyQueue := "NotInAnyQueue" tests := []struct { name string wantQ string @@ -1092,6 +1093,25 @@ func TestPriorityQueue_Update(t *testing.T) { }, schedulingHintsEnablement: []bool{true}, }, + { + name: "when updating a pod which is in flightPods, the pod will not be added to any queue", + wantQ: notInAnyQueue, + prepareFunc: func(t *testing.T, logger klog.Logger, q *PriorityQueue) (oldPod, newPod *v1.Pod) { + podInfo := q.newQueuedPodInfo(medPriorityPodInfo.Pod) + // We need to once add this Pod to activeQ and Pop() it so that this Pod is registered correctly in inFlightPods. + err := q.activeQ.Add(podInfo) + if err != nil { + t.Errorf("unexpected error from activeQ.Add: %v", err) + } + if p, err := q.Pop(logger); err != nil || p.Pod != medPriorityPodInfo.Pod { + t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPodInfo.Pod.Name, p.Pod.Name) + } + updatedPod := medPriorityPodInfo.Pod.DeepCopy() + updatedPod.Annotations["foo"] = "bar" + return medPriorityPodInfo.Pod, updatedPod + }, + schedulingHintsEnablement: []bool{true}, + }, } for _, tt := range tests { @@ -1134,6 +1154,11 @@ func TestPriorityQueue_Update(t *testing.T) { pInfo = pInfoFromUnsched } + if tt.wantQ == notInAnyQueue { + // skip the rest of the test if pod is not expected to be in any of the queues. + return + } + if diff := cmp.Diff(newPod, pInfo.PodInfo.Pod); diff != "" { t.Errorf("Unexpected updated pod diff (-want, +got): %s", diff) } @@ -1147,6 +1172,66 @@ func TestPriorityQueue_Update(t *testing.T) { } } +// TestPriorityQueue_UpdateWhenInflight ensures to requeue a Pod back to activeQ/backoffQ +// if it actually got an update that may make it schedulable while being scheduled. +// See https://github.com/kubernetes/kubernetes/pull/125578#discussion_r1648338033 for more context. +func TestPriorityQueue_UpdateWhenInflight(t *testing.T) { + logger, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SchedulerQueueingHints, true) + m := makeEmptyQueueingHintMapPerProfile() + // fakePlugin could change its scheduling result by any updates in Pods. + m[""][UnscheduledPodUpdate] = []*QueueingHintFunction{ + { + PluginName: "fakePlugin", + QueueingHintFn: queueHintReturnQueue, + }, + } + c := testingclock.NewFakeClock(time.Now()) + q := NewTestQueue(ctx, newDefaultQueueSort(), WithQueueingHintMapPerProfile(m), WithClock(c)) + + // test-pod is created and popped out from the queue + testPod := st.MakePod().Name("test-pod").Namespace("test-ns").UID("test-uid").Obj() + if err := q.Add(logger, testPod); err != nil { + t.Errorf("add failed: %v", err) + } + if p, err := q.Pop(logger); err != nil || p.Pod != testPod { + t.Errorf("Expected: %v after Pop, but got: %v", testPod.Name, p.Pod.Name) + } + + // testPod is updated while being scheduled. + updatedPod := testPod.DeepCopy() + updatedPod.Spec.Tolerations = []v1.Toleration{ + { + Key: "foo", + Effect: v1.TaintEffectNoSchedule, + }, + } + + if err := q.Update(logger, testPod, updatedPod); err != nil { + t.Error("Error calling Update") + } + // test-pod got rejected by fakePlugin, + // but the update event that it just got may change this scheduling result, + // and hence we should put this pod to activeQ/backoffQ. + err := q.AddUnschedulableIfNotPresent(logger, newQueuedPodInfoForLookup(updatedPod, "fakePlugin"), q.SchedulingCycle()) + if err != nil { + t.Fatalf("unexpected error from AddUnschedulableIfNotPresent: %v", err) + } + + var pInfo *framework.QueuedPodInfo + if obj, exists, _ := q.podBackoffQ.Get(newQueuedPodInfoForLookup(updatedPod)); !exists { + t.Fatalf("expected pod %s to be queued to backoffQ, but it wasn't.", updatedPod.Name) + } else { + pInfo = obj.(*framework.QueuedPodInfo) + } + if diff := cmp.Diff(updatedPod, pInfo.PodInfo.Pod); diff != "" { + t.Errorf("Unexpected updated pod diff (-want, +got): %s", diff) + } +} + func TestPriorityQueue_Delete(t *testing.T) { objs := []runtime.Object{highPriorityPodInfo.Pod, unschedulablePodInfo.Pod} logger, ctx := ktesting.NewTestContext(t)