Merge pull request #130680 from macsko/update_backoffq_less_function_to_order_by_priority_in_windows

Update backoffQ's less function to order pods by priority within ordering windows
Kubernetes Prow Robot 2025-03-20 01:36:31 -07:00 committed by GitHub
commit 65d9066665
10 changed files with 332 additions and 122 deletions

View File

@ -27,10 +27,18 @@ import (
"k8s.io/utils/clock"
)
// backoffQOrderingWindowDuration is the duration of an ordering window in the podBackoffQ.
// In each window, represented as a whole second, pods are ordered by priority.
// It matches the interval at which pods are flushed from the podBackoffQ to the activeQ, so that whole windows are flushed at once.
// This works only if the PopFromBackoffQ feature is enabled.
// See KEP-5142 (http://kep.k8s.io/5142) for the rationale.
const backoffQOrderingWindowDuration = time.Second
// backoffQueuer is a wrapper for backoffQ related operations.
type backoffQueuer interface {
// isPodBackingoff returns true if a pod is still waiting for its backoff timer.
// If this returns true, the pod should not be re-tried.
// If the pod backoff time falls within the current ordering window, it is still considered to be backing off.
isPodBackingoff(podInfo *framework.QueuedPodInfo) bool
// popEachBackoffCompleted runs fn for all pods from podBackoffQ and podErrorBackoffQ that completed backoff while popping them.
popEachBackoffCompleted(logger klog.Logger, fn func(pInfo *framework.QueuedPodInfo))
@ -39,6 +47,11 @@ type backoffQueuer interface {
podInitialBackoffDuration() time.Duration
// podMaxBackoffDuration returns maximum backoff duration that pod can get.
podMaxBackoffDuration() time.Duration
// waitUntilAlignedWithOrderingWindow waits until the time reaches a multiple of backoffQOrderingWindowDuration.
// It then runs the f function at the backoffQOrderingWindowDuration interval using a ticker.
// It's important to align the flushing time, because podBackoffQ's ordering is based on the windows
// and whole windows have to be flushed at once without visible latency.
waitUntilAlignedWithOrderingWindow(f func(), stopCh <-chan struct{})
// add adds the pInfo to backoffQueue.
// The event should show which event triggered this addition and is used for the metric recording.
@ -54,7 +67,7 @@ type backoffQueuer interface {
// has informs whether pInfo exists in the queue.
has(pInfo *framework.QueuedPodInfo) bool
// list returns all pods that are in the queue.
list() []*framework.QueuedPodInfo
list() []*v1.Pod
// len returns length of the queue.
len() int
}
@ -62,7 +75,7 @@ type backoffQueuer interface {
// backoffQueue implements backoffQueuer and wraps two queues inside,
// providing seamless access as if it were one queue.
type backoffQueue struct {
clock clock.Clock
clock clock.WithTicker
// podBackoffQ is a heap ordered by backoff expiry. Pods which have completed backoff
// are popped from this heap before the scheduler looks at activeQ
@ -73,15 +86,27 @@ type backoffQueue struct {
podInitialBackoff time.Duration
podMaxBackoff time.Duration
// activeQLessFn is used as an eventual less function if two backoff times are equal,
// when the SchedulerPopFromBackoffQ feature is enabled.
activeQLessFn framework.LessFunc
// isPopFromBackoffQEnabled indicates whether the feature gate SchedulerPopFromBackoffQ is enabled.
isPopFromBackoffQEnabled bool
}
func newBackoffQueue(clock clock.Clock, podInitialBackoffDuration time.Duration, podMaxBackoffDuration time.Duration) *backoffQueue {
func newBackoffQueue(clock clock.WithTicker, podInitialBackoffDuration time.Duration, podMaxBackoffDuration time.Duration, activeQLessFn framework.LessFunc, popFromBackoffQEnabled bool) *backoffQueue {
bq := &backoffQueue{
clock: clock,
podInitialBackoff: podInitialBackoffDuration,
podMaxBackoff: podMaxBackoffDuration,
clock: clock,
podInitialBackoff: podInitialBackoffDuration,
podMaxBackoff: podMaxBackoffDuration,
isPopFromBackoffQEnabled: popFromBackoffQEnabled,
activeQLessFn: activeQLessFn,
}
bq.podBackoffQ = heap.NewWithRecorder(podInfoKeyFunc, bq.lessBackoffCompleted, metrics.NewBackoffPodsRecorder())
podBackoffQLessFn := bq.lessBackoffCompleted
if popFromBackoffQEnabled {
podBackoffQLessFn = bq.lessBackoffCompletedWithPriority
}
bq.podBackoffQ = heap.NewWithRecorder(podInfoKeyFunc, podBackoffQLessFn, metrics.NewBackoffPodsRecorder())
bq.podErrorBackoffQ = heap.NewWithRecorder(podInfoKeyFunc, bq.lessBackoffCompleted, metrics.NewBackoffPodsRecorder())
return bq
@ -97,7 +122,70 @@ func (bq *backoffQueue) podMaxBackoffDuration() time.Duration {
return bq.podMaxBackoff
}
// lessBackoffCompleted is a less function of podBackoffQ and podErrorBackoffQ.
// alignToWindow truncates the provided time to the podBackoffQ ordering window.
// It returns the lowest possible timestamp in the window.
func (bq *backoffQueue) alignToWindow(t time.Time) time.Time {
if !bq.isPopFromBackoffQEnabled {
return t
}
return t.Truncate(backoffQOrderingWindowDuration)
}
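As a quick illustration of what the window truncation does (a standalone sketch, not part of this diff; the timestamps are made up): two times that differ only in their sub-second part land in the same ordering window and compare equal, which is exactly what alignToWindow relies on.

package main

import (
	"fmt"
	"time"
)

func main() {
	const window = time.Second // stands in for backoffQOrderingWindowDuration
	t1 := time.Date(2025, 3, 20, 10, 0, 1, 200000000, time.UTC)
	t2 := time.Date(2025, 3, 20, 10, 0, 1, 900000000, time.UTC)
	// Truncate drops the sub-second part, so both timestamps fall into the same window.
	fmt.Println(t1.Truncate(window).Equal(t2.Truncate(window))) // true
}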
// waitUntilAlignedWithOrderingWindow waits until the time reaches a multiple of backoffQOrderingWindowDuration.
// It then runs the f function at the backoffQOrderingWindowDuration interval using a ticker.
// It's important to align the flushing time, because podBackoffQ's ordering is based on the windows
// and whole windows have to be flushed at once without visible latency.
func (bq *backoffQueue) waitUntilAlignedWithOrderingWindow(f func(), stopCh <-chan struct{}) {
now := bq.clock.Now()
// Wait until the time reaches the next multiple of backoffQOrderingWindowDuration.
durationToNextWindow := bq.alignToWindow(now.Add(backoffQOrderingWindowDuration)).Sub(now)
timer := bq.clock.NewTimer(durationToNextWindow)
select {
case <-stopCh:
timer.Stop()
return
case <-timer.C():
}
// Run a ticker to make sure the invocations of f
// are aligned with the backoffQ's ordering window.
ticker := bq.clock.NewTicker(backoffQOrderingWindowDuration)
for {
select {
case <-stopCh:
return
default:
}
f()
// NOTE: Because there is no priority selection in Go,
// this can race: both ticker.C and stopCh may be ready at the same time,
// and the ticker.C case may be chosen.
// To mitigate this, we re-check stopCh at the beginning
// of every loop iteration to prevent extra executions of f().
select {
case <-stopCh:
ticker.Stop()
return
case <-ticker.C():
}
}
}
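The same align-then-tick pattern can be sketched against the standard library instead of k8s.io/utils/clock; runAligned and its timings below are illustrative only and not part of the scheduler.

package main

import (
	"fmt"
	"time"
)

// runAligned waits until the next multiple of window, then calls f on every
// window boundary until stop is closed. Illustrative only; the real code uses
// the injectable clock.WithTicker interface so tests can fake time.
func runAligned(window time.Duration, f func(), stop <-chan struct{}) {
	now := time.Now()
	// Time remaining until the start of the next window.
	wait := now.Truncate(window).Add(window).Sub(now)
	timer := time.NewTimer(wait)
	select {
	case <-stop:
		timer.Stop()
		return
	case <-timer.C:
	}
	ticker := time.NewTicker(window)
	defer ticker.Stop()
	for {
		f()
		select {
		case <-stop:
			return
		case <-ticker.C:
		}
	}
}

func main() {
	stop := make(chan struct{})
	go runAligned(time.Second, func() {
		fmt.Println(time.Now().Format("15:04:05.000")) // fires close to whole-second boundaries
	}, stop)
	time.Sleep(3 * time.Second)
	close(stop)
}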
// lessBackoffCompletedWithPriority is a less function of podBackoffQ if PopFromBackoffQ feature is enabled.
// It orders pods within the same backoff ordering window the same way the activeQ does, to improve the popping order from the backoffQ when the activeQ is empty.
func (bq *backoffQueue) lessBackoffCompletedWithPriority(pInfo1, pInfo2 *framework.QueuedPodInfo) bool {
bo1 := bq.getBackoffTime(pInfo1)
bo2 := bq.getBackoffTime(pInfo2)
if !bo1.Equal(bo2) {
return bo1.Before(bo2)
}
// If the backoff times are equal, sort the pods the same way the activeQ does.
return bq.activeQLessFn(pInfo1, pInfo2)
}
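For intuition, a standalone sketch of the resulting order, with sort.Slice standing in for the heap and made-up items: earlier windows still come first, and within a window higher priority wins, mirroring the activeQ's default priority ordering.

package main

import (
	"fmt"
	"sort"
	"time"
)

type item struct {
	name     string
	expiry   time.Time // already aligned to the ordering window
	priority int32
}

func main() {
	w0 := time.Date(2025, 3, 20, 10, 0, 0, 0, time.UTC)
	w1 := w0.Add(time.Second)
	items := []item{
		{"a", w1, 5},
		{"b", w0, 1},
		{"c", w1, 9},
	}
	sort.Slice(items, func(i, j int) bool {
		if !items[i].expiry.Equal(items[j].expiry) {
			return items[i].expiry.Before(items[j].expiry)
		}
		// Same window: higher priority first, like the activeQ's default ordering.
		return items[i].priority > items[j].priority
	})
	for _, it := range items {
		fmt.Println(it.name) // b, c, a
	}
}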
// lessBackoffCompleted is a less function of podErrorBackoffQ (and of podBackoffQ when the PopFromBackoffQ feature is disabled).
func (bq *backoffQueue) lessBackoffCompleted(pInfo1, pInfo2 *framework.QueuedPodInfo) bool {
bo1 := bq.getBackoffTime(pInfo1)
bo2 := bq.getBackoffTime(pInfo2)
@ -106,9 +194,11 @@ func (bq *backoffQueue) lessBackoffCompleted(pInfo1, pInfo2 *framework.QueuedPod
// isPodBackingoff returns true if a pod is still waiting for its backoff timer.
// If this returns true, the pod should not be re-tried.
// If the pod backoff time falls within the current ordering window, it is still considered to be backing off.
func (bq *backoffQueue) isPodBackingoff(podInfo *framework.QueuedPodInfo) bool {
boTime := bq.getBackoffTime(podInfo)
return boTime.After(bq.clock.Now())
// Don't use After, because in the case of window equality we want to return true.
return !boTime.Before(bq.alignToWindow(bq.clock.Now()))
}
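To see why the check uses !Before rather than After at a window boundary, a tiny standalone sketch with made-up timestamps:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Made-up times: the current window starts exactly when the backoff expires.
	windowStart := time.Date(2025, 3, 20, 10, 0, 1, 0, time.UTC)
	boTime := windowStart
	fmt.Println(boTime.After(windowStart))   // false: After would stop backing off one window too early
	fmt.Println(!boTime.Before(windowStart)) // true: the pod is still treated as backing off
}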
// getBackoffTime returns the time that podInfo completes backoff.
@ -117,9 +207,14 @@ func (bq *backoffQueue) isPodBackingoff(podInfo *framework.QueuedPodInfo) bool {
// because of the fact that the backoff time is calculated based on podInfo.Attempts,
// which doesn't get changed until the pod's scheduling is retried.
func (bq *backoffQueue) getBackoffTime(podInfo *framework.QueuedPodInfo) time.Time {
if podInfo.Attempts == 0 {
// Don't store the backoff expiration if the duration is 0,
// so that isPodBackingoff correctly lets a pod that hasn't been tried at all skip backoff.
return time.Time{}
}
if podInfo.BackoffExpiration.IsZero() {
duration := bq.calculateBackoffDuration(podInfo)
podInfo.BackoffExpiration = podInfo.Timestamp.Add(duration)
podInfo.BackoffExpiration = bq.alignToWindow(podInfo.Timestamp.Add(duration))
}
return podInfo.BackoffExpiration
}
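calculateBackoffDuration itself is not shown in this diff; assuming the usual exponential policy (initial duration doubled per attempt, capped at the max), the aligned expiration would come out roughly as in this standalone sketch:

package main

import (
	"fmt"
	"time"
)

// exponentialBackoff is an illustrative stand-in for calculateBackoffDuration
// (not part of this diff): the initial duration doubled per attempt, capped at max.
func exponentialBackoff(attempts int, initial, max time.Duration) time.Duration {
	d := initial
	for i := 1; i < attempts; i++ {
		d *= 2
		if d > max {
			return max
		}
	}
	return d
}

func main() {
	const window = time.Second
	timestamp := time.Date(2025, 3, 20, 10, 0, 0, 300000000, time.UTC)
	d := exponentialBackoff(3, time.Second, 10*time.Second) // 4s on the third attempt
	// Align the expiration to the ordering window, as getBackoffTime now does.
	fmt.Println(timestamp.Add(d).Truncate(window)) // 2025-03-20 10:00:04 +0000 UTC
}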
@ -238,8 +333,15 @@ func (bq *backoffQueue) has(pInfo *framework.QueuedPodInfo) bool {
}
// list returns all pods that are in the queue.
func (bq *backoffQueue) list() []*framework.QueuedPodInfo {
return append(bq.podBackoffQ.List(), bq.podErrorBackoffQ.List()...)
func (bq *backoffQueue) list() []*v1.Pod {
var result []*v1.Pod
for _, pInfo := range bq.podBackoffQ.List() {
result = append(result, pInfo.Pod)
}
for _, pInfo := range bq.podErrorBackoffQ.List() {
result = append(result, pInfo.Pod)
}
return result
}
// len returns length of the queue.

View File

@ -17,6 +17,7 @@ limitations under the License.
package queue
import (
"fmt"
"math"
"testing"
"time"
@ -69,7 +70,7 @@ func TestBackoffQueue_calculateBackoffDuration(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
bq := newBackoffQueue(clock.RealClock{}, tt.initialBackoffDuration, tt.maxBackoffDuration)
bq := newBackoffQueue(clock.RealClock{}, tt.initialBackoffDuration, tt.maxBackoffDuration, newDefaultQueueSort(), true)
if got := bq.calculateBackoffDuration(tt.podInfo); got != tt.want {
t.Errorf("backoffQueue.calculateBackoffDuration() = %v, want %v", got, tt.want)
}
@ -84,7 +85,7 @@ func TestBackoffQueue_popEachBackoffCompleted(t *testing.T) {
PodInfo: &framework.PodInfo{
Pod: st.MakePod().Name("pod0").Obj(),
},
Timestamp: fakeClock.Now().Add(-time.Second),
Timestamp: fakeClock.Now().Add(-2 * time.Second),
Attempts: 1,
UnschedulablePlugins: sets.New("plugin"),
},
@ -100,7 +101,7 @@ func TestBackoffQueue_popEachBackoffCompleted(t *testing.T) {
PodInfo: &framework.PodInfo{
Pod: st.MakePod().Name("pod2").Obj(),
},
Timestamp: fakeClock.Now().Add(-time.Second),
Timestamp: fakeClock.Now().Add(-2 * time.Second),
Attempts: 1,
},
"pod3": {
@ -147,12 +148,105 @@ func TestBackoffQueue_popEachBackoffCompleted(t *testing.T) {
wantPods: nil,
},
}
for _, tt := range tests {
for _, popFromBackoffQEnabled := range []bool{true, false} {
t.Run(fmt.Sprintf("%s popFromBackoffQEnabled(%v)", tt.name, popFromBackoffQEnabled), func(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
bq := newBackoffQueue(fakeClock, DefaultPodInitialBackoffDuration, DefaultPodMaxBackoffDuration, newDefaultQueueSort(), popFromBackoffQEnabled)
for _, podName := range tt.podsInBackoff {
bq.add(logger, podInfos[podName], framework.EventUnscheduledPodAdd.Label())
}
var gotPods []string
bq.popEachBackoffCompleted(logger, func(pInfo *framework.QueuedPodInfo) {
gotPods = append(gotPods, pInfo.Pod.Name)
})
if diff := cmp.Diff(tt.wantPods, gotPods); diff != "" {
t.Errorf("Unexpected pods moved (-want, +got):\n%s", diff)
}
podsToStayInBackoff := len(tt.podsInBackoff) - len(tt.wantPods)
if bq.len() != podsToStayInBackoff {
t.Errorf("Expected %v pods to stay in backoffQ, but got: %v", podsToStayInBackoff, bq.len())
}
})
}
}
}
func TestBackoffQueueOrdering(t *testing.T) {
// Align the fake clock with ordering window.
fakeClock := testingclock.NewFakeClock(time.Now().Truncate(backoffQOrderingWindowDuration))
podInfos := []*framework.QueuedPodInfo{
{
PodInfo: &framework.PodInfo{
Pod: st.MakePod().Name("pod0").Priority(1).Obj(),
},
Timestamp: fakeClock.Now(),
Attempts: 1,
UnschedulablePlugins: sets.New("plugin"),
},
{
PodInfo: &framework.PodInfo{
Pod: st.MakePod().Name("pod1").Priority(1).Obj(),
},
Timestamp: fakeClock.Now().Add(-time.Second),
Attempts: 1,
UnschedulablePlugins: sets.New("plugin"),
},
{
PodInfo: &framework.PodInfo{
Pod: st.MakePod().Name("pod2").Priority(2).Obj(),
},
Timestamp: fakeClock.Now().Add(-2*time.Second + time.Millisecond),
Attempts: 1,
UnschedulablePlugins: sets.New("plugin"),
},
{
PodInfo: &framework.PodInfo{
Pod: st.MakePod().Name("pod3").Priority(1).Obj(),
},
Timestamp: fakeClock.Now().Add(-2 * time.Second),
Attempts: 1,
UnschedulablePlugins: sets.New("plugin"),
},
{
PodInfo: &framework.PodInfo{
Pod: st.MakePod().Name("pod4").Priority(2).Obj(),
},
Timestamp: fakeClock.Now().Add(-2 * time.Second),
Attempts: 1,
UnschedulablePlugins: sets.New("plugin"),
},
{
PodInfo: &framework.PodInfo{
Pod: st.MakePod().Name("pod5").Priority(1).Obj(),
},
Timestamp: fakeClock.Now().Add(-3 * time.Second),
Attempts: 1,
UnschedulablePlugins: sets.New("plugin"),
},
}
tests := []struct {
name string
popFromBackoffQEnabled bool
wantPods []string
}{
{
name: "Pods with the same window are ordered by priority if PopFromBackoffQ is enabled",
popFromBackoffQEnabled: true,
wantPods: []string{"pod5", "pod4", "pod2", "pod3"},
},
{
name: "Pods priority doesn't matter if PopFromBackoffQ is disabled",
popFromBackoffQEnabled: false,
wantPods: []string{"pod5", "pod3", "pod4", "pod2"},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
bq := newBackoffQueue(fakeClock, DefaultPodInitialBackoffDuration, DefaultPodMaxBackoffDuration)
for _, podName := range tt.podsInBackoff {
bq.add(logger, podInfos[podName], framework.EventUnscheduledPodAdd.Label())
bq := newBackoffQueue(fakeClock, DefaultPodInitialBackoffDuration, DefaultPodMaxBackoffDuration, newDefaultQueueSort(), tt.popFromBackoffQEnabled)
for _, podInfo := range podInfos {
bq.add(logger, podInfo, framework.EventUnscheduledPodAdd.Label())
}
var gotPods []string
bq.popEachBackoffCompleted(logger, func(pInfo *framework.QueuedPodInfo) {
@ -161,10 +255,6 @@ func TestBackoffQueue_popEachBackoffCompleted(t *testing.T) {
if diff := cmp.Diff(tt.wantPods, gotPods); diff != "" {
t.Errorf("Unexpected pods moved (-want, +got):\n%s", diff)
}
podsToStayInBackoff := len(tt.podsInBackoff) - len(tt.wantPods)
if bq.len() != podsToStayInBackoff {
t.Errorf("Expected %v pods to stay in backoffQ, but got: %v", podsToStayInBackoff, bq.len())
}
})
}
}

View File

@ -132,6 +132,8 @@ type SchedulingQueue interface {
PendingPods() ([]*v1.Pod, string)
InFlightPods() []*v1.Pod
PodsInActiveQ() []*v1.Pod
// PodsInBackoffQ returns all the Pods in the backoffQ.
PodsInBackoffQ() []*v1.Pod
}
// NewSchedulingQueue initializes a priority queue as a new scheduling queue.
@ -155,7 +157,7 @@ type PriorityQueue struct {
*nominator
stop chan struct{}
clock clock.Clock
clock clock.WithTicker
// lock takes precedence and should be taken first,
// before any other locks in the queue (activeQueue.lock or nominator.nLock).
@ -209,7 +211,7 @@ type clusterEvent struct {
}
type priorityQueueOptions struct {
clock clock.Clock
clock clock.WithTicker
podInitialBackoffDuration time.Duration
podMaxBackoffDuration time.Duration
podMaxInUnschedulablePodsDuration time.Duration
@ -224,7 +226,7 @@ type priorityQueueOptions struct {
type Option func(*priorityQueueOptions)
// WithClock sets clock for PriorityQueue, the default clock is clock.RealClock.
func WithClock(clock clock.Clock) Option {
func WithClock(clock clock.WithTicker) Option {
return func(o *priorityQueueOptions) {
o.clock = clock
}
@ -334,7 +336,7 @@ func NewPriorityQueue(
stop: make(chan struct{}),
podMaxInUnschedulablePodsDuration: options.podMaxInUnschedulablePodsDuration,
activeQ: newActiveQueue(heap.NewWithRecorder(podInfoKeyFunc, heap.LessFunc[*framework.QueuedPodInfo](lessFn), metrics.NewActivePodsRecorder()), isSchedulingQueueHintEnabled, options.metricsRecorder),
backoffQ: newBackoffQueue(options.clock, options.podInitialBackoffDuration, options.podMaxBackoffDuration),
backoffQ: newBackoffQueue(options.clock, options.podInitialBackoffDuration, options.podMaxBackoffDuration, lessFn, isPopFromBackoffQEnabled),
unschedulablePods: newUnschedulablePods(metrics.NewUnschedulablePodsRecorder(), metrics.NewGatedPodsRecorder()),
preEnqueuePluginMap: options.preEnqueuePluginMap,
queueingHintMap: options.queueingHintMap,
@ -352,9 +354,9 @@ func NewPriorityQueue(
// Run starts the goroutine to pump from backoffQ to activeQ
func (p *PriorityQueue) Run(logger klog.Logger) {
go wait.Until(func() {
go p.backoffQ.waitUntilAlignedWithOrderingWindow(func() {
p.flushBackoffQCompleted(logger)
}, 1.0*time.Second, p.stop)
}, p.stop)
go wait.Until(func() {
p.flushUnschedulablePodsLeftover(logger)
}, 30*time.Second, p.stop)
@ -1177,6 +1179,11 @@ func (p *PriorityQueue) PodsInActiveQ() []*v1.Pod {
return p.activeQ.list()
}
// PodsInBackoffQ returns all the Pods in the backoffQ.
func (p *PriorityQueue) PodsInBackoffQ() []*v1.Pod {
return p.backoffQ.list()
}
var pendingPodsSummary = "activeQ:%v; backoffQ:%v; unschedulablePods:%v"
// GetPod searches for a pod in the activeQ, backoffQ, and unschedulablePods.
@ -1215,9 +1222,7 @@ func (p *PriorityQueue) PendingPods() ([]*v1.Pod, string) {
defer p.lock.RUnlock()
result := p.activeQ.list()
activeQLen := len(result)
for _, pInfo := range p.backoffQ.list() {
result = append(result, pInfo.Pod)
}
result = append(result, p.backoffQ.list()...)
for _, pInfo := range p.unschedulablePods.podInfoMap {
result = append(result, pInfo.Pod)
}

View File

@ -835,18 +835,17 @@ func Test_InFlightPods(t *testing.T) {
}
if test.wantBackoffQPodNames != nil {
podInfos := q.backoffQ.list()
pods := q.backoffQ.list()
var podNames []string
for _, pInfo := range podInfos {
podNames = append(podNames, pInfo.Pod.Name)
for _, pod := range pods {
podNames = append(podNames, pod.Name)
}
if diff := cmp.Diff(test.wantBackoffQPodNames, podNames, sortOpt); diff != "" {
t.Fatalf("Unexpected diff of backoffQ pod names (-want, +got):\n%s", diff)
}
wantPodNames := sets.New(test.wantBackoffQPodNames...)
for _, podInfo := range podInfos {
podGotFromBackoffQ := podInfo.Pod
for _, podGotFromBackoffQ := range pods {
if !wantPodNames.Has(podGotFromBackoffQ.Name) {
t.Fatalf("Pod %v was not expected to be in the backoffQ.", podGotFromBackoffQ.Name)
}
@ -1121,9 +1120,9 @@ func TestPriorityQueue_Update(t *testing.T) {
q.unschedulablePods.addOrUpdate(attemptQueuedPodInfo(q.newQueuedPodInfo(medPriorityPodInfo.Pod, queuePlugin)), framework.EventUnscheduledPodAdd.Label())
updatedPod := medPriorityPodInfo.Pod.DeepCopy()
updatedPod.Annotations["foo"] = "test1"
// Move clock by podInitialBackoffDuration, so that pods in the unschedulablePods would pass the backing off,
// Move clock by podMaxBackoffDuration, so that pods in the unschedulablePods would pass the backing off,
// and the pods will be moved into activeQ.
c.Step(q.backoffQ.podInitialBackoffDuration())
c.Step(q.backoffQ.podMaxBackoffDuration())
return medPriorityPodInfo.Pod, updatedPod
},
schedulingHintsEnablement: []bool{false, true},
@ -2000,9 +1999,9 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) {
t.Errorf("Expected %v in the backoffQ", hpp1.Name)
}
// Move clock by podInitialBackoffDuration, so that pods in the unschedulablePods would pass the backing off,
// Move clock by podMaxBackoffDuration, so that pods in the unschedulablePods would pass the backing off,
// and the pods will be moved into activeQ.
c.Step(q.backoffQ.podInitialBackoffDuration())
c.Step(q.backoffQ.podMaxBackoffDuration())
q.flushBackoffQCompleted(logger) // flush the completed backoffQ to move hpp1 to activeQ.
q.MoveAllToActiveOrBackoffQueue(logger, nodeAdd, nil, nil, nil)
if q.activeQ.len() != 4 {
@ -2104,9 +2103,9 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueueWithOutQueueingHint(t *testi
t.Errorf("Expected %v in the backoffQ", hpp1.Name)
}
// Move clock by podInitialBackoffDuration, so that pods in the unschedulablePods would pass the backing off,
// Move clock by podMaxBackoffDuration, so that pods in the unschedulablePods would pass the backing off,
// and the pods will be moved into activeQ.
c.Step(q.backoffQ.podInitialBackoffDuration())
c.Step(q.backoffQ.podMaxBackoffDuration())
q.flushBackoffQCompleted(logger) // flush the completed backoffQ to move hpp1 to activeQ.
q.MoveAllToActiveOrBackoffQueue(logger, nodeAdd, nil, nil, nil)
if q.activeQ.len() != 4 {
@ -2762,7 +2761,7 @@ func TestRecentlyTriedPodsGoBack(t *testing.T) {
if err != nil {
t.Fatalf("unexpected error from AddUnschedulableIfNotPresent: %v", err)
}
c.Step(DefaultPodInitialBackoffDuration)
c.Step(q.backoffQ.podMaxBackoffDuration())
// Move all unschedulable pods to the active queue.
q.MoveAllToActiveOrBackoffQueue(logger, framework.EventUnschedulableTimeout, nil, nil, nil)
// Simulation is over. Now let's pop all pods. The pod popped first should be
@ -3130,11 +3129,11 @@ var (
queue.MoveAllToActiveOrBackoffQueue(logger, framework.EventUnschedulableTimeout, nil, nil, nil)
}
flushBackoffQ = func(t *testing.T, logger klog.Logger, queue *PriorityQueue, _ *framework.QueuedPodInfo) {
queue.clock.(*testingclock.FakeClock).Step(2 * time.Second)
queue.clock.(*testingclock.FakeClock).Step(3 * time.Second)
queue.flushBackoffQCompleted(logger)
}
moveClockForward = func(t *testing.T, logger klog.Logger, queue *PriorityQueue, _ *framework.QueuedPodInfo) {
queue.clock.(*testingclock.FakeClock).Step(2 * time.Second)
queue.clock.(*testingclock.FakeClock).Step(3 * time.Second)
}
flushUnscheduledQ = func(t *testing.T, logger klog.Logger, queue *PriorityQueue, _ *framework.QueuedPodInfo) {
queue.clock.(*testingclock.FakeClock).Step(queue.podMaxInUnschedulablePodsDuration)
@ -3731,7 +3730,6 @@ func TestBackOffFlow(t *testing.T) {
logger, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
q := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(cl))
steps := []struct {
wantBackoff time.Duration
}{
@ -3743,58 +3741,67 @@ func TestBackOffFlow(t *testing.T) {
{wantBackoff: 10 * time.Second},
{wantBackoff: 10 * time.Second},
}
pod := st.MakePod().Name("test-pod").Namespace("test-ns").UID("test-uid").Obj()
for _, popFromBackoffQEnabled := range []bool{true, false} {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SchedulerPopFromBackoffQ, popFromBackoffQEnabled)
podID := types.NamespacedName{
Namespace: pod.Namespace,
Name: pod.Name,
}
q.Add(logger, pod)
q := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(cl))
for i, step := range steps {
t.Run(fmt.Sprintf("step %d", i), func(t *testing.T) {
timestamp := cl.Now()
// Simulate schedule attempt.
podInfo, err := q.Pop(logger)
if err != nil {
t.Fatal(err)
}
if podInfo.Attempts != i+1 {
t.Errorf("got attempts %d, want %d", podInfo.Attempts, i+1)
}
err = q.AddUnschedulableIfNotPresent(logger, podInfo, int64(i))
if err != nil {
t.Fatalf("unexpected error from AddUnschedulableIfNotPresent: %v", err)
}
pod := st.MakePod().Name("test-pod").Namespace("test-ns").UID("test-uid").Obj()
podID := types.NamespacedName{
Namespace: pod.Namespace,
Name: pod.Name,
}
q.Add(logger, pod)
// An event happens.
q.MoveAllToActiveOrBackoffQueue(logger, framework.EventUnschedulableTimeout, nil, nil, nil)
for i, step := range steps {
t.Run(fmt.Sprintf("step %d popFromBackoffQEnabled(%v)", i, popFromBackoffQEnabled), func(t *testing.T) {
timestamp := cl.Now()
// Simulate schedule attempt.
podInfo, err := q.Pop(logger)
if err != nil {
t.Fatal(err)
}
if podInfo.Attempts != i+1 {
t.Errorf("got attempts %d, want %d", podInfo.Attempts, i+1)
}
err = q.AddUnschedulableIfNotPresent(logger, podInfo, int64(i))
if err != nil {
t.Fatalf("unexpected error from AddUnschedulableIfNotPresent: %v", err)
}
if !q.backoffQ.has(podInfo) {
t.Errorf("pod %v is not in the backoff queue", podID)
}
// An event happens.
q.MoveAllToActiveOrBackoffQueue(logger, framework.EventUnschedulableTimeout, nil, nil, nil)
// Check backoff duration.
deadline := podInfo.BackoffExpiration
backoff := deadline.Sub(timestamp)
if backoff != step.wantBackoff {
t.Errorf("got backoff %s, want %s", backoff, step.wantBackoff)
}
if !q.backoffQ.has(podInfo) {
t.Errorf("pod %v is not in the backoff queue", podID)
}
// Simulate routine that continuously flushes the backoff queue.
cl.Step(time.Millisecond)
q.flushBackoffQCompleted(logger)
// Still in backoff queue after an early flush.
if !q.backoffQ.has(podInfo) {
t.Errorf("pod %v is not in the backoff queue", podID)
}
// Moved out of the backoff queue after timeout.
cl.Step(backoff)
q.flushBackoffQCompleted(logger)
if q.backoffQ.has(podInfo) {
t.Errorf("pod %v is still in the backoff queue", podID)
}
})
// Check backoff duration.
deadline := podInfo.BackoffExpiration
backoff := deadline.Sub(timestamp)
if popFromBackoffQEnabled {
// If popFromBackoffQEnabled is true, the expected backoff is obtained by rounding up to the ordering window duration.
backoff = backoff.Truncate(backoffQOrderingWindowDuration) + backoffQOrderingWindowDuration
}
if backoff != step.wantBackoff {
t.Errorf("got backoff %s, want %s", backoff, step.wantBackoff)
}
// Simulate routine that continuously flushes the backoff queue.
cl.Step(backoffQOrderingWindowDuration)
q.flushBackoffQCompleted(logger)
// Still in backoff queue after an early flush.
if !q.backoffQ.has(podInfo) {
t.Errorf("pod %v is not in the backoff queue", podID)
}
// Moved out of the backoff queue after timeout.
cl.Step(backoff)
q.flushBackoffQCompleted(logger)
if q.backoffQ.has(podInfo) {
t.Errorf("pod %v is still in the backoff queue", podID)
}
})
}
}
}
@ -3812,20 +3819,20 @@ func TestMoveAllToActiveOrBackoffQueue_PreEnqueueChecks(t *testing.T) {
preEnqueueCheck PreEnqueueCheck
podInfos []*framework.QueuedPodInfo
event framework.ClusterEvent
want []string
want sets.Set[string]
}{
{
name: "nil PreEnqueueCheck",
podInfos: podInfos,
event: framework.EventUnschedulableTimeout,
want: []string{"p0", "p1", "p2", "p3", "p4"},
want: sets.New("p0", "p1", "p2", "p3", "p4"),
},
{
name: "move Pods with priority greater than 2",
podInfos: podInfos,
event: framework.EventUnschedulableTimeout,
preEnqueueCheck: func(pod *v1.Pod) bool { return *pod.Spec.Priority >= 2 },
want: []string{"p2", "p3", "p4"},
want: sets.New("p2", "p3", "p4"),
},
{
name: "move Pods with even priority and greater than 2",
@ -3834,7 +3841,7 @@ func TestMoveAllToActiveOrBackoffQueue_PreEnqueueChecks(t *testing.T) {
preEnqueueCheck: func(pod *v1.Pod) bool {
return *pod.Spec.Priority%2 == 0 && *pod.Spec.Priority >= 2
},
want: []string{"p2", "p4"},
want: sets.New("p2", "p4"),
},
{
name: "move Pods with even and negative priority",
@ -3856,12 +3863,12 @@ func TestMoveAllToActiveOrBackoffQueue_PreEnqueueChecks(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c := testingclock.NewFakeClock(time.Now())
c := testingclock.NewFakeClock(time.Now().Truncate(backoffQOrderingWindowDuration))
logger, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
q := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(c))
for i, podInfo := range tt.podInfos {
for _, podInfo := range tt.podInfos {
// To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent() below.
q.Add(logger, podInfo.Pod)
if p, err := q.Pop(logger); err != nil || p.Pod != podInfo.Pod {
@ -3872,18 +3879,12 @@ func TestMoveAllToActiveOrBackoffQueue_PreEnqueueChecks(t *testing.T) {
if err != nil {
t.Fatalf("unexpected error from AddUnschedulableIfNotPresent: %v", err)
}
// NOTE: On Windows, time.Now() is not as precise, 2 consecutive calls may return the same timestamp,
// resulting in 0 time delta / latency. This will cause the pods to be backed off in a random
// order, which would cause this test to fail, since the expectation is for them to be backed off
// in a certain order.
// See: https://github.com/golang/go/issues/8687
podInfo.Timestamp = podInfo.Timestamp.Add(time.Duration((i - len(tt.podInfos))) * time.Millisecond)
}
q.MoveAllToActiveOrBackoffQueue(logger, tt.event, nil, nil, tt.preEnqueueCheck)
var got []string
got := sets.New[string]()
c.Step(2 * q.backoffQ.podMaxBackoffDuration())
q.backoffQ.popEachBackoffCompleted(logger, func(pInfo *framework.QueuedPodInfo) {
got = append(got, pInfo.Pod.Name)
got.Insert(pInfo.Pod.Name)
})
if diff := cmp.Diff(tt.want, got); diff != "" {
t.Errorf("Unexpected diff (-want, +got):\n%s", diff)

View File

@ -98,9 +98,9 @@ func TestEventHandlers_MoveToActiveOnNominatedNodeUpdate(t *testing.T) {
}
tests := []struct {
name string
updateFunc func(s *Scheduler)
wantInActive sets.Set[string]
name string
updateFunc func(s *Scheduler)
wantInActiveOrBackoff sets.Set[string]
}{
{
name: "Update of a nominated node name to a different value should trigger rescheduling of lower priority pods",
@ -110,7 +110,7 @@ func TestEventHandlers_MoveToActiveOnNominatedNodeUpdate(t *testing.T) {
updatedPod.ResourceVersion = "1"
s.updatePodInSchedulingQueue(medNominatedPriorityPod, updatedPod)
},
wantInActive: sets.New(lowPriorityPod.Name, medPriorityPod.Name, medNominatedPriorityPod.Name),
wantInActiveOrBackoff: sets.New(lowPriorityPod.Name, medPriorityPod.Name, medNominatedPriorityPod.Name),
},
{
name: "Removal of a nominated node name should trigger rescheduling of lower priority pods",
@ -120,14 +120,14 @@ func TestEventHandlers_MoveToActiveOnNominatedNodeUpdate(t *testing.T) {
updatedPod.ResourceVersion = "1"
s.updatePodInSchedulingQueue(medNominatedPriorityPod, updatedPod)
},
wantInActive: sets.New(lowPriorityPod.Name, medPriorityPod.Name, medNominatedPriorityPod.Name),
wantInActiveOrBackoff: sets.New(lowPriorityPod.Name, medPriorityPod.Name, medNominatedPriorityPod.Name),
},
{
name: "Removal of a pod that had nominated node name should trigger rescheduling of lower priority pods",
updateFunc: func(s *Scheduler) {
s.deletePodFromSchedulingQueue(medNominatedPriorityPod)
},
wantInActive: sets.New(lowPriorityPod.Name, medPriorityPod.Name),
wantInActiveOrBackoff: sets.New(lowPriorityPod.Name, medPriorityPod.Name),
},
{
name: "Addition of a nominated node name to the high priority pod that did not have it before shouldn't trigger rescheduling",
@ -137,7 +137,7 @@ func TestEventHandlers_MoveToActiveOnNominatedNodeUpdate(t *testing.T) {
updatedPod.ResourceVersion = "1"
s.updatePodInSchedulingQueue(highPriorityPod, updatedPod)
},
wantInActive: sets.New[string](),
wantInActiveOrBackoff: sets.New[string](),
},
}
@ -190,12 +190,15 @@ func TestEventHandlers_MoveToActiveOnNominatedNodeUpdate(t *testing.T) {
t.Errorf("No pods were expected to be in the activeQ before the update, but there were %v", s.SchedulingQueue.PodsInActiveQ())
}
tt.updateFunc(s)
if len(s.SchedulingQueue.PodsInActiveQ()) != len(tt.wantInActive) {
t.Errorf("Different number of pods were expected to be in the activeQ, but found actual %v vs. expected %v", s.SchedulingQueue.PodsInActiveQ(), tt.wantInActive)
podsInActiveOrBackoff := s.SchedulingQueue.PodsInActiveQ()
podsInActiveOrBackoff = append(podsInActiveOrBackoff, s.SchedulingQueue.PodsInBackoffQ()...)
if len(podsInActiveOrBackoff) != len(tt.wantInActiveOrBackoff) {
t.Errorf("Different number of pods were expected to be in the activeQ or backoffQ, but found actual %v vs. expected %v", podsInActiveOrBackoff, tt.wantInActiveOrBackoff)
}
for _, pod := range s.SchedulingQueue.PodsInActiveQ() {
if !tt.wantInActive.Has(pod.Name) {
t.Errorf("Found unexpected pod in activeQ: %s", pod.Name)
for _, pod := range podsInActiveOrBackoff {
if !tt.wantInActiveOrBackoff.Has(pod.Name) {
t.Errorf("Found unexpected pod in activeQ or backoffQ: %s", pod.Name)
}
}
})

View File

@ -367,6 +367,9 @@ type QueuedPodInfo struct {
// It's used to record the # attempts metric and calculate the backoff time this Pod is obliged to get before retrying.
Attempts int
// BackoffExpiration is the time when the Pod will complete its backoff.
// If the SchedulerPopFromBackoffQ feature is enabled, the value is aligned to the backoff ordering window.
// Two Pods with the same BackoffExpiration (time bucket) are then ordered by priority and finally by timestamp,
// so that popping from the backoffQ takes into account the priority of pods that are close to expiration.
BackoffExpiration time.Time
// The time when the pod is added to the queue for the first time. The pod may be added
// back to the queue multiple times before it's successfully scheduled.

View File

@ -118,7 +118,7 @@ func (sched *Scheduler) applyDefaultHandlers() {
}
type schedulerOptions struct {
clock clock.Clock
clock clock.WithTicker
componentConfigVersion string
kubeConfig *restclient.Config
// Overridden by profile level percentageOfNodesToScore if set in v1.
@ -231,7 +231,7 @@ func WithExtenders(e ...schedulerapi.Extender) Option {
}
// WithClock sets clock for PriorityQueue, the default clock is clock.RealClock.
func WithClock(clock clock.Clock) Option {
func WithClock(clock clock.WithTicker) Option {
return func(o *schedulerOptions) {
o.clock = clock
}

View File

@ -270,8 +270,8 @@ func TestUpdateNominatedNodeName(t *testing.T) {
// Note that the update has to happen while the nominated pod is still in the backoffQ, to actually test updates of nominated but not yet bound pods.
tt.updateFunc(testCtx)
// Advance time by the maxPodBackoffSeconds to move low priority pod out of the backoff queue.
fakeClock.Step(testBackoff)
// Advance time by 2 * maxPodBackoffSeconds to move the low-priority pod out of the backoff queue.
fakeClock.Step(2 * testBackoff)
// Expect that the low-priority pod is notified about the unnominated mid-priority pod and gets scheduled, as it should fit this time.
if err := testutils.WaitForPodToSchedule(testCtx.Ctx, testCtx.ClientSet, podLow); err != nil {

View File

@ -886,6 +886,9 @@ func TestAsyncPreemption(t *testing.T) {
}
logger, _ := ktesting.NewTestContext(t)
testCtx.Scheduler.SchedulingQueue.Run(logger)
defer testCtx.Scheduler.SchedulingQueue.Close()
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SchedulerAsyncPreemption, true)
createdPods := []*v1.Pod{}

View File

@ -41,6 +41,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
st "k8s.io/kubernetes/pkg/scheduler/testing"
testutils "k8s.io/kubernetes/test/integration/util"
"k8s.io/kubernetes/test/utils/ktesting"
"k8s.io/utils/ptr"
)
@ -2264,6 +2265,7 @@ var CoreResourceEnqueueTestCases = []*CoreResourceEnqueueTestCase{
func RunTestCoreResourceEnqueue(t *testing.T, tt *CoreResourceEnqueueTestCase) {
t.Helper()
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.InPlacePodVerticalScaling, true)
logger, _ := ktesting.NewTestContext(t)
opts := []scheduler.Option{scheduler.WithPodInitialBackoffSeconds(0), scheduler.WithPodMaxBackoffSeconds(0)}
if tt.EnablePlugins != nil {
@ -2303,6 +2305,7 @@ func RunTestCoreResourceEnqueue(t *testing.T, tt *CoreResourceEnqueueTestCase) {
)
testutils.SyncSchedulerInformerFactory(testCtx)
testCtx.Scheduler.SchedulingQueue.Run(logger)
defer testCtx.Scheduler.SchedulingQueue.Close()
cs, ns, ctx := testCtx.ClientSet, testCtx.NS.Name, testCtx.Ctx