From a4e4a9943525de2f52fb274e7c6fd442515885d5 Mon Sep 17 00:00:00 2001
From: notpad
Date: Fri, 14 Feb 2020 08:31:07 +0800
Subject: [PATCH] Remove PodBackoffMap

Derive a pod's backoff directly from the Attempts and Timestamp fields
already stored on framework.PodInfo, instead of tracking it in a separate
PodBackoffMap that needs its own lock and periodic garbage collection.

---
 pkg/scheduler/internal/queue/BUILD          |   6 +-
 pkg/scheduler/internal/queue/pod_backoff.go | 119 --------------
 .../internal/queue/pod_backoff_test.go      |  99 ------------
 .../internal/queue/scheduling_queue.go      |  91 +++++------
 .../internal/queue/scheduling_queue_test.go | 150 +++++++++---------
 5 files changed, 120 insertions(+), 345 deletions(-)
 delete mode 100644 pkg/scheduler/internal/queue/pod_backoff.go
 delete mode 100644 pkg/scheduler/internal/queue/pod_backoff_test.go

diff --git a/pkg/scheduler/internal/queue/BUILD b/pkg/scheduler/internal/queue/BUILD
index 5722266c2ff..392390e1795 100644
--- a/pkg/scheduler/internal/queue/BUILD
+++ b/pkg/scheduler/internal/queue/BUILD
@@ -4,7 +4,6 @@ go_library(
     name = "go_default_library",
     srcs = [
         "events.go",
-        "pod_backoff.go",
         "scheduling_queue.go",
     ],
     importpath = "k8s.io/kubernetes/pkg/scheduler/internal/queue",
@@ -25,10 +24,7 @@ go_library(
 
 go_test(
     name = "go_default_test",
-    srcs = [
-        "pod_backoff_test.go",
-        "scheduling_queue_test.go",
-    ],
+    srcs = ["scheduling_queue_test.go"],
     embed = [":go_default_library"],
     deps = [
         "//pkg/api/v1/pod:go_default_library",
diff --git a/pkg/scheduler/internal/queue/pod_backoff.go b/pkg/scheduler/internal/queue/pod_backoff.go
deleted file mode 100644
index 06c857ffa4e..00000000000
--- a/pkg/scheduler/internal/queue/pod_backoff.go
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
-Copyright 2019 The Kubernetes Authors.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package queue
-
-import (
-	"sync"
-	"time"
-
-	ktypes "k8s.io/apimachinery/pkg/types"
-	"k8s.io/kubernetes/pkg/scheduler/util"
-)
-
-// PodBackoffMap is a structure that stores backoff related information for pods
-type PodBackoffMap struct {
-	// lock for performing actions on this PodBackoffMap
-	lock sync.RWMutex
-	clock util.Clock
-	// initial backoff duration
-	initialDuration time.Duration
-	// maximal backoff duration
-	maxDuration time.Duration
-	// map for pod -> number of attempts for this pod
-	podAttempts map[ktypes.NamespacedName]int
-	// map for pod -> lastUpdateTime pod of this pod
-	podLastUpdateTime map[ktypes.NamespacedName]time.Time
-}
-
-// NewPodBackoffMap creates a PodBackoffMap with initial duration and max duration.
-func NewPodBackoffMap(initialDuration, maxDuration time.Duration, clock util.Clock) *PodBackoffMap { - return &PodBackoffMap{ - clock: clock, - initialDuration: initialDuration, - maxDuration: maxDuration, - podAttempts: make(map[ktypes.NamespacedName]int), - podLastUpdateTime: make(map[ktypes.NamespacedName]time.Time), - } -} - -// GetBackoffTime returns the time that nsPod completes backoff -func (pbm *PodBackoffMap) GetBackoffTime(nsPod ktypes.NamespacedName) (time.Time, bool) { - pbm.lock.RLock() - defer pbm.lock.RUnlock() - if _, found := pbm.podAttempts[nsPod]; found == false { - return time.Time{}, false - } - lastUpdateTime := pbm.podLastUpdateTime[nsPod] - backoffDuration := pbm.calculateBackoffDuration(nsPod) - backoffTime := lastUpdateTime.Add(backoffDuration) - return backoffTime, true -} - -// calculateBackoffDuration is a helper function for calculating the backoffDuration -// based on the number of attempts the pod has made. -func (pbm *PodBackoffMap) calculateBackoffDuration(nsPod ktypes.NamespacedName) time.Duration { - backoffDuration := pbm.initialDuration - if _, found := pbm.podAttempts[nsPod]; found { - for i := 1; i < pbm.podAttempts[nsPod]; i++ { - backoffDuration = backoffDuration * 2 - if backoffDuration > pbm.maxDuration { - return pbm.maxDuration - } - } - } - return backoffDuration -} - -// clearPodBackoff removes all tracking information for nsPod. -// Lock is supposed to be acquired by caller. -func (pbm *PodBackoffMap) clearPodBackoff(nsPod ktypes.NamespacedName) { - delete(pbm.podAttempts, nsPod) - delete(pbm.podLastUpdateTime, nsPod) -} - -// ClearPodBackoff is the thread safe version of clearPodBackoff -func (pbm *PodBackoffMap) ClearPodBackoff(nsPod ktypes.NamespacedName) { - pbm.lock.Lock() - pbm.clearPodBackoff(nsPod) - pbm.lock.Unlock() -} - -// CleanupPodsCompletesBackingoff execute garbage collection on the pod backoff, -// i.e, it will remove a pod from the PodBackoffMap if -// lastUpdateTime + maxDuration >> timestamp -// We should wait longer than the maxDuration so that the pod gets a chance to -// (1) move to the active queue and (2) get an schedule attempt. -func (pbm *PodBackoffMap) CleanupPodsCompletesBackingoff() { - pbm.lock.Lock() - defer pbm.lock.Unlock() - for pod, value := range pbm.podLastUpdateTime { - // Here we assume that maxDuration should be enough for a pod to move up the - // active queue and get an schedule attempt. - if value.Add(2 * pbm.maxDuration).Before(pbm.clock.Now()) { - pbm.clearPodBackoff(pod) - } - } -} - -// BackoffPod updates the lastUpdateTime for an nsPod, -// and increases its numberOfAttempts by 1 -func (pbm *PodBackoffMap) BackoffPod(nsPod ktypes.NamespacedName) { - pbm.lock.Lock() - pbm.podLastUpdateTime[nsPod] = pbm.clock.Now() - pbm.podAttempts[nsPod]++ - pbm.lock.Unlock() -} diff --git a/pkg/scheduler/internal/queue/pod_backoff_test.go b/pkg/scheduler/internal/queue/pod_backoff_test.go deleted file mode 100644 index 6525cc335c2..00000000000 --- a/pkg/scheduler/internal/queue/pod_backoff_test.go +++ /dev/null @@ -1,99 +0,0 @@ -/* -Copyright 2019 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
-See the License for the specific language governing permissions and -limitations under the License. -*/ - -package queue - -import ( - "fmt" - "testing" - "time" - - ktypes "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/clock" -) - -func TestBackoffPod(t *testing.T) { - timestamp := time.Now() - bpm := NewPodBackoffMap(1*time.Second, 10*time.Second, clock.NewFakeClock(timestamp)) - tests := []struct { - podID ktypes.NamespacedName - expectedDuration time.Duration - advanceClock time.Duration - }{ - { - podID: ktypes.NamespacedName{Namespace: "default", Name: "foo"}, - expectedDuration: 1 * time.Second, - }, - { - podID: ktypes.NamespacedName{Namespace: "default", Name: "foo"}, - expectedDuration: 2 * time.Second, - }, - { - podID: ktypes.NamespacedName{Namespace: "default", Name: "foo"}, - expectedDuration: 4 * time.Second, - }, - { - podID: ktypes.NamespacedName{Namespace: "default", Name: "foo"}, - expectedDuration: 8 * time.Second, - }, - { - podID: ktypes.NamespacedName{Namespace: "default", Name: "foo"}, - expectedDuration: 10 * time.Second, - }, - { - podID: ktypes.NamespacedName{Namespace: "default", Name: "foo"}, - expectedDuration: 10 * time.Second, - }, - { - podID: ktypes.NamespacedName{Namespace: "default", Name: "bar"}, - expectedDuration: 1 * time.Second, - }, - } - - for i, test := range tests { - t.Run(fmt.Sprintf("step %d", i), func(t *testing.T) { - bpm.BackoffPod(test.podID) - backoff, ok := bpm.GetBackoffTime(test.podID) - if !ok { - t.Errorf("%v should be backed off", test.podID) - } - duration := backoff.Sub(timestamp) - if duration != test.expectedDuration { - t.Errorf("expected: %s, got %s for pod %s", test.expectedDuration.String(), duration.String(), test.podID) - } - }) - } -} - -func TestClearPodBackoff(t *testing.T) { - bpm := NewPodBackoffMap(1*time.Second, 60*time.Second, clock.NewFakeClock(time.Now())) - // Clear backoff on an not existed pod - bpm.clearPodBackoff(ktypes.NamespacedName{Namespace: "ns", Name: "not-existed"}) - // Backoff twice for pod foo - podID := ktypes.NamespacedName{Namespace: "ns", Name: "foo"} - bpm.BackoffPod(podID) - bpm.BackoffPod(podID) - if duration := bpm.calculateBackoffDuration(podID); duration != 2*time.Second { - t.Errorf("Expected backoff of 1s for pod %s, got %s", podID, duration.String()) - } - // Clear backoff for pod foo - bpm.clearPodBackoff(podID) - // Backoff once for pod foo - bpm.BackoffPod(podID) - if duration := bpm.calculateBackoffDuration(podID); duration != 1*time.Second { - t.Errorf("Expected backoff of 1s for pod %s, got %s", podID, duration.String()) - } -} diff --git a/pkg/scheduler/internal/queue/scheduling_queue.go b/pkg/scheduler/internal/queue/scheduling_queue.go index 7020230b3de..a7a76f9150a 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue.go +++ b/pkg/scheduler/internal/queue/scheduling_queue.go @@ -118,8 +118,11 @@ func NominatedNodeName(pod *v1.Pod) string { type PriorityQueue struct { stop chan struct{} clock util.Clock - // podBackoff tracks backoff for pods attempting to be rescheduled - podBackoff *PodBackoffMap + + // pod initial backoff duration. + podInitialBackoffDuration time.Duration + // pod maximum backoff duration. 
+ podMaxBackoffDuration time.Duration lock sync.RWMutex cond sync.Cond @@ -212,13 +215,14 @@ func NewPriorityQueue( } pq := &PriorityQueue{ - clock: options.clock, - stop: make(chan struct{}), - podBackoff: NewPodBackoffMap(options.podInitialBackoffDuration, options.podMaxBackoffDuration, options.clock), - activeQ: heap.NewWithRecorder(podInfoKeyFunc, comp, metrics.NewActivePodsRecorder()), - unschedulableQ: newUnschedulablePodsMap(metrics.NewUnschedulablePodsRecorder()), - nominatedPods: newNominatedPodMap(), - moveRequestCycle: -1, + clock: options.clock, + stop: make(chan struct{}), + podInitialBackoffDuration: options.podInitialBackoffDuration, + podMaxBackoffDuration: options.podMaxBackoffDuration, + activeQ: heap.NewWithRecorder(podInfoKeyFunc, comp, metrics.NewActivePodsRecorder()), + unschedulableQ: newUnschedulablePodsMap(metrics.NewUnschedulablePodsRecorder()), + nominatedPods: newNominatedPodMap(), + moveRequestCycle: -1, } pq.cond.L = &pq.lock pq.podBackoffQ = heap.NewWithRecorder(podInfoKeyFunc, pq.podsCompareBackoffCompleted, metrics.NewBackoffPodsRecorder()) @@ -265,33 +269,13 @@ func nsNameForPod(pod *v1.Pod) ktypes.NamespacedName { } } -// clearPodBackoff clears all backoff state for a pod (resets expiry) -func (p *PriorityQueue) clearPodBackoff(pod *v1.Pod) { - p.podBackoff.ClearPodBackoff(nsNameForPod(pod)) -} - -// isPodBackingOff returns true if a pod is still waiting for its backoff timer. +// isPodBackingoff returns true if a pod is still waiting for its backoff timer. // If this returns true, the pod should not be re-tried. -func (p *PriorityQueue) isPodBackingOff(pod *v1.Pod) bool { - boTime, exists := p.podBackoff.GetBackoffTime(nsNameForPod(pod)) - if !exists { - return false - } +func (p *PriorityQueue) isPodBackingoff(podInfo *framework.PodInfo) bool { + boTime := p.getBackoffTime(podInfo) return boTime.After(p.clock.Now()) } -// backoffPod checks if pod is currently undergoing backoff. If it is not it updates the backoff -// timeout otherwise it does nothing. -func (p *PriorityQueue) backoffPod(pod *v1.Pod) { - p.podBackoff.CleanupPodsCompletesBackingoff() - - podID := nsNameForPod(pod) - boTime, found := p.podBackoff.GetBackoffTime(podID) - if !found || boTime.Before(p.clock.Now()) { - p.podBackoff.BackoffPod(podID) - } -} - // SchedulingCycle returns current scheduling cycle. func (p *PriorityQueue) SchedulingCycle() int64 { p.lock.RLock() @@ -320,9 +304,6 @@ func (p *PriorityQueue) AddUnschedulableIfNotPresent(pInfo *framework.PodInfo, p return fmt.Errorf("pod %v is already present in the backoff queue", nsNameForPod(pod)) } - // Every unschedulable pod is subject to backoff timers. - p.backoffPod(pod) - // If a move request has been received, move it to the BackoffQ, otherwise move // it to unschedulableQ. 
if p.moveRequestCycle >= podSchedulingCycle { @@ -350,16 +331,7 @@ func (p *PriorityQueue) flushBackoffQCompleted() { return } pod := rawPodInfo.(*framework.PodInfo).Pod - boTime, found := p.podBackoff.GetBackoffTime(nsNameForPod(pod)) - if !found { - klog.Errorf("Unable to find backoff value for pod %v in backoff queue", nsNameForPod(pod)) - p.podBackoffQ.Pop() - p.activeQ.Add(rawPodInfo) - metrics.SchedulerQueueIncomingPods.WithLabelValues("active", BackoffComplete).Inc() - defer p.cond.Broadcast() - continue - } - + boTime := p.getBackoffTime(rawPodInfo.(*framework.PodInfo)) if boTime.After(p.clock.Now()) { return } @@ -465,8 +437,6 @@ func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error { if usPodInfo := p.unschedulableQ.get(newPod); usPodInfo != nil { p.nominatedPods.update(oldPod, newPod) if isPodUpdated(oldPod, newPod) { - // If the pod is updated reset backoff - p.clearPodBackoff(newPod) p.unschedulableQ.delete(usPodInfo.Pod) err := p.activeQ.Add(updatePod(usPodInfo, newPod)) if err == nil { @@ -495,7 +465,6 @@ func (p *PriorityQueue) Delete(pod *v1.Pod) error { p.nominatedPods.delete(pod) err := p.activeQ.Delete(newPodInfoNoTimestamp(pod)) if err != nil { // The item was probably not found in the activeQ. - p.clearPodBackoff(pod) p.podBackoffQ.Delete(newPodInfoNoTimestamp(pod)) p.unschedulableQ.delete(pod) } @@ -538,7 +507,7 @@ func (p *PriorityQueue) MoveAllToActiveOrBackoffQueue(event string) { func (p *PriorityQueue) movePodsToActiveOrBackoffQueue(podInfoList []*framework.PodInfo, event string) { for _, pInfo := range podInfoList { pod := pInfo.Pod - if p.isPodBackingOff(pod) { + if p.isPodBackingoff(pInfo) { if err := p.podBackoffQ.Add(pInfo); err != nil { klog.Errorf("Error adding pod %v to the backoff queue: %v", pod.Name, err) } else { @@ -640,8 +609,8 @@ func (p *PriorityQueue) UpdateNominatedPodForNode(pod *v1.Pod, nodeName string) func (p *PriorityQueue) podsCompareBackoffCompleted(podInfo1, podInfo2 interface{}) bool { pInfo1 := podInfo1.(*framework.PodInfo) pInfo2 := podInfo2.(*framework.PodInfo) - bo1, _ := p.podBackoff.GetBackoffTime(nsNameForPod(pInfo1.Pod)) - bo2, _ := p.podBackoff.GetBackoffTime(nsNameForPod(pInfo2.Pod)) + bo1 := p.getBackoffTime(pInfo1) + bo2 := p.getBackoffTime(pInfo2) return bo1.Before(bo2) } @@ -662,6 +631,26 @@ func (p *PriorityQueue) newPodInfo(pod *v1.Pod) *framework.PodInfo { } } +// getBackoffTime returns the time that podInfo completes backoff +func (p *PriorityQueue) getBackoffTime(podInfo *framework.PodInfo) time.Time { + duration := p.calculateBackoffDuration(podInfo) + backoffTime := podInfo.Timestamp.Add(duration) + return backoffTime +} + +// calculateBackoffDuration is a helper function for calculating the backoffDuration +// based on the number of attempts the pod has made. 
+func (p *PriorityQueue) calculateBackoffDuration(podInfo *framework.PodInfo) time.Duration { + duration := p.podInitialBackoffDuration + for i := 1; i < podInfo.Attempts; i++ { + duration = duration * 2 + if duration > p.podMaxBackoffDuration { + return p.podMaxBackoffDuration + } + } + return duration +} + func updatePod(oldPodInfo interface{}, newPod *v1.Pod) *framework.PodInfo { pInfo := oldPodInfo.(*framework.PodInfo) pInfo.Pod = newPod diff --git a/pkg/scheduler/internal/queue/scheduling_queue_test.go b/pkg/scheduler/internal/queue/scheduling_queue_test.go index 6c11ba523e7..ccb5c12797f 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue_test.go +++ b/pkg/scheduler/internal/queue/scheduling_queue_test.go @@ -111,12 +111,6 @@ var highPriorityPod, highPriNominatedPod, medPriorityPod, unschedulablePod = v1. }, } -func addOrUpdateUnschedulablePod(p *PriorityQueue, podInfo *framework.PodInfo) { - p.lock.Lock() - defer p.lock.Unlock() - p.unschedulableQ.addOrUpdate(podInfo) -} - func getUnschedulablePod(p *PriorityQueue, pod *v1.Pod) *v1.Pod { p.lock.Lock() defer p.lock.Unlock() @@ -348,6 +342,18 @@ func TestPriorityQueue_Update(t *testing.T) { if p, err := q.Pop(); err != nil || p.Pod != &highPriNominatedPod { t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPod.Name, p.Pod.Name) } + // Updating a pod that is in unschedulableQ in a way that it may + // become schedulable should add the pod to the activeQ. + q.AddUnschedulableIfNotPresent(q.newPodInfo(&medPriorityPod), q.SchedulingCycle()) + if len(q.unschedulableQ.podInfoMap) != 1 { + t.Error("Expected unschedulableQ to be 1.") + } + updatedPod := medPriorityPod.DeepCopy() + updatedPod.ClusterName = "test" + q.Update(&medPriorityPod, updatedPod) + if p, err := q.Pop(); err != nil || p.Pod != updatedPod { + t.Errorf("Expected: %v after Pop, but got: %v", updatedPod.Name, p.Pod.Name) + } } func TestPriorityQueue_Delete(t *testing.T) { @@ -374,14 +380,17 @@ func TestPriorityQueue_Delete(t *testing.T) { } } -func TestPriorityQueue_MoveAllToActiveQueue(t *testing.T) { +func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) { q := createAndRunPriorityQueue(newDefaultFramework()) q.Add(&medPriorityPod) - addOrUpdateUnschedulablePod(q, q.newPodInfo(&unschedulablePod)) - addOrUpdateUnschedulablePod(q, q.newPodInfo(&highPriorityPod)) + q.AddUnschedulableIfNotPresent(q.newPodInfo(&unschedulablePod), q.SchedulingCycle()) + q.AddUnschedulableIfNotPresent(q.newPodInfo(&highPriorityPod), q.SchedulingCycle()) q.MoveAllToActiveOrBackoffQueue("test") - if q.activeQ.Len() != 3 { - t.Error("Expected all items to be in activeQ.") + if q.activeQ.Len() != 1 { + t.Error("Expected 1 item to be in activeQ") + } + if q.podBackoffQ.Len() != 2 { + t.Error("Expected 2 items to be in podBackoffQ") } } @@ -421,11 +430,15 @@ func TestPriorityQueue_AssignedPodAdded(t *testing.T) { Spec: v1.PodSpec{NodeName: "machine1"}, } - q := createAndRunPriorityQueue(newDefaultFramework()) + c := clock.NewFakeClock(time.Now()) + q := createAndRunPriorityQueue(newDefaultFramework(), WithClock(c)) q.Add(&medPriorityPod) // Add a couple of pods to the unschedulableQ. - addOrUpdateUnschedulablePod(q, q.newPodInfo(&unschedulablePod)) - addOrUpdateUnschedulablePod(q, q.newPodInfo(affinityPod)) + q.AddUnschedulableIfNotPresent(q.newPodInfo(&unschedulablePod), q.SchedulingCycle()) + q.AddUnschedulableIfNotPresent(q.newPodInfo(affinityPod), q.SchedulingCycle()) + + // Move clock to make the unschedulable pods complete backoff. 
+	c.Step(DefaultPodInitialBackoffDuration + time.Second)
 	// Simulate addition of an assigned pod. The pod has matching labels for
 	// affinityPod. So, affinityPod should go to activeQ.
 	q.AssignedPodAdded(&labelPod)
@@ -469,8 +482,9 @@ func TestPriorityQueue_PendingPods(t *testing.T) {
 
 	q := createAndRunPriorityQueue(newDefaultFramework())
 	q.Add(&medPriorityPod)
-	addOrUpdateUnschedulablePod(q, q.newPodInfo(&unschedulablePod))
-	addOrUpdateUnschedulablePod(q, q.newPodInfo(&highPriorityPod))
+	q.AddUnschedulableIfNotPresent(q.newPodInfo(&unschedulablePod), q.SchedulingCycle())
+	q.AddUnschedulableIfNotPresent(q.newPodInfo(&highPriorityPod), q.SchedulingCycle())
+
 	expectedSet := makeSet([]*v1.Pod{&medPriorityPod, &unschedulablePod, &highPriorityPod})
 	if !reflect.DeepEqual(expectedSet, makeSet(q.PendingPods())) {
 		t.Error("Unexpected list of pending Pods.")
@@ -558,12 +572,12 @@ func TestPriorityQueue_NewWithOptions(t *testing.T) {
 		WithPodMaxBackoffDuration(20*time.Second),
 	)
 
-	if q.podBackoff.initialDuration != 2*time.Second {
-		t.Errorf("Unexpected pod backoff initial duration. Expected: %v, got: %v", 2*time.Second, q.podBackoff.initialDuration)
+	if q.podInitialBackoffDuration != 2*time.Second {
+		t.Errorf("Unexpected pod backoff initial duration. Expected: %v, got: %v", 2*time.Second, q.podInitialBackoffDuration)
 	}
 
-	if q.podBackoff.maxDuration != 20*time.Second {
-		t.Errorf("Unexpected pod backoff max duration. Expected: %v, got: %v", 2*time.Second, q.podBackoff.maxDuration)
+	if q.podMaxBackoffDuration != 20*time.Second {
+		t.Errorf("Unexpected pod backoff max duration. Expected: %v, got: %v", 20*time.Second, q.podMaxBackoffDuration)
 	}
 }
 
@@ -808,7 +822,8 @@ func TestRecentlyTriedPodsGoBack(t *testing.T) {
 // This behavior ensures that an unschedulable pod does not block head of the queue when there
 // are frequent events that move pods to the active queue.
 func TestPodFailedSchedulingMultipleTimesDoesNotBlockNewerPod(t *testing.T) {
-	q := createAndRunPriorityQueue(newDefaultFramework())
+	c := clock.NewFakeClock(time.Now())
+	q := createAndRunPriorityQueue(newDefaultFramework(), WithClock(c))
 
 	// Add an unschedulable pod to a priority queue.
 	// This makes a situation that the pod was tried to schedule
@@ -837,8 +852,8 @@ func TestPodFailedSchedulingMultipleTimesDoesNotBlockNewerPod(t *testing.T) {
 	// Put in the unschedulable queue
 	q.AddUnschedulableIfNotPresent(newPodInfoNoTimestamp(&unschedulablePod), q.SchedulingCycle())
 
-	// Clear its backoff to simulate backoff its expiration
-	q.clearPodBackoff(&unschedulablePod)
+	// Move clock to make the unschedulable pods complete backoff.
+	c.Step(DefaultPodInitialBackoffDuration + time.Second)
 
 	// Move all unschedulable pods to the active queue.
 	q.MoveAllToActiveOrBackoffQueue("test")
@@ -880,8 +895,8 @@ func TestPodFailedSchedulingMultipleTimesDoesNotBlockNewerPod(t *testing.T) {
 	// And then, put unschedulable pod to the unschedulable queue
 	q.AddUnschedulableIfNotPresent(newPodInfoNoTimestamp(&unschedulablePod), q.SchedulingCycle())
 
-	// Clear its backoff to simulate its backoff expiration
-	q.clearPodBackoff(&unschedulablePod)
+	// Move clock to make the unschedulable pods complete backoff.
+	c.Step(DefaultPodInitialBackoffDuration + time.Second)
 
 	// Move all unschedulable pods to the active queue.
q.MoveAllToActiveOrBackoffQueue("test") @@ -962,7 +977,8 @@ func TestHighPriorityBackoff(t *testing.T) { // TestHighPriorityFlushUnschedulableQLeftover tests that pods will be moved to // activeQ after one minutes if it is in unschedulableQ func TestHighPriorityFlushUnschedulableQLeftover(t *testing.T) { - q := createAndRunPriorityQueue(newDefaultFramework()) + c := clock.NewFakeClock(time.Now()) + q := createAndRunPriorityQueue(newDefaultFramework(), WithClock(c)) midPod := v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: "test-midpod", @@ -1006,12 +1022,9 @@ func TestHighPriorityFlushUnschedulableQLeftover(t *testing.T) { Message: "fake scheduling failure", }) - highPodInfo := q.newPodInfo(&highPod) - highPodInfo.Timestamp = time.Now().Add(-1 * unschedulableQTimeInterval) - midPodInfo := q.newPodInfo(&midPod) - midPodInfo.Timestamp = time.Now().Add(-1 * unschedulableQTimeInterval) - addOrUpdateUnschedulablePod(q, highPodInfo) - addOrUpdateUnschedulablePod(q, midPodInfo) + q.AddUnschedulableIfNotPresent(q.newPodInfo(&highPod), q.SchedulingCycle()) + q.AddUnschedulableIfNotPresent(q.newPodInfo(&midPod), q.SchedulingCycle()) + c.Step(unschedulableQTimeInterval + time.Second) if p, err := q.Pop(); err != nil || p.Pod != &highPod { t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPod.Name, p.Pod.Name) @@ -1063,9 +1076,6 @@ var ( moveAllToActiveOrBackoffQ = func(queue *PriorityQueue, _ *framework.PodInfo) { queue.MoveAllToActiveOrBackoffQueue("test") } - backoffPod = func(queue *PriorityQueue, pInfo *framework.PodInfo) { - queue.backoffPod(pInfo.Pod) - } flushBackoffQ = func(queue *PriorityQueue, _ *framework.PodInfo) { queue.clock.(*clock.FakeClock).Step(2 * time.Second) queue.flushBackoffQCompleted() @@ -1138,9 +1148,10 @@ func TestPodTimestamp(t *testing.T) { operations: []operation{ addPodUnschedulableQ, addPodUnschedulableQ, + moveClockForward, moveAllToActiveOrBackoffQ, }, - operands: []*framework.PodInfo{pInfo2, pInfo1, nil}, + operands: []*framework.PodInfo{pInfo2, pInfo1, nil, nil}, expected: []*framework.PodInfo{pInfo1, pInfo2}, }, { @@ -1148,11 +1159,10 @@ func TestPodTimestamp(t *testing.T) { operations: []operation{ addPodActiveQ, addPodBackoffQ, - backoffPod, flushBackoffQ, moveAllToActiveOrBackoffQ, }, - operands: []*framework.PodInfo{pInfo2, pInfo1, pInfo1, nil, nil}, + operands: []*framework.PodInfo{pInfo2, pInfo1, nil, nil}, expected: []*framework.PodInfo{pInfo1, pInfo2}, }, } @@ -1184,23 +1194,13 @@ func TestPodTimestamp(t *testing.T) { // TestPendingPodsMetric tests Prometheus metrics related with pending pods func TestPendingPodsMetric(t *testing.T) { - total := 50 timestamp := time.Now() metrics.Register() - var pInfos = make([]*framework.PodInfo, 0, total) - for i := 1; i <= total; i++ { - p := &framework.PodInfo{ - Pod: &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: fmt.Sprintf("test-pod-%d", i), - Namespace: fmt.Sprintf("ns%d", i), - UID: types.UID(fmt.Sprintf("tp-%d", i)), - }, - }, - Timestamp: timestamp, - } - pInfos = append(pInfos, p) - } + total := 50 + pInfos := makePodInfos(total, timestamp) + totalWithDelay := 20 + pInfosWithDelay := makePodInfos(totalWithDelay, timestamp.Add(2*time.Second)) + tests := []struct { name string operations []operation @@ -1231,14 +1231,12 @@ scheduler_pending_pods{queue="unschedulable"} 20 name: "add pods to all kinds of queues", operations: []operation{ addPodActiveQ, - backoffPod, addPodBackoffQ, addPodUnschedulableQ, }, operands: [][]*framework.PodInfo{ pInfos[:15], pInfos[15:40], - pInfos[15:40], pInfos[40:], }, 
metricsName: "scheduler_pending_pods", @@ -1254,11 +1252,13 @@ scheduler_pending_pods{queue="unschedulable"} 10 name: "add pods to unschedulableQ and then move all to activeQ", operations: []operation{ addPodUnschedulableQ, + moveClockForward, moveAllToActiveOrBackoffQ, }, operands: [][]*framework.PodInfo{ pInfos[:total], {nil}, + {nil}, }, metricsName: "scheduler_pending_pods", wants: ` @@ -1272,13 +1272,15 @@ scheduler_pending_pods{queue="unschedulable"} 0 { name: "make some pods subject to backoff, add pods to unschedulableQ, and then move all to activeQ", operations: []operation{ - backoffPod, + addPodUnschedulableQ, + moveClockForward, addPodUnschedulableQ, moveAllToActiveOrBackoffQ, }, operands: [][]*framework.PodInfo{ - pInfos[:20], - pInfos[:total], + pInfos[20:total], + {nil}, + pInfosWithDelay[:20], {nil}, }, metricsName: "scheduler_pending_pods", @@ -1293,14 +1295,12 @@ scheduler_pending_pods{queue="unschedulable"} 0 { name: "make some pods subject to backoff, add pods to unschedulableQ/activeQ, move all to activeQ, and finally flush backoffQ", operations: []operation{ - backoffPod, addPodUnschedulableQ, addPodActiveQ, moveAllToActiveOrBackoffQ, flushBackoffQ, }, operands: [][]*framework.PodInfo{ - pInfos[:20], pInfos[:40], pInfos[40:], {nil}, @@ -1565,10 +1565,7 @@ func TestBackOffFlow(t *testing.T) { } // Check backoff duration. - deadline, ok := q.podBackoff.GetBackoffTime(podID) - if !ok { - t.Errorf("didn't get backoff for pod %s", podID) - } + deadline := q.getBackoffTime(podInfo) backoff := deadline.Sub(timestamp) if backoff != step.wantBackoff { t.Errorf("got backoff %s, want %s", backoff, step.wantBackoff) @@ -1589,11 +1586,22 @@ func TestBackOffFlow(t *testing.T) { } }) } - // After some time, backoff information is cleared. - cl.Step(time.Hour) - q.podBackoff.CleanupPodsCompletesBackingoff() - _, ok := q.podBackoff.GetBackoffTime(podID) - if ok { - t.Errorf("backoff information for pod %s was not cleared", podID) - } +} + +func makePodInfos(num int, timestamp time.Time) []*framework.PodInfo { + var pInfos = make([]*framework.PodInfo, 0, num) + for i := 1; i <= num; i++ { + p := &framework.PodInfo{ + Pod: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("test-pod-%d", i), + Namespace: fmt.Sprintf("ns%d", i), + UID: types.UID(fmt.Sprintf("tp-%d", i)), + }, + }, + Timestamp: timestamp, + } + pInfos = append(pInfos, p) + } + return pInfos }
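
---

Reviewer note (illustration, not part of the patch): after this change, a pod's
backoff is no longer bookkept in a side map; it is computed on demand from two
fields the queue already carries on each framework.PodInfo — Timestamp (when the
pod was last enqueued) and Attempts (how many scheduling attempts it has made).
The standalone sketch below reproduces that arithmetic; the helper names
(backoffDuration, backoffCompletesAt) and the 1s/10s bounds are illustrative
choices for the demo, not identifiers or defaults taken from this patch.

```go
package main

import (
	"fmt"
	"time"
)

// backoffDuration mirrors PriorityQueue.calculateBackoffDuration above: start
// at the initial duration and double once per prior attempt, capping at max.
func backoffDuration(attempts int, initial, max time.Duration) time.Duration {
	duration := initial
	for i := 1; i < attempts; i++ {
		duration = duration * 2
		if duration > max {
			return max
		}
	}
	return duration
}

// backoffCompletesAt mirrors PriorityQueue.getBackoffTime: backoff ends at the
// pod's enqueue timestamp plus its attempt-scaled duration.
func backoffCompletesAt(enqueued time.Time, attempts int, initial, max time.Duration) time.Time {
	return enqueued.Add(backoffDuration(attempts, initial, max))
}

func main() {
	enqueued := time.Now()
	for attempts := 1; attempts <= 6; attempts++ {
		d := backoffDuration(attempts, 1*time.Second, 10*time.Second)
		fmt.Printf("attempts=%d backoff=%v completesAt=%s\n",
			attempts, d,
			backoffCompletesAt(enqueued, attempts, 1*time.Second, 10*time.Second).Format(time.RFC3339))
	}
	// With 1s initial and 10s max this prints 1s, 2s, 4s, 8s, 10s, 10s — the
	// same progression the deleted TestBackoffPod asserted for PodBackoffMap.
}
```

Because the backoff is now a pure function of PodInfo, the podAttempts and
podLastUpdateTime maps, their RWMutex, and the CleanupPodsCompletesBackingoff
garbage collection all become unnecessary, which accounts for the bulk of the
deletions above.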