mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-05 18:24:07 +00:00
skip update pod that exist in scheduling cycle
This commit is contained in:
parent
a81ea5d0af
commit
26dcab1146
@ -986,6 +986,25 @@ func (p *PriorityQueue) Update(logger klog.Logger, oldPod, newPod *v1.Pod) error
|
|||||||
p.lock.Lock()
|
p.lock.Lock()
|
||||||
defer p.lock.Unlock()
|
defer p.lock.Unlock()
|
||||||
|
|
||||||
|
if p.isSchedulingQueueHintEnabled {
|
||||||
|
// the inflight pod will be requeued using the latest version from the informer cache, which matches what the event delivers.
|
||||||
|
if _, ok := p.inFlightPods[newPod.UID]; ok {
|
||||||
|
logger.V(6).Info("The pod doesn't be queued for now because it's being scheduled and will be queued back if necessary", "pod", klog.KObj(newPod))
|
||||||
|
|
||||||
|
// Record this update as Pod/Update because
|
||||||
|
// this update may make the Pod schedulable in case it gets rejected and comes back to the queue.
|
||||||
|
// We can clean it up once we change updatePodInSchedulingQueue to call MoveAllToActiveOrBackoffQueue.
|
||||||
|
// See https://github.com/kubernetes/kubernetes/pull/125578#discussion_r1648338033 for more context.
|
||||||
|
p.inFlightEvents.PushBack(&clusterEvent{
|
||||||
|
event: UnscheduledPodUpdate,
|
||||||
|
oldObj: oldPod,
|
||||||
|
newObj: newPod,
|
||||||
|
})
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if oldPod != nil {
|
if oldPod != nil {
|
||||||
oldPodInfo := newQueuedPodInfoForLookup(oldPod)
|
oldPodInfo := newQueuedPodInfoForLookup(oldPod)
|
||||||
// If the pod is already in the active queue, just update it there.
|
// If the pod is already in the active queue, just update it there.
|
||||||
|
@ -993,6 +993,7 @@ func TestPriorityQueue_Update(t *testing.T) {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
notInAnyQueue := "NotInAnyQueue"
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
name string
|
name string
|
||||||
wantQ string
|
wantQ string
|
||||||
@ -1092,6 +1093,25 @@ func TestPriorityQueue_Update(t *testing.T) {
|
|||||||
},
|
},
|
||||||
schedulingHintsEnablement: []bool{true},
|
schedulingHintsEnablement: []bool{true},
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
name: "when updating a pod which is in flightPods, the pod will not be added to any queue",
|
||||||
|
wantQ: notInAnyQueue,
|
||||||
|
prepareFunc: func(t *testing.T, logger klog.Logger, q *PriorityQueue) (oldPod, newPod *v1.Pod) {
|
||||||
|
podInfo := q.newQueuedPodInfo(medPriorityPodInfo.Pod)
|
||||||
|
// We need to once add this Pod to activeQ and Pop() it so that this Pod is registered correctly in inFlightPods.
|
||||||
|
err := q.activeQ.Add(podInfo)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("unexpected error from activeQ.Add: %v", err)
|
||||||
|
}
|
||||||
|
if p, err := q.Pop(logger); err != nil || p.Pod != medPriorityPodInfo.Pod {
|
||||||
|
t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPodInfo.Pod.Name, p.Pod.Name)
|
||||||
|
}
|
||||||
|
updatedPod := medPriorityPodInfo.Pod.DeepCopy()
|
||||||
|
updatedPod.Annotations["foo"] = "bar"
|
||||||
|
return medPriorityPodInfo.Pod, updatedPod
|
||||||
|
},
|
||||||
|
schedulingHintsEnablement: []bool{true},
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, tt := range tests {
|
for _, tt := range tests {
|
||||||
@ -1134,6 +1154,11 @@ func TestPriorityQueue_Update(t *testing.T) {
|
|||||||
pInfo = pInfoFromUnsched
|
pInfo = pInfoFromUnsched
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if tt.wantQ == notInAnyQueue {
|
||||||
|
// skip the rest of the test if pod is not expected to be in any of the queues.
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
if diff := cmp.Diff(newPod, pInfo.PodInfo.Pod); diff != "" {
|
if diff := cmp.Diff(newPod, pInfo.PodInfo.Pod); diff != "" {
|
||||||
t.Errorf("Unexpected updated pod diff (-want, +got): %s", diff)
|
t.Errorf("Unexpected updated pod diff (-want, +got): %s", diff)
|
||||||
}
|
}
|
||||||
@ -1147,6 +1172,66 @@ func TestPriorityQueue_Update(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestPriorityQueue_UpdateWhenInflight ensures to requeue a Pod back to activeQ/backoffQ
|
||||||
|
// if it actually got an update that may make it schedulable while being scheduled.
|
||||||
|
// See https://github.com/kubernetes/kubernetes/pull/125578#discussion_r1648338033 for more context.
|
||||||
|
func TestPriorityQueue_UpdateWhenInflight(t *testing.T) {
|
||||||
|
logger, ctx := ktesting.NewTestContext(t)
|
||||||
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SchedulerQueueingHints, true)
|
||||||
|
m := makeEmptyQueueingHintMapPerProfile()
|
||||||
|
// fakePlugin could change its scheduling result by any updates in Pods.
|
||||||
|
m[""][UnscheduledPodUpdate] = []*QueueingHintFunction{
|
||||||
|
{
|
||||||
|
PluginName: "fakePlugin",
|
||||||
|
QueueingHintFn: queueHintReturnQueue,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
c := testingclock.NewFakeClock(time.Now())
|
||||||
|
q := NewTestQueue(ctx, newDefaultQueueSort(), WithQueueingHintMapPerProfile(m), WithClock(c))
|
||||||
|
|
||||||
|
// test-pod is created and popped out from the queue
|
||||||
|
testPod := st.MakePod().Name("test-pod").Namespace("test-ns").UID("test-uid").Obj()
|
||||||
|
if err := q.Add(logger, testPod); err != nil {
|
||||||
|
t.Errorf("add failed: %v", err)
|
||||||
|
}
|
||||||
|
if p, err := q.Pop(logger); err != nil || p.Pod != testPod {
|
||||||
|
t.Errorf("Expected: %v after Pop, but got: %v", testPod.Name, p.Pod.Name)
|
||||||
|
}
|
||||||
|
|
||||||
|
// testPod is updated while being scheduled.
|
||||||
|
updatedPod := testPod.DeepCopy()
|
||||||
|
updatedPod.Spec.Tolerations = []v1.Toleration{
|
||||||
|
{
|
||||||
|
Key: "foo",
|
||||||
|
Effect: v1.TaintEffectNoSchedule,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := q.Update(logger, testPod, updatedPod); err != nil {
|
||||||
|
t.Error("Error calling Update")
|
||||||
|
}
|
||||||
|
// test-pod got rejected by fakePlugin,
|
||||||
|
// but the update event that it just got may change this scheduling result,
|
||||||
|
// and hence we should put this pod to activeQ/backoffQ.
|
||||||
|
err := q.AddUnschedulableIfNotPresent(logger, newQueuedPodInfoForLookup(updatedPod, "fakePlugin"), q.SchedulingCycle())
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error from AddUnschedulableIfNotPresent: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var pInfo *framework.QueuedPodInfo
|
||||||
|
if obj, exists, _ := q.podBackoffQ.Get(newQueuedPodInfoForLookup(updatedPod)); !exists {
|
||||||
|
t.Fatalf("expected pod %s to be queued to backoffQ, but it wasn't.", updatedPod.Name)
|
||||||
|
} else {
|
||||||
|
pInfo = obj.(*framework.QueuedPodInfo)
|
||||||
|
}
|
||||||
|
if diff := cmp.Diff(updatedPod, pInfo.PodInfo.Pod); diff != "" {
|
||||||
|
t.Errorf("Unexpected updated pod diff (-want, +got): %s", diff)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestPriorityQueue_Delete(t *testing.T) {
|
func TestPriorityQueue_Delete(t *testing.T) {
|
||||||
objs := []runtime.Object{highPriorityPodInfo.Pod, unschedulablePodInfo.Pod}
|
objs := []runtime.Object{highPriorityPodInfo.Pod, unschedulablePodInfo.Pod}
|
||||||
logger, ctx := ktesting.NewTestContext(t)
|
logger, ctx := ktesting.NewTestContext(t)
|
||||||
|
Loading…
Reference in New Issue
Block a user