Merge pull request #130214 from macsko/split_backoffq_into_backoffq_and_errorbackoffq
Split backoffQ into backoffQ and errorBackoffQ in scheduler
This commit is contained in: commit 65321bf5ee
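The split described above is mostly a mechanical rename for callers of the queue, plus one new routing decision: when a pod re-enters backoff, the new backoffQueue checks whether any unschedulable or pending plugins were recorded for it, and keeps pods that failed because of an error on a separate heap (podErrorBackoffQ). Below is a minimal, self-contained sketch of that rule; the types here are simplified stand-ins for illustration only, while the real implementation further down operates on *framework.QueuedPodInfo and two heap.Heap instances.

package main

import "fmt"

// podBackoffInfo models only the two fields that the routing decision in
// backoffQueue.add reads from framework.QueuedPodInfo.
type podBackoffInfo struct {
    name                 string
    unschedulablePlugins []string // plugins that rejected the pod
    pendingPlugins       []string // plugins that returned a Pending status for the pod
}

// targetQueue mirrors the branch in backoffQueue.add: no recorded unschedulable
// or pending plugins means the scheduling attempt failed because of an error,
// so the pod is tracked in podErrorBackoffQ; otherwise it stays on the regular
// podBackoffQ.
func targetQueue(p podBackoffInfo) string {
    if len(p.unschedulablePlugins) == 0 && len(p.pendingPlugins) == 0 {
        return "podErrorBackoffQ"
    }
    return "podBackoffQ"
}

func main() {
    fmt.Println(targetQueue(podBackoffInfo{name: "rejected-by-plugin", unschedulablePlugins: []string{"NodeResourcesFit"}})) // podBackoffQ
    fmt.Println(targetQueue(podBackoffInfo{name: "failed-with-error"}))                                                      // podErrorBackoffQ
}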
pkg/scheduler/backend/queue/backoff_queue.go (new file, 238 lines)
@@ -0,0 +1,238 @@
/*
Copyright 2025 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package queue

import (
    "time"

    v1 "k8s.io/api/core/v1"
    "k8s.io/klog/v2"
    "k8s.io/kubernetes/pkg/scheduler/backend/heap"
    "k8s.io/kubernetes/pkg/scheduler/framework"
    "k8s.io/kubernetes/pkg/scheduler/metrics"
    "k8s.io/utils/clock"
)

// backoffQueuer is a wrapper for backoffQ related operations.
type backoffQueuer interface {
    // isPodBackingoff returns true if a pod is still waiting for its backoff timer.
    // If this returns true, the pod should not be re-tried.
    isPodBackingoff(podInfo *framework.QueuedPodInfo) bool
    // getBackoffTime returns the time that podInfo completes backoff
    getBackoffTime(podInfo *framework.QueuedPodInfo) time.Time
    // popEachBackoffCompleted run fn for all pods from podBackoffQ and podErrorBackoffQ that completed backoff while popping them.
    popEachBackoffCompleted(logger klog.Logger, fn func(pInfo *framework.QueuedPodInfo))

    // podInitialBackoffDuration returns initial backoff duration that pod can get.
    podInitialBackoffDuration() time.Duration
    // podMaxBackoffDuration returns maximum backoff duration that pod can get.
    podMaxBackoffDuration() time.Duration

    // add adds the pInfo to backoffQueue.
    // It also ensures that pInfo is not in both queues.
    add(logger klog.Logger, pInfo *framework.QueuedPodInfo)
    // update updates the pod in backoffQueue if oldPodInfo is already in the queue.
    // It returns new pod info if updated, nil otherwise.
    update(newPod *v1.Pod, oldPodInfo *framework.QueuedPodInfo) *framework.QueuedPodInfo
    // delete deletes the pInfo from backoffQueue.
    delete(pInfo *framework.QueuedPodInfo)
    // get returns the pInfo matching given pInfoLookup, if exists.
    get(pInfoLookup *framework.QueuedPodInfo) (*framework.QueuedPodInfo, bool)
    // has inform if pInfo exists in the queue.
    has(pInfo *framework.QueuedPodInfo) bool
    // list returns all pods that are in the queue.
    list() []*framework.QueuedPodInfo
    // len returns length of the queue.
    len() int
}

// backoffQueue implements backoffQueuer and wraps two queues inside,
// providing seamless access as if it were one queue.
type backoffQueue struct {
    clock clock.Clock

    // podBackoffQ is a heap ordered by backoff expiry. Pods which have completed backoff
    // are popped from this heap before the scheduler looks at activeQ
    podBackoffQ *heap.Heap[*framework.QueuedPodInfo]
    // podErrorBackoffQ is a heap ordered by error backoff expiry. Pods which have completed backoff
    // are popped from this heap before the scheduler looks at activeQ
    podErrorBackoffQ *heap.Heap[*framework.QueuedPodInfo]

    podInitialBackoff time.Duration
    podMaxBackoff time.Duration
}

func newBackoffQueue(clock clock.Clock, podInitialBackoffDuration time.Duration, podMaxBackoffDuration time.Duration) *backoffQueue {
    bq := &backoffQueue{
        clock: clock,
        podInitialBackoff: podInitialBackoffDuration,
        podMaxBackoff: podMaxBackoffDuration,
    }
    bq.podBackoffQ = heap.NewWithRecorder(podInfoKeyFunc, bq.lessBackoffCompleted, metrics.NewBackoffPodsRecorder())
    bq.podErrorBackoffQ = heap.NewWithRecorder(podInfoKeyFunc, bq.lessBackoffCompleted, metrics.NewBackoffPodsRecorder())

    return bq
}

// podInitialBackoffDuration returns initial backoff duration that pod can get.
func (bq *backoffQueue) podInitialBackoffDuration() time.Duration {
    return bq.podInitialBackoff
}

// podMaxBackoffDuration returns maximum backoff duration that pod can get.
func (bq *backoffQueue) podMaxBackoffDuration() time.Duration {
    return bq.podMaxBackoff
}

// lessBackoffCompleted is a less function of podBackoffQ and podErrorBackoffQ.
func (bq *backoffQueue) lessBackoffCompleted(pInfo1, pInfo2 *framework.QueuedPodInfo) bool {
    bo1 := bq.getBackoffTime(pInfo1)
    bo2 := bq.getBackoffTime(pInfo2)
    return bo1.Before(bo2)
}

// isPodBackingoff returns true if a pod is still waiting for its backoff timer.
// If this returns true, the pod should not be re-tried.
func (bq *backoffQueue) isPodBackingoff(podInfo *framework.QueuedPodInfo) bool {
    boTime := bq.getBackoffTime(podInfo)
    return boTime.After(bq.clock.Now())
}

// getBackoffTime returns the time that podInfo completes backoff
func (bq *backoffQueue) getBackoffTime(podInfo *framework.QueuedPodInfo) time.Time {
    duration := bq.calculateBackoffDuration(podInfo)
    backoffTime := podInfo.Timestamp.Add(duration)
    return backoffTime
}

// calculateBackoffDuration is a helper function for calculating the backoffDuration
// based on the number of attempts the pod has made.
func (bq *backoffQueue) calculateBackoffDuration(podInfo *framework.QueuedPodInfo) time.Duration {
    if podInfo.Attempts == 0 {
        // When the Pod hasn't experienced any scheduling attempts,
        // they aren't obliged to get a backoff penalty at all.
        return 0
    }

    duration := bq.podInitialBackoff
    for i := 1; i < podInfo.Attempts; i++ {
        // Use subtraction instead of addition or multiplication to avoid overflow.
        if duration > bq.podMaxBackoff-duration {
            return bq.podMaxBackoff
        }
        duration += duration
    }
    return duration
}

func (bq *backoffQueue) popEachBackoffCompletedWithQueue(logger klog.Logger, fn func(pInfo *framework.QueuedPodInfo), queue *heap.Heap[*framework.QueuedPodInfo]) {
    for {
        pInfo, ok := queue.Peek()
        if !ok || pInfo == nil {
            break
        }
        pod := pInfo.Pod
        if bq.isPodBackingoff(pInfo) {
            break
        }
        _, err := queue.Pop()
        if err != nil {
            logger.Error(err, "Unable to pop pod from backoff queue despite backoff completion", "pod", klog.KObj(pod))
            break
        }
        if fn != nil {
            fn(pInfo)
        }
    }
}

// popEachBackoffCompleted run fn for all pods from podBackoffQ and podErrorBackoffQ that completed backoff while popping them.
func (bq *backoffQueue) popEachBackoffCompleted(logger klog.Logger, fn func(pInfo *framework.QueuedPodInfo)) {
    // Ensure both queues are called
    bq.popEachBackoffCompletedWithQueue(logger, fn, bq.podBackoffQ)
    bq.popEachBackoffCompletedWithQueue(logger, fn, bq.podErrorBackoffQ)
}

// add adds the pInfo to backoffQueue.
// It also ensures that pInfo is not in both queues.
func (bq *backoffQueue) add(logger klog.Logger, pInfo *framework.QueuedPodInfo) {
    // If pod has empty both unschedulable plugins and pending plugins,
    // it means that it failed because of error and should be moved to podErrorBackoffQ.
    if pInfo.UnschedulablePlugins.Len() == 0 && pInfo.PendingPlugins.Len() == 0 {
        bq.podErrorBackoffQ.AddOrUpdate(pInfo)
        // Ensure the pod is not in the podBackoffQ and report the error if it happens.
        err := bq.podBackoffQ.Delete(pInfo)
        if err == nil {
            logger.Error(nil, "BackoffQueue add() was called with a pod that was already in the podBackoffQ", "pod", klog.KObj(pInfo.Pod))
        }
        return
    }
    bq.podBackoffQ.AddOrUpdate(pInfo)
    // Ensure the pod is not in the podErrorBackoffQ and report the error if it happens.
    err := bq.podErrorBackoffQ.Delete(pInfo)
    if err == nil {
        logger.Error(nil, "BackoffQueue add() was called with a pod that was already in the podErrorBackoffQ", "pod", klog.KObj(pInfo.Pod))
    }
}

// update updates the pod in backoffQueue if oldPodInfo is already in the queue.
// It returns new pod info if updated, nil otherwise.
func (bq *backoffQueue) update(newPod *v1.Pod, oldPodInfo *framework.QueuedPodInfo) *framework.QueuedPodInfo {
    // If the pod is in the backoff queue, update it there.
    if pInfo, exists := bq.podBackoffQ.Get(oldPodInfo); exists {
        _ = pInfo.Update(newPod)
        bq.podBackoffQ.AddOrUpdate(pInfo)
        return pInfo
    }
    // If the pod is in the error backoff queue, update it there.
    if pInfo, exists := bq.podErrorBackoffQ.Get(oldPodInfo); exists {
        _ = pInfo.Update(newPod)
        bq.podErrorBackoffQ.AddOrUpdate(pInfo)
        return pInfo
    }
    return nil
}

// delete deletes the pInfo from backoffQueue.
func (bq *backoffQueue) delete(pInfo *framework.QueuedPodInfo) {
    _ = bq.podBackoffQ.Delete(pInfo)
    _ = bq.podErrorBackoffQ.Delete(pInfo)
}

// get returns the pInfo matching given pInfoLookup, if exists.
func (bq *backoffQueue) get(pInfoLookup *framework.QueuedPodInfo) (*framework.QueuedPodInfo, bool) {
    pInfo, exists := bq.podBackoffQ.Get(pInfoLookup)
    if exists {
        return pInfo, true
    }
    return bq.podErrorBackoffQ.Get(pInfoLookup)
}

// has inform if pInfo exists in the queue.
func (bq *backoffQueue) has(pInfo *framework.QueuedPodInfo) bool {
    return bq.podBackoffQ.Has(pInfo) || bq.podErrorBackoffQ.Has(pInfo)
}

// list returns all pods that are in the queue.
func (bq *backoffQueue) list() []*framework.QueuedPodInfo {
    return append(bq.podBackoffQ.List(), bq.podErrorBackoffQ.List()...)
}

// len returns length of the queue.
func (bq *backoffQueue) len() int {
    return bq.podBackoffQ.Len() + bq.podErrorBackoffQ.Len()
}
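calculateBackoffDuration above doubles the delay on every attempt, starting at podInitialBackoff and saturating at podMaxBackoff; the subtraction-based comparison is what keeps the doubling from overflowing int64. The following is a standalone sketch of the same schedule, assuming the usual kube-scheduler defaults of a 1s initial and 10s maximum backoff (the real code takes these values from the DefaultPodInitialBackoffDuration and DefaultPodMaxBackoffDuration constants).

package main

import (
    "fmt"
    "time"
)

// backoffFor reproduces the doubling-with-cap loop from calculateBackoffDuration:
// attempt 1 gets the initial duration, every later attempt doubles it, and the
// subtraction-based check caps the result at max without overflowing int64.
func backoffFor(attempts int, initial, max time.Duration) time.Duration {
    if attempts == 0 {
        return 0 // a pod that has never been attempted gets no backoff penalty
    }
    d := initial
    for i := 1; i < attempts; i++ {
        if d > max-d { // equivalent to d*2 > max, but cannot overflow
            return max
        }
        d += d
    }
    return d
}

func main() {
    // With the assumed 1s/10s defaults the schedule is 1s, 2s, 4s, 8s, 10s, 10s, ...
    for attempts := 1; attempts <= 6; attempts++ {
        fmt.Printf("attempt %d -> %v\n", attempts, backoffFor(attempts, time.Second, 10*time.Second))
    }
}

The test file that follows exercises exactly this saturation behaviour, including the MaxInt32 and MaxInt64 overflow cases.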
pkg/scheduler/backend/queue/backoff_queue_test.go (new file, 170 lines)
@@ -0,0 +1,170 @@
/*
Copyright 2025 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package queue

import (
    "math"
    "testing"
    "time"

    "github.com/google/go-cmp/cmp"
    "k8s.io/apimachinery/pkg/util/sets"
    "k8s.io/klog/v2/ktesting"
    "k8s.io/kubernetes/pkg/scheduler/framework"
    st "k8s.io/kubernetes/pkg/scheduler/testing"
    "k8s.io/utils/clock"
    testingclock "k8s.io/utils/clock/testing"
)

func TestBackoffQueue_calculateBackoffDuration(t *testing.T) {
    tests := []struct {
        name string
        initialBackoffDuration time.Duration
        maxBackoffDuration time.Duration
        podInfo *framework.QueuedPodInfo
        want time.Duration
    }{
        {
            name: "no backoff",
            initialBackoffDuration: 1 * time.Nanosecond,
            maxBackoffDuration: 32 * time.Nanosecond,
            podInfo: &framework.QueuedPodInfo{Attempts: 0},
            want: 0,
        },
        {
            name: "normal",
            initialBackoffDuration: 1 * time.Nanosecond,
            maxBackoffDuration: 32 * time.Nanosecond,
            podInfo: &framework.QueuedPodInfo{Attempts: 16},
            want: 32 * time.Nanosecond,
        },
        {
            name: "overflow_32bit",
            initialBackoffDuration: 1 * time.Nanosecond,
            maxBackoffDuration: math.MaxInt32 * time.Nanosecond,
            podInfo: &framework.QueuedPodInfo{Attempts: 32},
            want: math.MaxInt32 * time.Nanosecond,
        },
        {
            name: "overflow_64bit",
            initialBackoffDuration: 1 * time.Nanosecond,
            maxBackoffDuration: math.MaxInt64 * time.Nanosecond,
            podInfo: &framework.QueuedPodInfo{Attempts: 64},
            want: math.MaxInt64 * time.Nanosecond,
        },
    }
    for _, tt := range tests {
        t.Run(tt.name, func(t *testing.T) {
            bq := newBackoffQueue(clock.RealClock{}, tt.initialBackoffDuration, tt.maxBackoffDuration)
            if got := bq.calculateBackoffDuration(tt.podInfo); got != tt.want {
                t.Errorf("backoffQueue.calculateBackoffDuration() = %v, want %v", got, tt.want)
            }
        })
    }
}

func TestBackoffQueue_popEachBackoffCompleted(t *testing.T) {
    fakeClock := testingclock.NewFakeClock(time.Now())
    podInfos := map[string]*framework.QueuedPodInfo{
        "pod0": {
            PodInfo: &framework.PodInfo{
                Pod: st.MakePod().Name("pod0").Obj(),
            },
            Timestamp: fakeClock.Now().Add(-time.Second),
            Attempts: 1,
            UnschedulablePlugins: sets.New("plugin"),
        },
        "pod1": {
            PodInfo: &framework.PodInfo{
                Pod: st.MakePod().Name("pod1").Obj(),
            },
            Timestamp: fakeClock.Now().Add(time.Second),
            Attempts: 1,
            UnschedulablePlugins: sets.New("plugin"),
        },
        "pod2": {
            PodInfo: &framework.PodInfo{
                Pod: st.MakePod().Name("pod2").Obj(),
            },
            Timestamp: fakeClock.Now().Add(-time.Second),
            Attempts: 1,
        },
        "pod3": {
            PodInfo: &framework.PodInfo{
                Pod: st.MakePod().Name("pod3").Obj(),
            },
            Timestamp: fakeClock.Now().Add(time.Second),
            Attempts: 1,
        },
    }
    tests := []struct {
        name string
        podsInBackoff []string
        wantPods []string
    }{
        {
            name: "Both queues empty, no pods moved to activeQ",
            podsInBackoff: []string{},
            wantPods: nil,
        },
        {
            name: "Pods only in backoffQ, some pods moved to activeQ",
            podsInBackoff: []string{"pod0", "pod1"},
            wantPods: []string{"pod0"},
        },
        {
            name: "Pods only in errorBackoffQ, some pods moved to activeQ",
            podsInBackoff: []string{"pod2", "pod3"},
            wantPods: []string{"pod2"},
        },
        {
            name: "Pods in both queues, some pods moved to activeQ",
            podsInBackoff: []string{"pod0", "pod1", "pod2", "pod3"},
            wantPods: []string{"pod0", "pod2"},
        },
        {
            name: "Pods in both queues, all pods moved to activeQ",
            podsInBackoff: []string{"pod0", "pod2"},
            wantPods: []string{"pod0", "pod2"},
        },
        {
            name: "Pods in both queues, no pods moved to activeQ",
            podsInBackoff: []string{"pod1", "pod3"},
            wantPods: nil,
        },
    }
    for _, tt := range tests {
        t.Run(tt.name, func(t *testing.T) {
            logger, _ := ktesting.NewTestContext(t)
            bq := newBackoffQueue(fakeClock, DefaultPodInitialBackoffDuration, DefaultPodMaxBackoffDuration)
            for _, podName := range tt.podsInBackoff {
                bq.add(logger, podInfos[podName])
            }
            var gotPods []string
            bq.popEachBackoffCompleted(logger, func(pInfo *framework.QueuedPodInfo) {
                gotPods = append(gotPods, pInfo.Pod.Name)
            })
            if diff := cmp.Diff(tt.wantPods, gotPods); diff != "" {
                t.Errorf("Unexpected pods moved (-want, +got):\n%s", diff)
            }
            podsToStayInBackoff := len(tt.podsInBackoff) - len(tt.wantPods)
            if bq.len() != podsToStayInBackoff {
                t.Errorf("Expected %v pods to stay in backoffQ, but got: %v", podsToStayInBackoff, bq.len())
            }
        })
    }
}
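The expectations in TestBackoffQueue_popEachBackoffCompleted follow directly from getBackoffTime: with Attempts set to 1, a pod's backoff window ends one initial backoff after its Timestamp, so pod0 and pod2 (stamped one second in the past) have already completed backoff while pod1 and pod3 (stamped one second in the future) have not; pod2 and pod3 carry no unschedulable plugins, so they exercise the podErrorBackoffQ path. A tiny runnable sketch of that check, assuming DefaultPodInitialBackoffDuration is the usual 1s:

package main

import (
    "fmt"
    "time"
)

// completedBackoff mirrors isPodBackingoff for a pod with Attempts == 1: the pod
// may leave the backoff queue once lastFailure + initialBackoff is not after now.
func completedBackoff(now, lastFailure time.Time, initialBackoff time.Duration) bool {
    return !lastFailure.Add(initialBackoff).After(now)
}

func main() {
    now := time.Now()
    fmt.Println(completedBackoff(now, now.Add(-time.Second), time.Second)) // true:  like pod0 and pod2 above
    fmt.Println(completedBackoff(now, now.Add(time.Second), time.Second))  // false: like pod1 and pod3 above
}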
@@ -162,17 +162,11 @@ type PriorityQueue struct {
    // Correct locking order is: lock > activeQueue.lock > nominator.nLock.
    lock sync.RWMutex

    // pod initial backoff duration.
    podInitialBackoffDuration time.Duration
    // pod maximum backoff duration.
    podMaxBackoffDuration time.Duration
    // the maximum time a pod can stay in the unschedulablePods.
    podMaxInUnschedulablePodsDuration time.Duration

    activeQ activeQueuer
    // podBackoffQ is a heap ordered by backoff expiry. Pods which have completed backoff
    // are popped from this heap before the scheduler looks at activeQ
    podBackoffQ *heap.Heap[*framework.QueuedPodInfo]
    activeQ activeQueuer
    backoffQ backoffQueuer
    // unschedulablePods holds pods that have been tried and determined unschedulable.
    unschedulablePods *UnschedulablePods
    // moveRequestCycle caches the sequence number of scheduling cycle when we
@@ -335,10 +329,9 @@ func NewPriorityQueue(
    pq := &PriorityQueue{
        clock: options.clock,
        stop: make(chan struct{}),
        podInitialBackoffDuration: options.podInitialBackoffDuration,
        podMaxBackoffDuration: options.podMaxBackoffDuration,
        podMaxInUnschedulablePodsDuration: options.podMaxInUnschedulablePodsDuration,
        activeQ: newActiveQueue(heap.NewWithRecorder(podInfoKeyFunc, heap.LessFunc[*framework.QueuedPodInfo](lessFn), metrics.NewActivePodsRecorder()), isSchedulingQueueHintEnabled, options.metricsRecorder),
        backoffQ: newBackoffQueue(options.clock, options.podInitialBackoffDuration, options.podMaxBackoffDuration),
        unschedulablePods: newUnschedulablePods(metrics.NewUnschedulablePodsRecorder(), metrics.NewGatedPodsRecorder()),
        preEnqueuePluginMap: options.preEnqueuePluginMap,
        queueingHintMap: options.queueingHintMap,
@@ -347,14 +340,13 @@ func NewPriorityQueue(
        moveRequestCycle: -1,
        isSchedulingQueueHintEnabled: isSchedulingQueueHintEnabled,
    }
    pq.podBackoffQ = heap.NewWithRecorder(podInfoKeyFunc, pq.podsCompareBackoffCompleted, metrics.NewBackoffPodsRecorder())
    pq.nsLister = informerFactory.Core().V1().Namespaces().Lister()
    pq.nominator = newPodNominator(options.podLister)

    return pq
}

// Run starts the goroutine to pump from podBackoffQ to activeQ
// Run starts the goroutine to pump from backoffQ to activeQ
func (p *PriorityQueue) Run(logger klog.Logger) {
    go wait.Until(func() {
        p.flushBackoffQCompleted(logger)
@@ -568,7 +560,7 @@ func (p *PriorityQueue) moveToActiveQ(logger klog.Logger, pInfo *framework.Queue
        if unlockedActiveQ.Has(pInfo) {
            return
        }
        if p.podBackoffQ.Has(pInfo) {
        if p.backoffQ.has(pInfo) {
            return
        }
        p.unschedulablePods.addOrUpdate(pInfo)
@@ -583,7 +575,7 @@ func (p *PriorityQueue) moveToActiveQ(logger klog.Logger, pInfo *framework.Queue
        added = true

        p.unschedulablePods.delete(pInfo.Pod, gatedBefore)
        _ = p.podBackoffQ.Delete(pInfo) // Don't need to react when pInfo is not found.
        p.backoffQ.delete(pInfo)
        logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", event, "queue", activeQ)
        metrics.SchedulerQueueIncomingPods.WithLabelValues("active", event).Inc()
        if event == framework.EventUnscheduledPodAdd.Label() || event == framework.EventUnscheduledPodUpdate.Label() {
@@ -641,7 +633,7 @@ func (p *PriorityQueue) activate(logger klog.Logger, pod *v1.Pod) bool {
        // If the pod doesn't belong to unschedulablePods or backoffQ, don't activate it.
        // The pod can be already in activeQ.
        var exists bool
        pInfo, exists = p.podBackoffQ.Get(newQueuedPodInfoForLookup(pod))
        pInfo, exists = p.backoffQ.get(newQueuedPodInfoForLookup(pod))
        if !exists {
            return false
        }
@@ -656,13 +648,6 @@ func (p *PriorityQueue) activate(logger klog.Logger, pod *v1.Pod) bool {
    return p.moveToActiveQ(logger, pInfo, framework.ForceActivate)
}

// isPodBackingoff returns true if a pod is still waiting for its backoff timer.
// If this returns true, the pod should not be re-tried.
func (p *PriorityQueue) isPodBackingoff(podInfo *framework.QueuedPodInfo) bool {
    boTime := p.getBackoffTime(podInfo)
    return boTime.After(p.clock.Now())
}

// SchedulingCycle returns current scheduling cycle.
func (p *PriorityQueue) SchedulingCycle() int64 {
    return p.activeQ.schedulingCycle()
@@ -712,7 +697,7 @@ func (p *PriorityQueue) determineSchedulingHintForInFlightPod(logger klog.Logger
// addUnschedulableIfNotPresentWithoutQueueingHint inserts a pod that cannot be scheduled into
// the queue, unless it is already in the queue. Normally, PriorityQueue puts
// unschedulable pods in `unschedulablePods`. But if there has been a recent move
// request, then the pod is put in `podBackoffQ`.
// request, then the pod is put in `backoffQ`.
// TODO: This function is called only when p.isSchedulingQueueHintEnabled is false,
// and this will be removed after SchedulingQueueHint goes to stable and the feature gate is removed.
func (p *PriorityQueue) addUnschedulableWithoutQueueingHint(logger klog.Logger, pInfo *framework.QueuedPodInfo, podSchedulingCycle int64) error {
@@ -736,7 +721,7 @@ func (p *PriorityQueue) addUnschedulableWithoutQueueingHint(logger klog.Logger,
        // - No unschedulable plugins are associated with this Pod,
        // meaning something unusual (a temporal failure on kube-apiserver, etc) happened and this Pod gets moved back to the queue.
        // In this case, we should retry scheduling it because this Pod may not be retried until the next flush.
        p.podBackoffQ.AddOrUpdate(pInfo)
        p.backoffQ.add(logger, pInfo)
        logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pod), "event", framework.ScheduleAttemptFailure, "queue", backoffQ)
        metrics.SchedulerQueueIncomingPods.WithLabelValues("backoff", framework.ScheduleAttemptFailure).Inc()
    } else {
@@ -751,7 +736,7 @@ func (p *PriorityQueue) addUnschedulableWithoutQueueingHint(logger klog.Logger,
// AddUnschedulableIfNotPresent inserts a pod that cannot be scheduled into
// the queue, unless it is already in the queue. Normally, PriorityQueue puts
// unschedulable pods in `unschedulablePods`. But if there has been a recent move
// request, then the pod is put in `podBackoffQ`.
// request, then the pod is put in `backoffQ`.
func (p *PriorityQueue) AddUnschedulableIfNotPresent(logger klog.Logger, pInfo *framework.QueuedPodInfo, podSchedulingCycle int64) error {
    p.lock.Lock()
    defer p.lock.Unlock()
@@ -767,7 +752,7 @@ func (p *PriorityQueue) AddUnschedulableIfNotPresent(logger klog.Logger, pInfo *
    if p.activeQ.has(pInfo) {
        return fmt.Errorf("Pod %v is already present in the active queue", klog.KObj(pod))
    }
    if p.podBackoffQ.Has(pInfo) {
    if p.backoffQ.has(pInfo) {
        return fmt.Errorf("Pod %v is already present in the backoff queue", klog.KObj(pod))
    }

@@ -805,25 +790,11 @@ func (p *PriorityQueue) flushBackoffQCompleted(logger klog.Logger) {
    p.lock.Lock()
    defer p.lock.Unlock()
    activated := false
    for {
        pInfo, ok := p.podBackoffQ.Peek()
        if !ok || pInfo == nil {
            break
        }
        pod := pInfo.Pod
        if p.isPodBackingoff(pInfo) {
            break
        }
        _, err := p.podBackoffQ.Pop()
        if err != nil {
            logger.Error(err, "Unable to pop pod from backoff queue despite backoff completion", "pod", klog.KObj(pod))
            break
        }
    p.backoffQ.popEachBackoffCompleted(logger, func(pInfo *framework.QueuedPodInfo) {
        if added := p.moveToActiveQ(logger, pInfo, framework.BackoffComplete); added {
            activated = true
        }
    }

    })
    if activated {
        p.activeQ.broadcast()
    }
@@ -928,10 +899,8 @@ func (p *PriorityQueue) Update(logger klog.Logger, oldPod, newPod *v1.Pod) {
    }

    // If the pod is in the backoff queue, update it there.
    if pInfo, exists := p.podBackoffQ.Get(oldPodInfo); exists {
        _ = pInfo.Update(newPod)
    if pInfo := p.backoffQ.update(newPod, oldPodInfo); pInfo != nil {
        p.UpdateNominatedPod(logger, oldPod, pInfo.PodInfo)
        p.podBackoffQ.AddOrUpdate(pInfo)
        return
    }
}
@@ -961,8 +930,10 @@ func (p *PriorityQueue) Update(logger klog.Logger, oldPod, newPod *v1.Pod) {
            return
        }
        if isPodUpdated(oldPod, newPod) {
            if p.isPodBackingoff(pInfo) {
                p.podBackoffQ.AddOrUpdate(pInfo)
            // Pod might have completed its backoff time while being in unschedulablePods,
            // so we should check isPodBackingoff before moving the pod to backoffQ.
            if p.backoffQ.isPodBackingoff(pInfo) {
                p.backoffQ.add(logger, pInfo)
                p.unschedulablePods.delete(pInfo.Pod, gated)
                logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", framework.EventUnscheduledPodUpdate.Label(), "queue", backoffQ)
                return
@@ -994,7 +965,7 @@ func (p *PriorityQueue) Delete(pod *v1.Pod) {
    pInfo := newQueuedPodInfoForLookup(pod)
    if err := p.activeQ.delete(pInfo); err != nil {
        // The item was probably not found in the activeQ.
        p.podBackoffQ.Delete(pInfo)
        p.backoffQ.delete(pInfo)
        if pInfo = p.unschedulablePods.get(pod); pInfo != nil {
            p.unschedulablePods.delete(pod, pInfo.Gated)
        }
@@ -1070,8 +1041,10 @@ func (p *PriorityQueue) requeuePodViaQueueingHint(logger klog.Logger, pInfo *fra
        return unschedulablePods
    }

    if strategy == queueAfterBackoff && p.isPodBackingoff(pInfo) {
        p.podBackoffQ.AddOrUpdate(pInfo)
    // Pod might have completed its backoff time while being in unschedulablePods,
    // so we should check isPodBackingoff before moving the pod to backoffQ.
    if strategy == queueAfterBackoff && p.backoffQ.isPodBackingoff(pInfo) {
        p.backoffQ.add(logger, pInfo)
        metrics.SchedulerQueueIncomingPods.WithLabelValues("backoff", event).Inc()
        return backoffQ
    }
@@ -1197,7 +1170,7 @@ func (p *PriorityQueue) GetPod(name, namespace string) (pInfo *framework.QueuedP
            },
        },
    }
    if pInfo, ok = p.podBackoffQ.Get(pInfoLookup); ok {
    if pInfo, ok = p.backoffQ.get(pInfoLookup); ok {
        return pInfo, true
    }
    if pInfo = p.unschedulablePods.get(pInfoLookup.Pod); pInfo != nil {
@@ -1218,13 +1191,13 @@ func (p *PriorityQueue) PendingPods() ([]*v1.Pod, string) {
    defer p.lock.RUnlock()
    result := p.activeQ.list()
    activeQLen := len(result)
    for _, pInfo := range p.podBackoffQ.List() {
    for _, pInfo := range p.backoffQ.list() {
        result = append(result, pInfo.Pod)
    }
    for _, pInfo := range p.unschedulablePods.podInfoMap {
        result = append(result, pInfo.Pod)
    }
    return result, fmt.Sprintf(pendingPodsSummary, activeQLen, p.podBackoffQ.Len(), len(p.unschedulablePods.podInfoMap))
    return result, fmt.Sprintf(pendingPodsSummary, activeQLen, p.backoffQ.len(), len(p.unschedulablePods.podInfoMap))
}

// Note: this function assumes the caller locks both p.lock.RLock and p.activeQ.getLock().RLock.
@@ -1242,7 +1215,7 @@ func (p *PriorityQueue) nominatedPodToInfo(np podRef, unlockedActiveQ unlockedAc
        return queuedPodInfo.PodInfo
    }

    queuedPodInfo, exists = p.podBackoffQ.Get(pInfoLookup)
    queuedPodInfo, exists = p.backoffQ.get(pInfoLookup)
    if exists {
        return queuedPodInfo.PodInfo
    }
@@ -1276,12 +1249,6 @@ func (p *PriorityQueue) NominatedPodsForNode(nodeName string) []*framework.PodIn
    return pods
}

func (p *PriorityQueue) podsCompareBackoffCompleted(pInfo1, pInfo2 *framework.QueuedPodInfo) bool {
    bo1 := p.getBackoffTime(pInfo1)
    bo2 := p.getBackoffTime(pInfo2)
    return bo1.Before(bo2)
}

// newQueuedPodInfo builds a QueuedPodInfo object.
func (p *PriorityQueue) newQueuedPodInfo(pod *v1.Pod, plugins ...string) *framework.QueuedPodInfo {
    now := p.clock.Now()
@@ -1296,33 +1263,6 @@ func (p *PriorityQueue) newQueuedPodInfo(pod *v1.Pod, plugins ...string) *framew
    }
}

// getBackoffTime returns the time that podInfo completes backoff
func (p *PriorityQueue) getBackoffTime(podInfo *framework.QueuedPodInfo) time.Time {
    duration := p.calculateBackoffDuration(podInfo)
    backoffTime := podInfo.Timestamp.Add(duration)
    return backoffTime
}

// calculateBackoffDuration is a helper function for calculating the backoffDuration
// based on the number of attempts the pod has made.
func (p *PriorityQueue) calculateBackoffDuration(podInfo *framework.QueuedPodInfo) time.Duration {
    if podInfo.Attempts == 0 {
        // When the Pod hasn't experienced any scheduling attempts,
        // they aren't obliged to get a backoff penalty at all.
        return 0
    }

    duration := p.podInitialBackoffDuration
    for i := 1; i < podInfo.Attempts; i++ {
        // Use subtraction instead of addition or multiplication to avoid overflow.
        if duration > p.podMaxBackoffDuration-duration {
            return p.podMaxBackoffDuration
        }
        duration += duration
    }
    return duration
}

// UnschedulablePods holds pods that cannot be scheduled. This data structure
// is used to implement unschedulablePods.
type UnschedulablePods struct {
@@ -19,7 +19,6 @@ package queue
import (
    "context"
    "fmt"
    "math"
    "strings"
    "sync"
    "testing"
@@ -836,7 +835,7 @@ func Test_InFlightPods(t *testing.T) {
        }

        if test.wantBackoffQPodNames != nil {
            podInfos := q.podBackoffQ.List()
            podInfos := q.backoffQ.list()
            var podNames []string
            for _, pInfo := range podInfos {
                podNames = append(podNames, pInfo.Pod.Name)
@@ -1006,8 +1005,8 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent_Backoff(t *testing.T) {
    // Since there was a move request at the same cycle as "oldCycle", these pods
    // should be in the backoff queue.
    for i := 1; i < totalNum; i++ {
        if !q.podBackoffQ.Has(newQueuedPodInfoForLookup(&expectedPods[i])) {
            t.Errorf("Expected %v to be added to podBackoffQ.", expectedPods[i].Name)
        if !q.backoffQ.has(newQueuedPodInfoForLookup(&expectedPods[i])) {
            t.Errorf("Expected %v to be added to backoffQ.", expectedPods[i].Name)
        }
    }
}
@@ -1099,7 +1098,7 @@ func TestPriorityQueue_Update(t *testing.T) {
            wantQ: backoffQ,
            prepareFunc: func(t *testing.T, logger klog.Logger, q *PriorityQueue) (oldPod, newPod *v1.Pod) {
                podInfo := q.newQueuedPodInfo(medPriorityPodInfo.Pod)
                q.podBackoffQ.AddOrUpdate(podInfo)
                q.backoffQ.add(logger, podInfo)
                return podInfo.Pod, podInfo.Pod
            },
            schedulingHintsEnablement: []bool{false, true},
@@ -1124,7 +1123,7 @@ func TestPriorityQueue_Update(t *testing.T) {
                updatedPod.Annotations["foo"] = "test1"
                // Move clock by podInitialBackoffDuration, so that pods in the unschedulablePods would pass the backing off,
                // and the pods will be moved into activeQ.
                c.Step(q.podInitialBackoffDuration)
                c.Step(q.backoffQ.podInitialBackoffDuration())
                return medPriorityPodInfo.Pod, updatedPod
            },
            schedulingHintsEnablement: []bool{false, true},
@@ -1174,7 +1173,7 @@ func TestPriorityQueue_Update(t *testing.T) {
            var pInfo *framework.QueuedPodInfo

            // validate expected queue
            if pInfoFromBackoff, exists := q.podBackoffQ.Get(newQueuedPodInfoForLookup(newPod)); exists {
            if pInfoFromBackoff, exists := q.backoffQ.get(newQueuedPodInfoForLookup(newPod)); exists {
                if tt.wantQ != backoffQ {
                    t.Errorf("expected pod %s not to be queued to backoffQ, but it was", newPod.Name)
                }
@@ -1260,7 +1259,7 @@ func TestPriorityQueue_UpdateWhenInflight(t *testing.T) {
        t.Fatalf("unexpected error from AddUnschedulableIfNotPresent: %v", err)
    }

    pInfo, exists := q.podBackoffQ.Get(newQueuedPodInfoForLookup(updatedPod))
    pInfo, exists := q.backoffQ.get(newQueuedPodInfoForLookup(updatedPod))
    if !exists {
        t.Fatalf("expected pod %s to be queued to backoffQ, but it wasn't.", updatedPod.Name)
    }
@@ -1298,7 +1297,7 @@ func TestPriorityQueue_Activate(t *testing.T) {
    tests := []struct {
        name string
        qPodInfoInUnschedulablePods []*framework.QueuedPodInfo
        qPodInfoInPodBackoffQ []*framework.QueuedPodInfo
        qPodInfoInBackoffQ []*framework.QueuedPodInfo
        qPodInActiveQ []*v1.Pod
        qPodInfoToActivate *framework.QueuedPodInfo
        qPodInInFlightPod *v1.Pod
@@ -1313,12 +1312,12 @@ func TestPriorityQueue_Activate(t *testing.T) {
            want: []*framework.QueuedPodInfo{{PodInfo: highPriNominatedPodInfo}}, // 1 already active
        },
        {
            name: "pod not in unschedulablePods/podBackoffQ",
            name: "pod not in unschedulablePods/backoffQ",
            qPodInfoToActivate: &framework.QueuedPodInfo{PodInfo: highPriNominatedPodInfo},
            want: []*framework.QueuedPodInfo{},
        },
        {
            name: "[QHint] pod not in unschedulablePods/podBackoffQ but in-flight",
            name: "[QHint] pod not in unschedulablePods/backoffQ but in-flight",
            qPodInfoToActivate: &framework.QueuedPodInfo{PodInfo: highPriNominatedPodInfo},
            qPodInInFlightPod: highPriNominatedPodInfo.Pod,
            expectedInFlightEvent: &clusterEvent{oldObj: (*v1.Pod)(nil), newObj: highPriNominatedPodInfo.Pod, event: framework.EventForceActivate},
@@ -1326,7 +1325,7 @@ func TestPriorityQueue_Activate(t *testing.T) {
            qHintEnabled: true,
        },
        {
            name: "[QHint] pod not in unschedulablePods/podBackoffQ and not in-flight",
            name: "[QHint] pod not in unschedulablePods/backoffQ and not in-flight",
            qPodInfoToActivate: &framework.QueuedPodInfo{PodInfo: highPriNominatedPodInfo},
            qPodInInFlightPod: medPriorityPodInfo.Pod, // different pod is in-flight
            want: []*framework.QueuedPodInfo{},
@@ -1339,10 +1338,10 @@ func TestPriorityQueue_Activate(t *testing.T) {
            want: []*framework.QueuedPodInfo{{PodInfo: highPriNominatedPodInfo}},
        },
        {
            name: "pod in backoffQ",
            qPodInfoInPodBackoffQ: []*framework.QueuedPodInfo{{PodInfo: highPriNominatedPodInfo}},
            qPodInfoToActivate: &framework.QueuedPodInfo{PodInfo: highPriNominatedPodInfo},
            want: []*framework.QueuedPodInfo{{PodInfo: highPriNominatedPodInfo}},
            name: "pod in backoffQ",
            qPodInfoInBackoffQ: []*framework.QueuedPodInfo{{PodInfo: highPriNominatedPodInfo}},
            qPodInfoToActivate: &framework.QueuedPodInfo{PodInfo: highPriNominatedPodInfo},
            want: []*framework.QueuedPodInfo{{PodInfo: highPriNominatedPodInfo}},
        },
    }

@@ -1372,7 +1371,7 @@ func TestPriorityQueue_Activate(t *testing.T) {
                }
            }

            // Prepare activeQ/unschedulablePods/podBackoffQ according to the table
            // Prepare activeQ/unschedulablePods/backoffQ according to the table
            for _, qPod := range tt.qPodInActiveQ {
                q.Add(logger, qPod)
            }
@@ -1381,8 +1380,8 @@ func TestPriorityQueue_Activate(t *testing.T) {
                q.unschedulablePods.addOrUpdate(qPodInfo)
            }

            for _, qPodInfo := range tt.qPodInfoInPodBackoffQ {
                q.podBackoffQ.AddOrUpdate(qPodInfo)
            for _, qPodInfo := range tt.qPodInfoInBackoffQ {
                q.backoffQ.add(logger, qPodInfo)
            }

            // Activate specific pod according to the table
@@ -1722,7 +1721,7 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueueWithQueueingHint(t *testing.

            q.MoveAllToActiveOrBackoffQueue(logger, nodeAdd, nil, nil, nil)

            if q.podBackoffQ.Len() == 0 && test.expectedQ == backoffQ {
            if q.backoffQ.len() == 0 && test.expectedQ == backoffQ {
                t.Fatalf("expected pod to be queued to backoffQ, but it was not")
            }

@@ -1811,15 +1810,14 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) {
    }
    expectInFlightPods(t, q, medPriorityPodInfo.Pod.UID)
    // hpp2 won't be moved.
    if q.podBackoffQ.Len() != 3 {
        t.Fatalf("Expected 3 items to be in podBackoffQ, but got: %v", q.podBackoffQ.Len())
    if q.backoffQ.len() != 3 {
        t.Fatalf("Expected 3 items to be in backoffQ, but got: %v", q.backoffQ.len())
    }

    // pop out the pods in the backoffQ.
    // This doesn't make them in-flight pods.
    for q.podBackoffQ.Len() != 0 {
        q.podBackoffQ.Pop()
    }
    c.Step(q.backoffQ.podMaxBackoffDuration())
    q.backoffQ.popEachBackoffCompleted(logger, nil)
    expectInFlightPods(t, q, medPriorityPodInfo.Pod.UID)

    q.Add(logger, unschedulablePodInfo.Pod)
@@ -1863,20 +1861,20 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) {
            t.Errorf("Expected %v in the unschedulablePods", pod.Name)
        }
    }
    if !q.podBackoffQ.Has(hpp1QueuedPodInfo) {
        t.Errorf("Expected %v in the podBackoffQ", hpp1.Name)
    if !q.backoffQ.has(hpp1QueuedPodInfo) {
        t.Errorf("Expected %v in the backoffQ", hpp1.Name)
    }

    // Move clock by podInitialBackoffDuration, so that pods in the unschedulablePods would pass the backing off,
    // and the pods will be moved into activeQ.
    c.Step(q.podInitialBackoffDuration)
    c.Step(q.backoffQ.podInitialBackoffDuration())
    q.flushBackoffQCompleted(logger) // flush the completed backoffQ to move hpp1 to activeQ.
    q.MoveAllToActiveOrBackoffQueue(logger, nodeAdd, nil, nil, nil)
    if q.activeQ.len() != 4 {
        t.Errorf("Expected 4 items to be in activeQ, but got: %v", q.activeQ.len())
    }
    if q.podBackoffQ.Len() != 0 {
        t.Errorf("Expected 0 item to be in podBackoffQ, but got: %v", q.podBackoffQ.Len())
    if q.backoffQ.len() != 0 {
        t.Errorf("Expected 0 item to be in backoffQ, but got: %v", q.backoffQ.len())
    }
    expectInFlightPods(t, q, medPriorityPodInfo.Pod.UID)
    if len(q.unschedulablePods.podInfoMap) != 1 {
@@ -1935,18 +1933,14 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueueWithOutQueueingHint(t *testi
        t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPodInfo.Pod, p.Pod.Name)
    }
    // hpp2 won't be moved.
    if q.podBackoffQ.Len() != 3 {
        t.Fatalf("Expected 3 items to be in podBackoffQ, but got: %v", q.podBackoffQ.Len())
    if q.backoffQ.len() != 3 {
        t.Fatalf("Expected 3 items to be in backoffQ, but got: %v", q.backoffQ.len())
    }

    // pop out the pods in the backoffQ.
    // This doesn't make them in-flight pods.
    for q.podBackoffQ.Len() != 0 {
        _, err = q.podBackoffQ.Pop()
        if err != nil {
            t.Errorf("pop failed: %v", err)
        }
    }
    c.Step(q.backoffQ.podMaxBackoffDuration())
    q.backoffQ.popEachBackoffCompleted(logger, nil)

    unschedulableQueuedPodInfo := attemptQueuedPodInfo(q.newQueuedPodInfo(unschedulablePodInfo.Pod, "fooPlugin"))
    highPriorityQueuedPodInfo := attemptQueuedPodInfo(q.newQueuedPodInfo(highPriorityPodInfo.Pod, "fooPlugin"))
@@ -1971,18 +1965,20 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueueWithOutQueueingHint(t *testi
            t.Errorf("Expected %v in the unschedulablePods", pod.Name)
        }
    }
    q.podBackoffQ.Get(hpp1QueuedPodInfo)
    if !q.backoffQ.has(hpp1QueuedPodInfo) {
        t.Errorf("Expected %v in the backoffQ", hpp1.Name)
    }

    // Move clock by podInitialBackoffDuration, so that pods in the unschedulablePods would pass the backing off,
    // and the pods will be moved into activeQ.
    c.Step(q.podInitialBackoffDuration)
    c.Step(q.backoffQ.podInitialBackoffDuration())
    q.flushBackoffQCompleted(logger) // flush the completed backoffQ to move hpp1 to activeQ.
    q.MoveAllToActiveOrBackoffQueue(logger, nodeAdd, nil, nil, nil)
    if q.activeQ.len() != 4 {
        t.Errorf("Expected 4 items to be in activeQ, but got: %v", q.activeQ.len())
    }
    if q.podBackoffQ.Len() != 0 {
        t.Errorf("Expected 0 item to be in podBackoffQ, but got: %v", q.podBackoffQ.Len())
    if q.backoffQ.len() != 0 {
        t.Errorf("Expected 0 item to be in backoffQ, but got: %v", q.backoffQ.len())
    }
    if len(q.unschedulablePods.podInfoMap) != 1 {
        // hpp2 won't be moved regardless of its backoff timer.
@@ -2456,12 +2452,12 @@ func TestPriorityQueue_NewWithOptions(t *testing.T) {
        WithPodMaxBackoffDuration(20*time.Second),
    )

    if q.podInitialBackoffDuration != 2*time.Second {
        t.Errorf("Unexpected pod backoff initial duration. Expected: %v, got: %v", 2*time.Second, q.podInitialBackoffDuration)
    if q.backoffQ.podInitialBackoffDuration() != 2*time.Second {
        t.Errorf("Unexpected pod backoff initial duration. Expected: %v, got: %v", 2*time.Second, q.backoffQ.podInitialBackoffDuration())
    }

    if q.podMaxBackoffDuration != 20*time.Second {
        t.Errorf("Unexpected pod backoff max duration. Expected: %v, got: %v", 2*time.Second, q.podMaxBackoffDuration)
    if q.backoffQ.podMaxBackoffDuration() != 20*time.Second {
        t.Errorf("Unexpected pod backoff max duration. Expected: %v, got: %v", 2*time.Second, q.backoffQ.podMaxBackoffDuration())
    }
}

@@ -2993,7 +2989,7 @@ var (
        queue.Update(logger, pInfo.Pod, newPod)
    }
    addPodBackoffQ = func(t *testing.T, logger klog.Logger, queue *PriorityQueue, pInfo *framework.QueuedPodInfo) {
        queue.podBackoffQ.AddOrUpdate(pInfo)
        queue.backoffQ.add(logger, pInfo)
    }
    moveAllToActiveOrBackoffQ = func(t *testing.T, logger klog.Logger, queue *PriorityQueue, _ *framework.QueuedPodInfo) {
        queue.MoveAllToActiveOrBackoffQueue(logger, framework.EventUnschedulableTimeout, nil, nil, nil)
@@ -3639,12 +3635,12 @@ func TestBackOffFlow(t *testing.T) {
            // An event happens.
            q.MoveAllToActiveOrBackoffQueue(logger, framework.EventUnschedulableTimeout, nil, nil, nil)

            if !q.podBackoffQ.Has(podInfo) {
            if !q.backoffQ.has(podInfo) {
                t.Errorf("pod %v is not in the backoff queue", podID)
            }

            // Check backoff duration.
            deadline := q.getBackoffTime(podInfo)
            deadline := q.backoffQ.getBackoffTime(podInfo)
            backoff := deadline.Sub(timestamp)
            if backoff != step.wantBackoff {
                t.Errorf("got backoff %s, want %s", backoff, step.wantBackoff)
@@ -3654,13 +3650,13 @@ func TestBackOffFlow(t *testing.T) {
            cl.Step(time.Millisecond)
            q.flushBackoffQCompleted(logger)
            // Still in backoff queue after an early flush.
            if !q.podBackoffQ.Has(podInfo) {
            if !q.backoffQ.has(podInfo) {
                t.Errorf("pod %v is not in the backoff queue", podID)
            }
            // Moved out of the backoff queue after timeout.
            cl.Step(backoff)
            q.flushBackoffQCompleted(logger)
            if q.podBackoffQ.Has(podInfo) {
            if q.backoffQ.has(podInfo) {
                t.Errorf("pod %v is still in the backoff queue", podID)
            }
        })
@@ -3725,10 +3721,11 @@ func TestMoveAllToActiveOrBackoffQueue_PreEnqueueChecks(t *testing.T) {

    for _, tt := range tests {
        t.Run(tt.name, func(t *testing.T) {
            c := testingclock.NewFakeClock(time.Now())
            logger, ctx := ktesting.NewTestContext(t)
            ctx, cancel := context.WithCancel(ctx)
            defer cancel()
            q := NewTestQueue(ctx, newDefaultQueueSort())
            q := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(c))
            for i, podInfo := range tt.podInfos {
                // To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent() below.
                q.Add(logger, podInfo.Pod)
@@ -3749,13 +3746,10 @@ func TestMoveAllToActiveOrBackoffQueue_PreEnqueueChecks(t *testing.T) {
            }
            q.MoveAllToActiveOrBackoffQueue(logger, tt.event, nil, nil, tt.preEnqueueCheck)
            var got []string
            for q.podBackoffQ.Len() != 0 {
                queuedPodInfo, err := q.podBackoffQ.Pop()
                if err != nil {
                    t.Fatalf("Fail to pop pod from backoffQ: %v", err)
                }
                got = append(got, queuedPodInfo.Pod.Name)
            }
            c.Step(2 * q.backoffQ.podMaxBackoffDuration())
            q.backoffQ.popEachBackoffCompleted(logger, func(pInfo *framework.QueuedPodInfo) {
                got = append(got, pInfo.Pod.Name)
            })
            if diff := cmp.Diff(tt.want, got); diff != "" {
                t.Errorf("Unexpected diff (-want, +got):\n%s", diff)
            }
@@ -3777,49 +3771,6 @@ func makeQueuedPodInfos(num int, namePrefix, label string, timestamp time.Time)
    return pInfos
}

func TestPriorityQueue_calculateBackoffDuration(t *testing.T) {
    tests := []struct {
        name string
        initialBackoffDuration time.Duration
        maxBackoffDuration time.Duration
        podInfo *framework.QueuedPodInfo
        want time.Duration
    }{
        {
            name: "normal",
            initialBackoffDuration: 1 * time.Nanosecond,
            maxBackoffDuration: 32 * time.Nanosecond,
            podInfo: &framework.QueuedPodInfo{Attempts: 16},
            want: 32 * time.Nanosecond,
        },
        {
            name: "overflow_32bit",
            initialBackoffDuration: 1 * time.Nanosecond,
            maxBackoffDuration: math.MaxInt32 * time.Nanosecond,
            podInfo: &framework.QueuedPodInfo{Attempts: 32},
            want: math.MaxInt32 * time.Nanosecond,
        },
        {
            name: "overflow_64bit",
            initialBackoffDuration: 1 * time.Nanosecond,
            maxBackoffDuration: math.MaxInt64 * time.Nanosecond,
            podInfo: &framework.QueuedPodInfo{Attempts: 64},
            want: math.MaxInt64 * time.Nanosecond,
        },
    }
    for _, tt := range tests {
        t.Run(tt.name, func(t *testing.T) {
            _, ctx := ktesting.NewTestContext(t)
            ctx, cancel := context.WithCancel(ctx)
            defer cancel()
            q := NewTestQueue(ctx, newDefaultQueueSort(), WithPodInitialBackoffDuration(tt.initialBackoffDuration), WithPodMaxBackoffDuration(tt.maxBackoffDuration))
            if got := q.calculateBackoffDuration(tt.podInfo); got != tt.want {
                t.Errorf("PriorityQueue.calculateBackoffDuration() = %v, want %v", got, tt.want)
            }
        })
    }
}

func mustNewTestPodInfo(t *testing.T, pod *v1.Pod) *framework.PodInfo {
    podInfo, err := framework.NewPodInfo(pod)
    if err != nil {
@@ -4190,12 +4141,12 @@ func TestPriorityQueue_GetPod(t *testing.T) {
        },
    }

    _, ctx := ktesting.NewTestContext(t)
    logger, ctx := ktesting.NewTestContext(t)
    q := NewTestQueue(ctx, newDefaultQueueSort())
    q.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) {
        unlockedActiveQ.AddOrUpdate(newQueuedPodInfoForLookup(activeQPod))
    })
    q.podBackoffQ.AddOrUpdate(newQueuedPodInfoForLookup(backoffQPod))
    q.backoffQ.add(logger, newQueuedPodInfoForLookup(backoffQPod))
    q.unschedulablePods.addOrUpdate(newQueuedPodInfoForLookup(unschedPod))

    tests := []struct {