Merge pull request #125578 from nayihz/fix_sche_queue_update

skip update pod that exist in scheduling cycle
This commit is contained in:
Kubernetes Prow Robot 2024-06-25 14:18:19 -07:00 committed by GitHub
commit 59673f0f37
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 104 additions and 0 deletions

View File

@ -987,6 +987,25 @@ func (p *PriorityQueue) Update(logger klog.Logger, oldPod, newPod *v1.Pod) error
p.lock.Lock()
defer p.lock.Unlock()
if p.isSchedulingQueueHintEnabled {
// the inflight pod will be requeued using the latest version from the informer cache, which matches what the event delivers.
if _, ok := p.inFlightPods[newPod.UID]; ok {
logger.V(6).Info("The pod doesn't be queued for now because it's being scheduled and will be queued back if necessary", "pod", klog.KObj(newPod))
// Record this update as Pod/Update because
// this update may make the Pod schedulable in case it gets rejected and comes back to the queue.
// We can clean it up once we change updatePodInSchedulingQueue to call MoveAllToActiveOrBackoffQueue.
// See https://github.com/kubernetes/kubernetes/pull/125578#discussion_r1648338033 for more context.
p.inFlightEvents.PushBack(&clusterEvent{
event: UnscheduledPodUpdate,
oldObj: oldPod,
newObj: newPod,
})
return nil
}
}
if oldPod != nil {
oldPodInfo := newQueuedPodInfoForLookup(oldPod)
// If the pod is already in the active queue, just update it there.

View File

@ -994,6 +994,7 @@ func TestPriorityQueue_Update(t *testing.T) {
},
}
notInAnyQueue := "NotInAnyQueue"
tests := []struct {
name string
wantQ string
@ -1093,6 +1094,25 @@ func TestPriorityQueue_Update(t *testing.T) {
},
schedulingHintsEnablement: []bool{true},
},
{
name: "when updating a pod which is in flightPods, the pod will not be added to any queue",
wantQ: notInAnyQueue,
prepareFunc: func(t *testing.T, logger klog.Logger, q *PriorityQueue) (oldPod, newPod *v1.Pod) {
podInfo := q.newQueuedPodInfo(medPriorityPodInfo.Pod)
// We need to once add this Pod to activeQ and Pop() it so that this Pod is registered correctly in inFlightPods.
err := q.activeQ.Add(podInfo)
if err != nil {
t.Errorf("unexpected error from activeQ.Add: %v", err)
}
if p, err := q.Pop(logger); err != nil || p.Pod != medPriorityPodInfo.Pod {
t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPodInfo.Pod.Name, p.Pod.Name)
}
updatedPod := medPriorityPodInfo.Pod.DeepCopy()
updatedPod.Annotations["foo"] = "bar"
return medPriorityPodInfo.Pod, updatedPod
},
schedulingHintsEnablement: []bool{true},
},
}
for _, tt := range tests {
@ -1135,6 +1155,11 @@ func TestPriorityQueue_Update(t *testing.T) {
pInfo = pInfoFromUnsched
}
if tt.wantQ == notInAnyQueue {
// skip the rest of the test if pod is not expected to be in any of the queues.
return
}
if diff := cmp.Diff(newPod, pInfo.PodInfo.Pod); diff != "" {
t.Errorf("Unexpected updated pod diff (-want, +got): %s", diff)
}
@ -1148,6 +1173,66 @@ func TestPriorityQueue_Update(t *testing.T) {
}
}
// TestPriorityQueue_UpdateWhenInflight ensures to requeue a Pod back to activeQ/backoffQ
// if it actually got an update that may make it schedulable while being scheduled.
// See https://github.com/kubernetes/kubernetes/pull/125578#discussion_r1648338033 for more context.
func TestPriorityQueue_UpdateWhenInflight(t *testing.T) {
logger, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SchedulerQueueingHints, true)
m := makeEmptyQueueingHintMapPerProfile()
// fakePlugin could change its scheduling result by any updates in Pods.
m[""][UnscheduledPodUpdate] = []*QueueingHintFunction{
{
PluginName: "fakePlugin",
QueueingHintFn: queueHintReturnQueue,
},
}
c := testingclock.NewFakeClock(time.Now())
q := NewTestQueue(ctx, newDefaultQueueSort(), WithQueueingHintMapPerProfile(m), WithClock(c))
// test-pod is created and popped out from the queue
testPod := st.MakePod().Name("test-pod").Namespace("test-ns").UID("test-uid").Obj()
if err := q.Add(logger, testPod); err != nil {
t.Errorf("add failed: %v", err)
}
if p, err := q.Pop(logger); err != nil || p.Pod != testPod {
t.Errorf("Expected: %v after Pop, but got: %v", testPod.Name, p.Pod.Name)
}
// testPod is updated while being scheduled.
updatedPod := testPod.DeepCopy()
updatedPod.Spec.Tolerations = []v1.Toleration{
{
Key: "foo",
Effect: v1.TaintEffectNoSchedule,
},
}
if err := q.Update(logger, testPod, updatedPod); err != nil {
t.Error("Error calling Update")
}
// test-pod got rejected by fakePlugin,
// but the update event that it just got may change this scheduling result,
// and hence we should put this pod to activeQ/backoffQ.
err := q.AddUnschedulableIfNotPresent(logger, newQueuedPodInfoForLookup(updatedPod, "fakePlugin"), q.SchedulingCycle())
if err != nil {
t.Fatalf("unexpected error from AddUnschedulableIfNotPresent: %v", err)
}
var pInfo *framework.QueuedPodInfo
if obj, exists, _ := q.podBackoffQ.Get(newQueuedPodInfoForLookup(updatedPod)); !exists {
t.Fatalf("expected pod %s to be queued to backoffQ, but it wasn't.", updatedPod.Name)
} else {
pInfo = obj.(*framework.QueuedPodInfo)
}
if diff := cmp.Diff(updatedPod, pInfo.PodInfo.Pod); diff != "" {
t.Errorf("Unexpected updated pod diff (-want, +got): %s", diff)
}
}
func TestPriorityQueue_Delete(t *testing.T) {
objs := []runtime.Object{highPriorityPodInfo.Pod, unschedulablePodInfo.Pod}
logger, ctx := ktesting.NewTestContext(t)