diff --git a/pkg/scheduler/internal/queue/pod_backoff.go b/pkg/scheduler/internal/queue/pod_backoff.go
index da89815a377..06c857ffa4e 100644
--- a/pkg/scheduler/internal/queue/pod_backoff.go
+++ b/pkg/scheduler/internal/queue/pod_backoff.go
@@ -21,12 +21,14 @@ import (
 	"time"
 
 	ktypes "k8s.io/apimachinery/pkg/types"
+	"k8s.io/kubernetes/pkg/scheduler/util"
 )
 
 // PodBackoffMap is a structure that stores backoff related information for pods
 type PodBackoffMap struct {
 	// lock for performing actions on this PodBackoffMap
-	lock sync.RWMutex
+	lock  sync.RWMutex
+	clock util.Clock
 	// initial backoff duration
 	initialDuration time.Duration
 	// maximal backoff duration
@@ -38,8 +40,9 @@ type PodBackoffMap struct {
 }
 
 // NewPodBackoffMap creates a PodBackoffMap with initial duration and max duration.
-func NewPodBackoffMap(initialDuration, maxDuration time.Duration) *PodBackoffMap {
+func NewPodBackoffMap(initialDuration, maxDuration time.Duration, clock util.Clock) *PodBackoffMap {
 	return &PodBackoffMap{
+		clock:             clock,
 		initialDuration:   initialDuration,
 		maxDuration:       maxDuration,
 		podAttempts:       make(map[ktypes.NamespacedName]int),
@@ -91,12 +94,16 @@ func (pbm *PodBackoffMap) ClearPodBackoff(nsPod ktypes.NamespacedName) {
 
 // CleanupPodsCompletesBackingoff execute garbage collection on the pod backoff,
 // i.e, it will remove a pod from the PodBackoffMap if
-// lastUpdateTime + maxBackoffDuration is before the current timestamp
+// lastUpdateTime + 2*maxDuration is before the current timestamp.
+// We should wait longer than maxDuration so that the pod gets a chance to
+// (1) move to the active queue and (2) get a schedule attempt.
 func (pbm *PodBackoffMap) CleanupPodsCompletesBackingoff() {
 	pbm.lock.Lock()
 	defer pbm.lock.Unlock()
 	for pod, value := range pbm.podLastUpdateTime {
-		if value.Add(pbm.maxDuration).Before(time.Now()) {
+		// Here we assume that maxDuration should be enough for a pod to move up the
+		// active queue and get a schedule attempt.
+		if value.Add(2 * pbm.maxDuration).Before(pbm.clock.Now()) {
 			pbm.clearPodBackoff(pod)
 		}
 	}
@@ -106,7 +113,7 @@ func (pbm *PodBackoffMap) CleanupPodsCompletesBackingoff() {
 // and increases its numberOfAttempts by 1
 func (pbm *PodBackoffMap) BackoffPod(nsPod ktypes.NamespacedName) {
 	pbm.lock.Lock()
-	pbm.podLastUpdateTime[nsPod] = time.Now()
+	pbm.podLastUpdateTime[nsPod] = pbm.clock.Now()
 	pbm.podAttempts[nsPod]++
 	pbm.lock.Unlock()
 }
diff --git a/pkg/scheduler/internal/queue/pod_backoff_test.go b/pkg/scheduler/internal/queue/pod_backoff_test.go
index 0f20d6cd921..6525cc335c2 100644
--- a/pkg/scheduler/internal/queue/pod_backoff_test.go
+++ b/pkg/scheduler/internal/queue/pod_backoff_test.go
@@ -17,15 +17,17 @@ limitations under the License.
 package queue
 
 import (
+	"fmt"
 	"testing"
 	"time"
 
 	ktypes "k8s.io/apimachinery/pkg/types"
+	"k8s.io/apimachinery/pkg/util/clock"
 )
 
 func TestBackoffPod(t *testing.T) {
-	bpm := NewPodBackoffMap(1*time.Second, 10*time.Second)
-
+	timestamp := time.Now()
+	bpm := NewPodBackoffMap(1*time.Second, 10*time.Second, clock.NewFakeClock(timestamp))
 	tests := []struct {
 		podID            ktypes.NamespacedName
 		expectedDuration time.Duration
@@ -61,20 +63,23 @@ func TestBackoffPod(t *testing.T) {
 		},
 	}
 
-	for _, test := range tests {
-		// Backoff the pod
-		bpm.BackoffPod(test.podID)
-		// Get backoff duration for the pod
-		duration := bpm.calculateBackoffDuration(test.podID)
-
-		if duration != test.expectedDuration {
-			t.Errorf("expected: %s, got %s for pod %s", test.expectedDuration.String(), duration.String(), test.podID)
-		}
+	for i, test := range tests {
+		t.Run(fmt.Sprintf("step %d", i), func(t *testing.T) {
+			bpm.BackoffPod(test.podID)
+			backoff, ok := bpm.GetBackoffTime(test.podID)
+			if !ok {
+				t.Errorf("%v should be backed off", test.podID)
+			}
+			duration := backoff.Sub(timestamp)
+			if duration != test.expectedDuration {
+				t.Errorf("expected: %s, got %s for pod %s", test.expectedDuration.String(), duration.String(), test.podID)
+			}
+		})
 	}
 }
 
 func TestClearPodBackoff(t *testing.T) {
-	bpm := NewPodBackoffMap(1*time.Second, 60*time.Second)
+	bpm := NewPodBackoffMap(1*time.Second, 60*time.Second, clock.NewFakeClock(time.Now()))
 	// Clear backoff on an not existed pod
 	bpm.clearPodBackoff(ktypes.NamespacedName{Namespace: "ns", Name: "not-existed"})
 	// Backoff twice for pod foo
diff --git a/pkg/scheduler/internal/queue/scheduling_queue.go b/pkg/scheduler/internal/queue/scheduling_queue.go
index 7130c42dcb6..7020230b3de 100644
--- a/pkg/scheduler/internal/queue/scheduling_queue.go
+++ b/pkg/scheduler/internal/queue/scheduling_queue.go
@@ -214,7 +214,7 @@ func NewPriorityQueue(
 	pq := &PriorityQueue{
 		clock:            options.clock,
 		stop:             make(chan struct{}),
-		podBackoff:       NewPodBackoffMap(options.podInitialBackoffDuration, options.podMaxBackoffDuration),
+		podBackoff:       NewPodBackoffMap(options.podInitialBackoffDuration, options.podMaxBackoffDuration, options.clock),
 		activeQ:          heap.NewWithRecorder(podInfoKeyFunc, comp, metrics.NewActivePodsRecorder()),
 		unschedulableQ:   newUnschedulablePodsMap(metrics.NewUnschedulablePodsRecorder()),
 		nominatedPods:    newNominatedPodMap(),
diff --git a/pkg/scheduler/internal/queue/scheduling_queue_test.go b/pkg/scheduler/internal/queue/scheduling_queue_test.go
index dc06a74dba2..6c11ba523e7 100644
--- a/pkg/scheduler/internal/queue/scheduling_queue_test.go
+++ b/pkg/scheduler/internal/queue/scheduling_queue_test.go
@@ -1515,3 +1515,85 @@ func createAndRunPriorityQueue(fwk framework.Framework, opts ...Option) *Priorit
 	q.Run()
 	return q
 }
+
+func TestBackOffFlow(t *testing.T) {
+	cl := clock.NewFakeClock(time.Now())
+	q := NewPriorityQueue(newDefaultFramework(), WithClock(cl))
+	steps := []struct {
+		wantBackoff time.Duration
+	}{
+		{wantBackoff: time.Second},
+		{wantBackoff: 2 * time.Second},
+		{wantBackoff: 4 * time.Second},
+		{wantBackoff: 8 * time.Second},
+		{wantBackoff: 10 * time.Second},
+		{wantBackoff: 10 * time.Second},
+		{wantBackoff: 10 * time.Second},
+	}
+	pod := &v1.Pod{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      "test-pod",
+			Namespace: "test-ns",
+			UID:       "test-uid",
+		},
+	}
+	podID := nsNameForPod(pod)
+	if err := q.Add(pod); err != nil {
+		t.Fatal(err)
+	}
+
+	for i, step := range steps {
+		t.Run(fmt.Sprintf("step %d", i), func(t *testing.T) {
+			timestamp := cl.Now()
+			// Simulate schedule attempt.
+			podInfo, err := q.Pop()
+			if err != nil {
+				t.Fatal(err)
+			}
+			if podInfo.Attempts != i+1 {
+				t.Errorf("got attempts %d, want %d", podInfo.Attempts, i+1)
+			}
+			if err := q.AddUnschedulableIfNotPresent(podInfo, int64(i)); err != nil {
+				t.Fatal(err)
+			}
+
+			// An event happens.
+			q.MoveAllToActiveOrBackoffQueue("deleted pod")
+
+			if _, ok, _ := q.podBackoffQ.Get(podInfo); !ok {
+				t.Errorf("pod %v is not in the backoff queue", podID)
+			}
+
+			// Check backoff duration.
+			deadline, ok := q.podBackoff.GetBackoffTime(podID)
+			if !ok {
+				t.Errorf("didn't get backoff for pod %s", podID)
+			}
+			backoff := deadline.Sub(timestamp)
+			if backoff != step.wantBackoff {
+				t.Errorf("got backoff %s, want %s", backoff, step.wantBackoff)
+			}
+
+			// Simulate routine that continuously flushes the backoff queue.
+			cl.Step(time.Millisecond)
+			q.flushBackoffQCompleted()
+			// Still in backoff queue after an early flush.
+			if _, ok, _ := q.podBackoffQ.Get(podInfo); !ok {
+				t.Errorf("pod %v is not in the backoff queue", podID)
+			}
+			// Moved out of the backoff queue after timeout.
+			cl.Step(backoff)
+			q.flushBackoffQCompleted()
+			if _, ok, _ := q.podBackoffQ.Get(podInfo); ok {
+				t.Errorf("pod %v is still in the backoff queue", podID)
+			}
+		})
+	}
+	// After some time, backoff information is cleared.
+	cl.Step(time.Hour)
+	q.podBackoff.CleanupPodsCompletesBackingoff()
+	_, ok := q.podBackoff.GetBackoffTime(podID)
+	if ok {
+		t.Errorf("backoff information for pod %s was not cleared", podID)
+	}
+}