Use generics in scheduling queue's heap

This commit is contained in:
Maciej Skoczeń 2024-07-17 11:25:31 +00:00
parent 57d197fb89
commit 6b33e2e632
7 changed files with 277 additions and 410 deletions

View File

@ -147,9 +147,7 @@ func (sched *Scheduler) addPodToSchedulingQueue(obj interface{}) {
logger := sched.logger
pod := obj.(*v1.Pod)
logger.V(3).Info("Add event for unscheduled pod", "pod", klog.KObj(pod))
if err := sched.SchedulingQueue.Add(logger, pod); err != nil {
utilruntime.HandleError(fmt.Errorf("unable to queue %T: %v", obj, err))
}
sched.SchedulingQueue.Add(logger, pod)
}
func (sched *Scheduler) updatePodInSchedulingQueue(oldObj, newObj interface{}) {
@ -172,9 +170,7 @@ func (sched *Scheduler) updatePodInSchedulingQueue(oldObj, newObj interface{}) {
}
logger.V(4).Info("Update event for unscheduled pod", "pod", klog.KObj(newPod))
if err := sched.SchedulingQueue.Update(logger, oldPod, newPod); err != nil {
utilruntime.HandleError(fmt.Errorf("unable to update %T: %v", newObj, err))
}
sched.SchedulingQueue.Update(logger, oldPod, newPod)
}
func (sched *Scheduler) deletePodFromSchedulingQueue(obj interface{}) {
@ -199,9 +195,7 @@ func (sched *Scheduler) deletePodFromSchedulingQueue(obj interface{}) {
}
logger.V(3).Info("Delete event for unscheduled pod", "pod", klog.KObj(pod))
if err := sched.SchedulingQueue.Delete(pod); err != nil {
utilruntime.HandleError(fmt.Errorf("unable to dequeue %T: %v", obj, err))
}
sched.SchedulingQueue.Delete(pod)
fwk, err := sched.frameworkForPod(pod)
if err != nil {
// This shouldn't happen, because we only accept for scheduling the pods

View File

@ -24,29 +24,28 @@ import (
"container/heap"
"fmt"
"k8s.io/client-go/tools/cache"
"k8s.io/kubernetes/pkg/scheduler/metrics"
)
// KeyFunc is a function type to get the key from an object.
type KeyFunc func(obj interface{}) (string, error)
type KeyFunc[T any] func(obj T) string
type heapItem struct {
obj interface{} // The object which is stored in the heap.
index int // The index of the object's key in the Heap.queue.
type heapItem[T any] struct {
obj T // The object which is stored in the heap.
index int // The index of the object's key in the Heap.queue.
}
type itemKeyValue struct {
type itemKeyValue[T any] struct {
key string
obj interface{}
obj T
}
// data is an internal struct that implements the standard heap interface
// and keeps the data stored in the heap.
type data struct {
type data[T any] struct {
// items is a map from key of the objects to the objects and their index.
// We depend on the property that items in the map are in the queue and vice versa.
items map[string]*heapItem
items map[string]*heapItem[T]
// queue implements a heap data structure and keeps the order of elements
// according to the heap invariant. The queue keeps the keys of objects stored
// in "items".
@ -54,18 +53,18 @@ type data struct {
// keyFunc is used to make the key used for queued item insertion and retrieval, and
// should be deterministic.
keyFunc KeyFunc
keyFunc KeyFunc[T]
// lessFunc is used to compare two objects in the heap.
lessFunc lessFunc
lessFunc LessFunc[T]
}
var (
_ = heap.Interface(&data{}) // heapData is a standard heap
_ = heap.Interface(&data[any]{}) // heapData is a standard heap
)
// Less compares two objects and returns true if the first one should go
// in front of the second one in the heap.
func (h *data) Less(i, j int) bool {
func (h *data[T]) Less(i, j int) bool {
if i > len(h.queue) || j > len(h.queue) {
return false
}
@ -81,11 +80,11 @@ func (h *data) Less(i, j int) bool {
}
// Len returns the number of items in the Heap.
func (h *data) Len() int { return len(h.queue) }
func (h *data[T]) Len() int { return len(h.queue) }
// Swap implements swapping of two elements in the heap. This is a part of standard
// heap interface and should never be called directly.
func (h *data) Swap(i, j int) {
func (h *data[T]) Swap(i, j int) {
h.queue[i], h.queue[j] = h.queue[j], h.queue[i]
item := h.items[h.queue[i]]
item.index = i
@ -93,16 +92,16 @@ func (h *data) Swap(i, j int) {
item.index = j
}
// Push is supposed to be called by heap.Push only.
func (h *data) Push(kv interface{}) {
keyValue := kv.(*itemKeyValue)
// Push is supposed to be called by container/heap.Push only.
func (h *data[T]) Push(kv interface{}) {
keyValue := kv.(*itemKeyValue[T])
n := len(h.queue)
h.items[keyValue.key] = &heapItem{keyValue.obj, n}
h.items[keyValue.key] = &heapItem[T]{keyValue.obj, n}
h.queue = append(h.queue, keyValue.key)
}
// Pop is supposed to be called by heap.Pop only.
func (h *data) Pop() interface{} {
// Pop is supposed to be called by container/heap.Pop only.
func (h *data[T]) Pop() interface{} {
key := h.queue[len(h.queue)-1]
h.queue = h.queue[0 : len(h.queue)-1]
item, ok := h.items[key]
@ -114,56 +113,44 @@ func (h *data) Pop() interface{} {
return item.obj
}
// Peek is supposed to be called by heap.Peek only.
func (h *data) Peek() interface{} {
// Peek returns the head of the heap without removing it.
func (h *data[T]) Peek() (T, bool) {
if len(h.queue) > 0 {
return h.items[h.queue[0]].obj
return h.items[h.queue[0]].obj, true
}
return nil
var zero T
return zero, false
}
// Heap is a producer/consumer queue that implements a heap data structure.
// It can be used to implement priority queues and similar data structures.
type Heap struct {
type Heap[T any] struct {
// data stores objects and has a queue that keeps their ordering according
// to the heap invariant.
data *data
data *data[T]
// metricRecorder updates the counter when elements of a heap get added or
// removed, and it does nothing if it's nil
metricRecorder metrics.MetricRecorder
}
// Add inserts an item, and puts it in the queue. The item is updated if it
// AddOrUpdate inserts an item, and puts it in the queue. The item is updated if it
// already exists.
func (h *Heap) Add(obj interface{}) error {
key, err := h.data.keyFunc(obj)
if err != nil {
return cache.KeyError{Obj: obj, Err: err}
}
func (h *Heap[T]) AddOrUpdate(obj T) {
key := h.data.keyFunc(obj)
if _, exists := h.data.items[key]; exists {
h.data.items[key].obj = obj
heap.Fix(h.data, h.data.items[key].index)
} else {
heap.Push(h.data, &itemKeyValue{key, obj})
heap.Push(h.data, &itemKeyValue[T]{key, obj})
if h.metricRecorder != nil {
h.metricRecorder.Inc()
}
}
return nil
}
// Update is the same as Add in this implementation. When the item does not
// exist, it is added.
func (h *Heap) Update(obj interface{}) error {
return h.Add(obj)
}
// Delete removes an item.
func (h *Heap) Delete(obj interface{}) error {
key, err := h.data.keyFunc(obj)
if err != nil {
return cache.KeyError{Obj: obj, Err: err}
}
func (h *Heap[T]) Delete(obj T) error {
key := h.data.keyFunc(obj)
if item, ok := h.data.items[key]; ok {
heap.Remove(h.data, item.index)
if h.metricRecorder != nil {
@ -175,43 +162,48 @@ func (h *Heap) Delete(obj interface{}) error {
}
// Peek returns the head of the heap without removing it.
func (h *Heap) Peek() interface{} {
func (h *Heap[T]) Peek() (T, bool) {
return h.data.Peek()
}
// Pop returns the head of the heap and removes it.
func (h *Heap) Pop() (interface{}, error) {
func (h *Heap[T]) Pop() (T, error) {
obj := heap.Pop(h.data)
if obj != nil {
if h.metricRecorder != nil {
h.metricRecorder.Dec()
}
return obj, nil
return obj.(T), nil
}
return nil, fmt.Errorf("object was removed from heap data")
var zero T
return zero, fmt.Errorf("heap is empty")
}
// Get returns the requested item, or sets exists=false.
func (h *Heap) Get(obj interface{}) (interface{}, bool, error) {
key, err := h.data.keyFunc(obj)
if err != nil {
return nil, false, cache.KeyError{Obj: obj, Err: err}
}
func (h *Heap[T]) Get(obj T) (T, bool) {
key := h.data.keyFunc(obj)
return h.GetByKey(key)
}
// GetByKey returns the requested item, or sets exists=false.
func (h *Heap) GetByKey(key string) (interface{}, bool, error) {
func (h *Heap[T]) GetByKey(key string) (T, bool) {
item, exists := h.data.items[key]
if !exists {
return nil, false, nil
var zero T
return zero, false
}
return item.obj, true, nil
return item.obj, true
}
func (h *Heap[T]) Has(obj T) bool {
key := h.data.keyFunc(obj)
_, ok := h.GetByKey(key)
return ok
}
// List returns a list of all the items.
func (h *Heap) List() []interface{} {
list := make([]interface{}, 0, len(h.data.items))
func (h *Heap[T]) List() []T {
list := make([]T, 0, len(h.data.items))
for _, item := range h.data.items {
list = append(list, item.obj)
}
@ -219,20 +211,20 @@ func (h *Heap) List() []interface{} {
}
// Len returns the number of items in the heap.
func (h *Heap) Len() int {
func (h *Heap[T]) Len() int {
return len(h.data.queue)
}
// New returns a Heap which can be used to queue up items to process.
func New(keyFn KeyFunc, lessFn lessFunc) *Heap {
func New[T any](keyFn KeyFunc[T], lessFn LessFunc[T]) *Heap[T] {
return NewWithRecorder(keyFn, lessFn, nil)
}
// NewWithRecorder wraps an optional metricRecorder to compose a Heap object.
func NewWithRecorder(keyFn KeyFunc, lessFn lessFunc, metricRecorder metrics.MetricRecorder) *Heap {
return &Heap{
data: &data{
items: map[string]*heapItem{},
func NewWithRecorder[T any](keyFn KeyFunc[T], lessFn LessFunc[T], metricRecorder metrics.MetricRecorder) *Heap[T] {
return &Heap[T]{
data: &data[T]{
items: map[string]*heapItem[T]{},
queue: []string{},
keyFunc: keyFn,
lessFunc: lessFn,
@ -241,6 +233,6 @@ func NewWithRecorder(keyFn KeyFunc, lessFn lessFunc, metricRecorder metrics.Metr
}
}
// lessFunc is a function that receives two items and returns true if the first
// LessFunc is a function that receives two items and returns true if the first
// item should be placed before the second one when the list is sorted.
type lessFunc = func(item1, item2 interface{}) bool
type LessFunc[T any] func(item1, item2 T) bool

View File

@ -23,8 +23,8 @@ import (
"testing"
)
func testHeapObjectKeyFunc(obj interface{}) (string, error) {
return obj.(testHeapObject).name, nil
func testHeapObjectKeyFunc(obj testHeapObject) string {
return obj.name
}
type testHeapObject struct {
@ -56,9 +56,9 @@ func mkHeapObj(name string, val interface{}) testHeapObject {
return testHeapObject{name: name, val: val}
}
func compareInts(val1 interface{}, val2 interface{}) bool {
first := val1.(testHeapObject).val.(int)
second := val2.(testHeapObject).val.(int)
func compareInts(val1 testHeapObject, val2 testHeapObject) bool {
first := val1.val.(int)
second := val2.val.(int)
return first < second
}
@ -67,17 +67,18 @@ func TestHeapBasic(t *testing.T) {
h := New(testHeapObjectKeyFunc, compareInts)
const amount = 500
var i int
var zero testHeapObject
// empty queue
if item := h.Peek(); item != nil {
if item, ok := h.Peek(); ok || item != zero {
t.Errorf("expected nil object but got %v", item)
}
for i = amount; i > 0; i-- {
h.Add(mkHeapObj(string([]rune{'a', rune(i)}), i))
h.AddOrUpdate(mkHeapObj(string([]rune{'a', rune(i)}), i))
// Retrieve head without removing it
head := h.Peek()
if e, a := i, head.(testHeapObject).val; a != e {
head, ok := h.Peek()
if e, a := i, head.val; !ok || a != e {
t.Errorf("expected %d, got %d", e, a)
}
}
@ -85,41 +86,44 @@ func TestHeapBasic(t *testing.T) {
// Make sure that the numbers are popped in ascending order.
prevNum := 0
for i := 0; i < amount; i++ {
obj, err := h.Pop()
num := obj.(testHeapObject).val.(int)
item, err := h.Pop()
num := item.val.(int)
// All the items must be sorted.
if err != nil || prevNum > num {
t.Errorf("got %v out of order, last was %v", obj, prevNum)
t.Errorf("got %v out of order, last was %v", item, prevNum)
}
prevNum = num
}
}
// Tests Heap.Add and ensures that heap invariant is preserved after adding items.
func TestHeap_Add(t *testing.T) {
// TestHeap_AddOrUpdate_Add tests add capabilities of Heap.AddOrUpdate
// and ensures that heap invariant is preserved after adding items.
func TestHeap_AddOrUpdate_Add(t *testing.T) {
h := New(testHeapObjectKeyFunc, compareInts)
h.Add(mkHeapObj("foo", 10))
h.Add(mkHeapObj("bar", 1))
h.Add(mkHeapObj("baz", 11))
h.Add(mkHeapObj("zab", 30))
h.Add(mkHeapObj("foo", 13)) // This updates "foo".
h.AddOrUpdate(mkHeapObj("foo", 10))
h.AddOrUpdate(mkHeapObj("bar", 1))
h.AddOrUpdate(mkHeapObj("baz", 11))
h.AddOrUpdate(mkHeapObj("zab", 30))
h.AddOrUpdate(mkHeapObj("foo", 13)) // This updates "foo".
item, err := h.Pop()
if e, a := 1, item.(testHeapObject).val; err != nil || a != e {
if e, a := 1, item.val; err != nil || a != e {
t.Fatalf("expected %d, got %d", e, a)
}
item, err = h.Pop()
if e, a := 11, item.(testHeapObject).val; err != nil || a != e {
if e, a := 11, item.val; err != nil || a != e {
t.Fatalf("expected %d, got %d", e, a)
}
h.Delete(mkHeapObj("baz", 11)) // Nothing is deleted.
h.Add(mkHeapObj("foo", 14)) // foo is updated.
if err := h.Delete(mkHeapObj("baz", 11)); err == nil { // Nothing is deleted.
t.Fatalf("nothing should be deleted from the heap")
}
h.AddOrUpdate(mkHeapObj("foo", 14)) // foo is updated.
item, err = h.Pop()
if e, a := 14, item.(testHeapObject).val; err != nil || a != e {
if e, a := 14, item.val; err != nil || a != e {
t.Fatalf("expected %d, got %d", e, a)
}
item, err = h.Pop()
if e, a := 30, item.(testHeapObject).val; err != nil || a != e {
if e, a := 30, item.val; err != nil || a != e {
t.Fatalf("expected %d, got %d", e, a)
}
}
@ -128,21 +132,21 @@ func TestHeap_Add(t *testing.T) {
// preserved after deleting items.
func TestHeap_Delete(t *testing.T) {
h := New(testHeapObjectKeyFunc, compareInts)
h.Add(mkHeapObj("foo", 10))
h.Add(mkHeapObj("bar", 1))
h.Add(mkHeapObj("bal", 31))
h.Add(mkHeapObj("baz", 11))
h.AddOrUpdate(mkHeapObj("foo", 10))
h.AddOrUpdate(mkHeapObj("bar", 1))
h.AddOrUpdate(mkHeapObj("bal", 31))
h.AddOrUpdate(mkHeapObj("baz", 11))
// Delete head. Delete should work with "key" and doesn't care about the value.
if err := h.Delete(mkHeapObj("bar", 200)); err != nil {
t.Fatalf("Failed to delete head.")
}
item, err := h.Pop()
if e, a := 10, item.(testHeapObject).val; err != nil || a != e {
if e, a := 10, item.val; err != nil || a != e {
t.Fatalf("expected %d, got %d", e, a)
}
h.Add(mkHeapObj("zab", 30))
h.Add(mkHeapObj("faz", 30))
h.AddOrUpdate(mkHeapObj("zab", 30))
h.AddOrUpdate(mkHeapObj("faz", 30))
len := h.data.Len()
// Delete non-existing item.
if err = h.Delete(mkHeapObj("non-existent", 10)); err == nil || len != h.data.Len() {
@ -157,11 +161,11 @@ func TestHeap_Delete(t *testing.T) {
t.Fatalf("Failed to delete item.")
}
item, err = h.Pop()
if e, a := 11, item.(testHeapObject).val; err != nil || a != e {
if e, a := 11, item.val; err != nil || a != e {
t.Fatalf("expected %d, got %d", e, a)
}
item, err = h.Pop()
if e, a := 30, item.(testHeapObject).val; err != nil || a != e {
if e, a := 30, item.val; err != nil || a != e {
t.Fatalf("expected %d, got %d", e, a)
}
if h.data.Len() != 0 {
@ -169,26 +173,26 @@ func TestHeap_Delete(t *testing.T) {
}
}
// TestHeap_Update tests Heap.Update and ensures that heap invariant is
// preserved after adding items.
func TestHeap_Update(t *testing.T) {
// TestHeap_AddOrUpdate_Update tests update capabilities of Heap.Update
// and ensures that heap invariant is preserved after adding items.
func TestHeap_AddOrUpdate_Update(t *testing.T) {
h := New(testHeapObjectKeyFunc, compareInts)
h.Add(mkHeapObj("foo", 10))
h.Add(mkHeapObj("bar", 1))
h.Add(mkHeapObj("bal", 31))
h.Add(mkHeapObj("baz", 11))
h.AddOrUpdate(mkHeapObj("foo", 10))
h.AddOrUpdate(mkHeapObj("bar", 1))
h.AddOrUpdate(mkHeapObj("bal", 31))
h.AddOrUpdate(mkHeapObj("baz", 11))
// Update an item to a value that should push it to the head.
h.Update(mkHeapObj("baz", 0))
h.AddOrUpdate(mkHeapObj("baz", 0))
if h.data.queue[0] != "baz" || h.data.items["baz"].index != 0 {
t.Fatalf("expected baz to be at the head")
}
item, err := h.Pop()
if e, a := 0, item.(testHeapObject).val; err != nil || a != e {
if e, a := 0, item.val; err != nil || a != e {
t.Fatalf("expected %d, got %d", e, a)
}
// Update bar to push it farther back in the queue.
h.Update(mkHeapObj("bar", 100))
h.AddOrUpdate(mkHeapObj("bar", 100))
if h.data.queue[0] != "foo" || h.data.items["foo"].index != 0 {
t.Fatalf("expected foo to be at the head")
}
@ -197,19 +201,19 @@ func TestHeap_Update(t *testing.T) {
// TestHeap_Get tests Heap.Get.
func TestHeap_Get(t *testing.T) {
h := New(testHeapObjectKeyFunc, compareInts)
h.Add(mkHeapObj("foo", 10))
h.Add(mkHeapObj("bar", 1))
h.Add(mkHeapObj("bal", 31))
h.Add(mkHeapObj("baz", 11))
h.AddOrUpdate(mkHeapObj("foo", 10))
h.AddOrUpdate(mkHeapObj("bar", 1))
h.AddOrUpdate(mkHeapObj("bal", 31))
h.AddOrUpdate(mkHeapObj("baz", 11))
// Get works with the key.
obj, exists, err := h.Get(mkHeapObj("baz", 0))
if err != nil || exists == false || obj.(testHeapObject).val != 11 {
item, exists := h.Get(mkHeapObj("baz", 0))
if !exists || item.val != 11 {
t.Fatalf("unexpected error in getting element")
}
// Get non-existing object.
_, exists, err = h.Get(mkHeapObj("non-existing", 0))
if err != nil || exists {
_, exists = h.Get(mkHeapObj("non-existing", 0))
if exists {
t.Fatalf("didn't expect to get any object")
}
}
@ -217,18 +221,18 @@ func TestHeap_Get(t *testing.T) {
// TestHeap_GetByKey tests Heap.GetByKey and is very similar to TestHeap_Get.
func TestHeap_GetByKey(t *testing.T) {
h := New(testHeapObjectKeyFunc, compareInts)
h.Add(mkHeapObj("foo", 10))
h.Add(mkHeapObj("bar", 1))
h.Add(mkHeapObj("bal", 31))
h.Add(mkHeapObj("baz", 11))
h.AddOrUpdate(mkHeapObj("foo", 10))
h.AddOrUpdate(mkHeapObj("bar", 1))
h.AddOrUpdate(mkHeapObj("bal", 31))
h.AddOrUpdate(mkHeapObj("baz", 11))
obj, exists, err := h.GetByKey("baz")
if err != nil || !exists || obj.(testHeapObject).val != 11 {
item, exists := h.GetByKey("baz")
if !exists || item.val != 11 {
t.Fatalf("unexpected error in getting element")
}
// Get non-existing object.
_, exists, err = h.GetByKey("non-existing")
if err != nil || exists {
_, exists = h.GetByKey("non-existing")
if exists {
t.Fatalf("didn't expect to get any object")
}
}
@ -249,14 +253,13 @@ func TestHeap_List(t *testing.T) {
"faz": 30,
}
for k, v := range items {
h.Add(mkHeapObj(k, v))
h.AddOrUpdate(mkHeapObj(k, v))
}
list = h.List()
if len(list) != len(items) {
t.Errorf("expected %d items, got %d", len(items), len(list))
}
for _, obj := range list {
heapObj := obj.(testHeapObject)
for _, heapObj := range list {
v, ok := items[heapObj.name]
if !ok || v != heapObj.val {
t.Errorf("unexpected item in the list: %v", heapObj)
@ -267,10 +270,10 @@ func TestHeap_List(t *testing.T) {
func TestHeapWithRecorder(t *testing.T) {
metricRecorder := new(testMetricRecorder)
h := NewWithRecorder(testHeapObjectKeyFunc, compareInts, metricRecorder)
h.Add(mkHeapObj("foo", 10))
h.Add(mkHeapObj("bar", 1))
h.Add(mkHeapObj("baz", 100))
h.Add(mkHeapObj("qux", 11))
h.AddOrUpdate(mkHeapObj("foo", 10))
h.AddOrUpdate(mkHeapObj("bar", 1))
h.AddOrUpdate(mkHeapObj("baz", 100))
h.AddOrUpdate(mkHeapObj("qux", 11))
if *metricRecorder != 4 {
t.Errorf("expected count to be 4 but got %d", *metricRecorder)

View File

@ -93,7 +93,7 @@ type PreEnqueueCheck func(pod *v1.Pod) bool
// makes it easy to use those data structures as a SchedulingQueue.
type SchedulingQueue interface {
framework.PodNominator
Add(logger klog.Logger, pod *v1.Pod) error
Add(logger klog.Logger, pod *v1.Pod)
// Activate moves the given pods to activeQ iff they're in unschedulablePods or backoffQ.
// The passed-in pods are originally compiled from plugins that want to activate Pods,
// by injecting the pods through a reserved CycleState struct (PodsToActivate).
@ -112,8 +112,8 @@ type SchedulingQueue interface {
// Done must be called for pod returned by Pop. This allows the queue to
// keep track of which pods are currently being processed.
Done(types.UID)
Update(logger klog.Logger, oldPod, newPod *v1.Pod) error
Delete(pod *v1.Pod) error
Update(logger klog.Logger, oldPod, newPod *v1.Pod)
Delete(pod *v1.Pod)
// TODO(sanposhiho): move all PreEnqueueCheck to Requeue and delete it from this parameter eventually.
// Some PreEnqueueCheck include event filtering logic based on some in-tree plugins
// and it affect badly to other plugins.
@ -212,10 +212,10 @@ type PriorityQueue struct {
// activeQ is heap structure that scheduler actively looks at to find pods to
// schedule. Head of heap is the highest priority pod. It should be protected by activeQLock.
activeQ *heap.Heap
activeQ *heap.Heap[*framework.QueuedPodInfo]
// podBackoffQ is a heap ordered by backoff expiry. Pods which have completed backoff
// are popped from this heap before the scheduler looks at activeQ
podBackoffQ *heap.Heap
podBackoffQ *heap.Heap[*framework.QueuedPodInfo]
// unschedulablePods holds pods that have been tried and determined unschedulable.
unschedulablePods *UnschedulablePods
// schedulingCycle represents sequence number of scheduling cycle and is incremented
@ -382,19 +382,13 @@ func NewPriorityQueue(
opt(&options)
}
comp := func(podInfo1, podInfo2 interface{}) bool {
pInfo1 := podInfo1.(*framework.QueuedPodInfo)
pInfo2 := podInfo2.(*framework.QueuedPodInfo)
return lessFn(pInfo1, pInfo2)
}
pq := &PriorityQueue{
clock: options.clock,
stop: make(chan struct{}),
podInitialBackoffDuration: options.podInitialBackoffDuration,
podMaxBackoffDuration: options.podMaxBackoffDuration,
podMaxInUnschedulablePodsDuration: options.podMaxInUnschedulablePodsDuration,
activeQ: heap.NewWithRecorder(podInfoKeyFunc, comp, metrics.NewActivePodsRecorder()),
activeQ: heap.NewWithRecorder(podInfoKeyFunc, heap.LessFunc[*framework.QueuedPodInfo](lessFn), metrics.NewActivePodsRecorder()),
unschedulablePods: newUnschedulablePods(metrics.NewUnschedulablePodsRecorder(), metrics.NewGatedPodsRecorder()),
inFlightPods: make(map[types.UID]*list.Element),
inFlightEvents: list.New(),
@ -603,7 +597,7 @@ func (p *PriorityQueue) runPreEnqueuePlugin(ctx context.Context, pl framework.Pr
// It returns 2 parameters:
// 1. a boolean flag to indicate whether the pod is added successfully.
// 2. an error for the caller to act on.
func (p *PriorityQueue) moveToActiveQ(logger klog.Logger, pInfo *framework.QueuedPodInfo, event string) (bool, error) {
func (p *PriorityQueue) moveToActiveQ(logger klog.Logger, pInfo *framework.QueuedPodInfo, event string) bool {
gatedBefore := pInfo.Gated
pInfo.Gated = !p.runPreEnqueuePlugins(context.Background(), pInfo)
@ -611,24 +605,21 @@ func (p *PriorityQueue) moveToActiveQ(logger klog.Logger, pInfo *framework.Queue
defer p.activeQLock.Unlock()
if pInfo.Gated {
// Add the Pod to unschedulablePods if it's not passing PreEnqueuePlugins.
if _, exists, _ := p.activeQ.Get(pInfo); exists {
return false, nil
if p.activeQ.Has(pInfo) {
return false
}
if _, exists, _ := p.podBackoffQ.Get(pInfo); exists {
return false, nil
if p.podBackoffQ.Has(pInfo) {
return false
}
p.unschedulablePods.addOrUpdate(pInfo)
return false, nil
return false
}
if pInfo.InitialAttemptTimestamp == nil {
now := p.clock.Now()
pInfo.InitialAttemptTimestamp = &now
}
if err := p.activeQ.Add(pInfo); err != nil {
logger.Error(err, "Error adding pod to the active queue", "pod", klog.KObj(pInfo.Pod))
return false, err
}
p.activeQ.AddOrUpdate(pInfo)
p.unschedulablePods.delete(pInfo.Pod, gatedBefore)
_ = p.podBackoffQ.Delete(pInfo) // Don't need to react when pInfo is not found.
@ -638,22 +629,19 @@ func (p *PriorityQueue) moveToActiveQ(logger klog.Logger, pInfo *framework.Queue
p.AddNominatedPod(logger, pInfo.PodInfo, nil)
}
return true, nil
return true
}
// Add adds a pod to the active queue. It should be called only when a new pod
// is added so there is no chance the pod is already in active/unschedulable/backoff queues
func (p *PriorityQueue) Add(logger klog.Logger, pod *v1.Pod) error {
func (p *PriorityQueue) Add(logger klog.Logger, pod *v1.Pod) {
p.lock.Lock()
defer p.lock.Unlock()
pInfo := p.newQueuedPodInfo(pod)
if added, err := p.moveToActiveQ(logger, pInfo, framework.PodAdd); !added {
return err
if added := p.moveToActiveQ(logger, pInfo, framework.PodAdd); added {
p.cond.Broadcast()
}
p.cond.Broadcast()
return nil
}
// Activate moves the given pods to activeQ iff they're in unschedulablePods or backoffQ.
@ -676,8 +664,7 @@ func (p *PriorityQueue) Activate(logger klog.Logger, pods map[string]*v1.Pod) {
func (p *PriorityQueue) existsInActiveQ(pInfo *framework.QueuedPodInfo) bool {
p.activeQLock.RLock()
defer p.activeQLock.RUnlock()
_, exists, _ := p.activeQ.Get(pInfo)
return exists
return p.activeQ.Has(pInfo)
}
func (p *PriorityQueue) activate(logger klog.Logger, pod *v1.Pod) bool {
@ -686,10 +673,10 @@ func (p *PriorityQueue) activate(logger klog.Logger, pod *v1.Pod) bool {
if pInfo = p.unschedulablePods.get(pod); pInfo == nil {
// If the pod doesn't belong to unschedulablePods or backoffQ, don't activate it.
// The pod can be already in activeQ.
if obj, exists, _ := p.podBackoffQ.Get(newQueuedPodInfoForLookup(pod)); !exists {
var exists bool
pInfo, exists = p.podBackoffQ.Get(newQueuedPodInfoForLookup(pod))
if !exists {
return false
} else {
pInfo = obj.(*framework.QueuedPodInfo)
}
}
@ -699,8 +686,7 @@ func (p *PriorityQueue) activate(logger klog.Logger, pod *v1.Pod) bool {
return false
}
added, _ := p.moveToActiveQ(logger, pInfo, framework.ForceActivate)
return added
return p.moveToActiveQ(logger, pInfo, framework.ForceActivate)
}
// isPodBackingoff returns true if a pod is still waiting for its backoff timer.
@ -820,9 +806,7 @@ func (p *PriorityQueue) addUnschedulableWithoutQueueingHint(logger klog.Logger,
// - No unschedulable plugins are associated with this Pod,
// meaning something unusual (a temporal failure on kube-apiserver, etc) happened and this Pod gets moved back to the queue.
// In this case, we should retry scheduling it because this Pod may not be retried until the next flush.
if err := p.podBackoffQ.Add(pInfo); err != nil {
return fmt.Errorf("error adding pod %v to the backoff queue: %v", klog.KObj(pod), err)
}
p.podBackoffQ.AddOrUpdate(pInfo)
logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pod), "event", framework.ScheduleAttemptFailure, "queue", backoffQ)
metrics.SchedulerQueueIncomingPods.WithLabelValues("backoff", framework.ScheduleAttemptFailure).Inc()
} else {
@ -853,7 +837,7 @@ func (p *PriorityQueue) AddUnschedulableIfNotPresent(logger klog.Logger, pInfo *
if p.existsInActiveQ(pInfo) {
return fmt.Errorf("Pod %v is already present in the active queue", klog.KObj(pod))
}
if _, exists, _ := p.podBackoffQ.Get(pInfo); exists {
if p.podBackoffQ.Has(pInfo) {
return fmt.Errorf("Pod %v is already present in the backoff queue", klog.KObj(pod))
}
@ -892,11 +876,10 @@ func (p *PriorityQueue) flushBackoffQCompleted(logger klog.Logger) {
defer p.lock.Unlock()
activated := false
for {
rawPodInfo := p.podBackoffQ.Peek()
if rawPodInfo == nil {
pInfo, ok := p.podBackoffQ.Peek()
if !ok || pInfo == nil {
break
}
pInfo := rawPodInfo.(*framework.QueuedPodInfo)
pod := pInfo.Pod
if p.isPodBackingoff(pInfo) {
break
@ -906,7 +889,7 @@ func (p *PriorityQueue) flushBackoffQCompleted(logger klog.Logger) {
logger.Error(err, "Unable to pop pod from backoff queue despite backoff completion", "pod", klog.KObj(pod))
break
}
if added, _ := p.moveToActiveQ(logger, pInfo, framework.BackoffComplete); added {
if added := p.moveToActiveQ(logger, pInfo, framework.BackoffComplete); added {
activated = true
}
}
@ -954,11 +937,10 @@ func (p *PriorityQueue) Pop(logger klog.Logger) (*framework.QueuedPodInfo, error
}
p.cond.Wait()
}
obj, err := p.activeQ.Pop()
pInfo, err := p.activeQ.Pop()
if err != nil {
return nil, err
}
pInfo := obj.(*framework.QueuedPodInfo)
pInfo.Attempts++
p.schedulingCycle++
// In flight, no concurrent events yet.
@ -1039,22 +1021,23 @@ func isPodUpdated(oldPod, newPod *v1.Pod) bool {
return !reflect.DeepEqual(strip(oldPod), strip(newPod))
}
func (p *PriorityQueue) updateInActiveQueue(logger klog.Logger, oldPod, newPod *v1.Pod, oldPodInfo *framework.QueuedPodInfo) (bool, error) {
func (p *PriorityQueue) updateInActiveQueue(logger klog.Logger, oldPod, newPod *v1.Pod, oldPodInfo *framework.QueuedPodInfo) bool {
p.activeQLock.Lock()
defer p.activeQLock.Unlock()
if oldPodInfo, exists, _ := p.activeQ.Get(oldPodInfo); exists {
pInfo := updatePod(oldPodInfo, newPod)
if pInfo, exists := p.activeQ.Get(oldPodInfo); exists {
_ = pInfo.Update(newPod)
p.UpdateNominatedPod(logger, oldPod, pInfo.PodInfo)
return true, p.activeQ.Update(pInfo)
p.activeQ.AddOrUpdate(pInfo)
return true
}
return false, nil
return false
}
// Update updates a pod in the active or backoff queue if present. Otherwise, it removes
// the item from the unschedulable queue if pod is updated in a way that it may
// become schedulable and adds the updated one to the active queue.
// If pod is not present in any of the queues, it is added to the active queue.
func (p *PriorityQueue) Update(logger klog.Logger, oldPod, newPod *v1.Pod) error {
func (p *PriorityQueue) Update(logger klog.Logger, oldPod, newPod *v1.Pod) {
p.lock.Lock()
defer p.lock.Unlock()
@ -1075,7 +1058,7 @@ func (p *PriorityQueue) Update(logger klog.Logger, oldPod, newPod *v1.Pod) error
})
p.activeQLock.Unlock()
return nil
return
}
p.activeQLock.Unlock()
}
@ -1083,79 +1066,72 @@ func (p *PriorityQueue) Update(logger klog.Logger, oldPod, newPod *v1.Pod) error
if oldPod != nil {
oldPodInfo := newQueuedPodInfoForLookup(oldPod)
// If the pod is already in the active queue, just update it there.
if exists, err := p.updateInActiveQueue(logger, oldPod, newPod, oldPodInfo); exists {
return err
if exists := p.updateInActiveQueue(logger, oldPod, newPod, oldPodInfo); exists {
return
}
// If the pod is in the backoff queue, update it there.
if oldPodInfo, exists, _ := p.podBackoffQ.Get(oldPodInfo); exists {
pInfo := updatePod(oldPodInfo, newPod)
if pInfo, exists := p.podBackoffQ.Get(oldPodInfo); exists {
_ = pInfo.Update(newPod)
p.UpdateNominatedPod(logger, oldPod, pInfo.PodInfo)
return p.podBackoffQ.Update(pInfo)
p.podBackoffQ.AddOrUpdate(pInfo)
return
}
}
// If the pod is in the unschedulable queue, updating it may make it schedulable.
if usPodInfo := p.unschedulablePods.get(newPod); usPodInfo != nil {
pInfo := updatePod(usPodInfo, newPod)
if pInfo := p.unschedulablePods.get(newPod); pInfo != nil {
_ = pInfo.Update(newPod)
p.UpdateNominatedPod(logger, oldPod, pInfo.PodInfo)
gated := usPodInfo.Gated
gated := pInfo.Gated
if p.isSchedulingQueueHintEnabled {
// When unscheduled Pods are updated, we check with QueueingHint
// whether the update may make the pods schedulable.
// Plugins have to implement a QueueingHint for Pod/Update event
// if the rejection from them could be resolved by updating unscheduled Pods itself.
events := framework.PodSchedulingPropertiesChange(newPod, oldPod)
for _, evt := range events {
hint := p.isPodWorthRequeuing(logger, pInfo, evt, oldPod, newPod)
queue := p.requeuePodViaQueueingHint(logger, pInfo, hint, framework.UnscheduledPodUpdate.Label)
if queue != unschedulablePods {
logger.V(5).Info("Pod moved to an internal scheduling queue because the Pod is updated", "pod", klog.KObj(newPod), "event", framework.PodUpdate, "queue", queue)
p.unschedulablePods.delete(usPodInfo.Pod, gated)
p.unschedulablePods.delete(pInfo.Pod, gated)
}
if queue == activeQ {
p.cond.Broadcast()
break
}
}
return nil
return
}
if isPodUpdated(oldPod, newPod) {
if p.isPodBackingoff(usPodInfo) {
if err := p.podBackoffQ.Add(pInfo); err != nil {
return err
}
p.unschedulablePods.delete(usPodInfo.Pod, gated)
if p.isPodBackingoff(pInfo) {
p.podBackoffQ.AddOrUpdate(pInfo)
p.unschedulablePods.delete(pInfo.Pod, gated)
logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", framework.PodUpdate, "queue", backoffQ)
return nil
return
}
if added, err := p.moveToActiveQ(logger, pInfo, framework.BackoffComplete); !added {
return err
if added := p.moveToActiveQ(logger, pInfo, framework.BackoffComplete); added {
p.cond.Broadcast()
}
p.cond.Broadcast()
return nil
return
}
// Pod update didn't make it schedulable, keep it in the unschedulable queue.
p.unschedulablePods.addOrUpdate(pInfo)
return nil
return
}
// If pod is not in any of the queues, we put it in the active queue.
pInfo := p.newQueuedPodInfo(newPod)
if added, err := p.moveToActiveQ(logger, pInfo, framework.PodUpdate); !added {
return err
if added := p.moveToActiveQ(logger, pInfo, framework.PodUpdate); added {
p.cond.Broadcast()
}
p.cond.Broadcast()
return nil
}
// Delete deletes the item from either of the two queues. It assumes the pod is
// only in one queue.
func (p *PriorityQueue) Delete(pod *v1.Pod) error {
func (p *PriorityQueue) Delete(pod *v1.Pod) {
p.lock.Lock()
defer p.lock.Unlock()
p.DeleteNominatedPodIfExists(pod)
@ -1169,7 +1145,6 @@ func (p *PriorityQueue) Delete(pod *v1.Pod) error {
p.unschedulablePods.delete(pod, pInfo.Gated)
}
}
return nil
}
// AssignedPodAdded is called when a bound pod is added. Creation of this pod
@ -1241,25 +1216,14 @@ func (p *PriorityQueue) requeuePodViaQueueingHint(logger klog.Logger, pInfo *fra
return unschedulablePods
}
pod := pInfo.Pod
if strategy == queueAfterBackoff && p.isPodBackingoff(pInfo) {
if err := p.podBackoffQ.Add(pInfo); err != nil {
logger.Error(err, "Error adding pod to the backoff queue, queue this Pod to unschedulable pod pool", "pod", klog.KObj(pod))
p.unschedulablePods.addOrUpdate(pInfo)
return unschedulablePods
}
p.podBackoffQ.AddOrUpdate(pInfo)
metrics.SchedulerQueueIncomingPods.WithLabelValues("backoff", event).Inc()
return backoffQ
}
// Reach here if schedulingHint is QueueImmediately, or schedulingHint is Queue but the pod is not backing off.
added, err := p.moveToActiveQ(logger, pInfo, event)
if err != nil {
logger.Error(err, "Error adding pod to the active queue, queue this Pod to unschedulable pod pool", "pod", klog.KObj(pod))
}
if added {
if added := p.moveToActiveQ(logger, pInfo, event); added {
return activeQ
}
if pInfo.Gated {
@ -1368,7 +1332,7 @@ func (p *PriorityQueue) PodsInActiveQ() []*v1.Pod {
defer p.activeQLock.RUnlock()
var result []*v1.Pod
for _, pInfo := range p.activeQ.List() {
result = append(result, pInfo.(*framework.QueuedPodInfo).Pod)
result = append(result, pInfo.Pod)
}
return result
}
@ -1384,7 +1348,7 @@ func (p *PriorityQueue) PendingPods() ([]*v1.Pod, string) {
result := p.PodsInActiveQ()
activeQLen := len(result)
for _, pInfo := range p.podBackoffQ.List() {
result = append(result, pInfo.(*framework.QueuedPodInfo).Pod)
result = append(result, pInfo.Pod)
}
for _, pInfo := range p.unschedulablePods.podInfoMap {
result = append(result, pInfo.Pod)
@ -1397,20 +1361,18 @@ func (p *PriorityQueue) nominatedPodToInfo(np PodRef) *framework.PodInfo {
pod := np.ToPod()
pInfoLookup := newQueuedPodInfoForLookup(pod)
obj, exists, _ := p.activeQ.Get(pInfoLookup)
queuedPodInfo, exists := p.activeQ.Get(pInfoLookup)
if exists {
queuedPodInfo := obj.(*framework.QueuedPodInfo)
return queuedPodInfo.PodInfo
}
queuedPodInfo := p.unschedulablePods.get(pod)
queuedPodInfo = p.unschedulablePods.get(pod)
if queuedPodInfo != nil {
return queuedPodInfo.PodInfo
}
obj, exists, _ = p.podBackoffQ.Get(pInfoLookup)
queuedPodInfo, exists = p.podBackoffQ.Get(pInfoLookup)
if exists {
queuedPodInfo := obj.(*framework.QueuedPodInfo)
return queuedPodInfo.PodInfo
}
@ -1469,9 +1431,7 @@ func (npm *nominator) NominatedPodsForNode(nodeName string) []*framework.PodInfo
return npm.nominatedPodsToInfo(nominatedPods)
}
func (p *PriorityQueue) podsCompareBackoffCompleted(podInfo1, podInfo2 interface{}) bool {
pInfo1 := podInfo1.(*framework.QueuedPodInfo)
pInfo2 := podInfo2.(*framework.QueuedPodInfo)
func (p *PriorityQueue) podsCompareBackoffCompleted(pInfo1, pInfo2 *framework.QueuedPodInfo) bool {
bo1 := p.getBackoffTime(pInfo1)
bo2 := p.getBackoffTime(pInfo2)
return bo1.Before(bo2)
@ -1512,12 +1472,6 @@ func (p *PriorityQueue) calculateBackoffDuration(podInfo *framework.QueuedPodInf
return duration
}
func updatePod(oldPodInfo interface{}, newPod *v1.Pod) *framework.QueuedPodInfo {
pInfo := oldPodInfo.(*framework.QueuedPodInfo)
pInfo.Update(newPod)
return pInfo
}
// UnschedulablePods holds pods that cannot be scheduled. This data structure
// is used to implement unschedulablePods.
type UnschedulablePods struct {
@ -1729,6 +1683,6 @@ func newPodNominator(podLister listersv1.PodLister, nominatedPodsToInfo func([]P
}
}
func podInfoKeyFunc(obj interface{}) (string, error) {
return cache.MetaNamespaceKeyFunc(obj.(*framework.QueuedPodInfo).Pod)
func podInfoKeyFunc(pInfo *framework.QueuedPodInfo) string {
return cache.NewObjectName(pInfo.Pod.Namespace, pInfo.Pod.Name).String()
}

View File

@ -122,15 +122,9 @@ func TestPriorityQueue_Add(t *testing.T) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
q := NewTestQueueWithObjects(ctx, newDefaultQueueSort(), objs)
if err := q.Add(logger, medPriorityPodInfo.Pod); err != nil {
t.Errorf("add failed: %v", err)
}
if err := q.Add(logger, unschedulablePodInfo.Pod); err != nil {
t.Errorf("add failed: %v", err)
}
if err := q.Add(logger, highPriorityPodInfo.Pod); err != nil {
t.Errorf("add failed: %v", err)
}
q.Add(logger, medPriorityPodInfo.Pod)
q.Add(logger, unschedulablePodInfo.Pod)
q.Add(logger, highPriorityPodInfo.Pod)
expectedNominatedPods := &nominator{
nominatedPodToNode: map[types.UID]string{
medPriorityPodInfo.Pod.UID: "node1",
@ -168,12 +162,8 @@ func TestPriorityQueue_AddWithReversePriorityLessFunc(t *testing.T) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
q := NewTestQueueWithObjects(ctx, newDefaultQueueSort(), objs)
if err := q.Add(logger, medPriorityPodInfo.Pod); err != nil {
t.Errorf("add failed: %v", err)
}
if err := q.Add(logger, highPriorityPodInfo.Pod); err != nil {
t.Errorf("add failed: %v", err)
}
q.Add(logger, medPriorityPodInfo.Pod)
q.Add(logger, highPriorityPodInfo.Pod)
if p, err := q.Pop(logger); err != nil || p.Pod != highPriorityPodInfo.Pod {
t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPodInfo.Pod.Name, p.Pod.Name)
}
@ -748,17 +738,17 @@ func Test_InFlightPods(t *testing.T) {
}
if test.wantActiveQPodNames != nil {
rawPodInfos := q.activeQ.List()
if len(rawPodInfos) != len(test.wantActiveQPodNames) {
diff := cmp.Diff(test.wantActiveQPodNames, rawPodInfos, cmpopts.SortSlices(func(a, b interface{}) bool {
podInfos := q.activeQ.List()
if len(podInfos) != len(test.wantActiveQPodNames) {
diff := cmp.Diff(test.wantActiveQPodNames, podInfos, cmpopts.SortSlices(func(a, b interface{}) bool {
return a.(framework.PodInfo).Pod.Name < b.(framework.PodInfo).Pod.Name
}))
t.Fatalf("Length of activeQ is not expected. Got %v, want %v.\n%s", len(rawPodInfos), len(test.wantActiveQPodNames), diff)
t.Fatalf("Length of activeQ is not expected. Got %v, want %v.\n%s", len(podInfos), len(test.wantActiveQPodNames), diff)
}
wantPodNames := sets.New(test.wantActiveQPodNames...)
for _, rawPodInfo := range rawPodInfos {
podGotFromActiveQ := rawPodInfo.(*framework.QueuedPodInfo).Pod
for _, podInfo := range podInfos {
podGotFromActiveQ := podInfo.Pod
if !wantPodNames.Has(podGotFromActiveQ.Name) {
t.Fatalf("Pod %v was not expected to be in the activeQ.", podGotFromActiveQ.Name)
}
@ -766,17 +756,17 @@ func Test_InFlightPods(t *testing.T) {
}
if test.wantBackoffQPodNames != nil {
rawPodInfos := q.podBackoffQ.List()
if len(rawPodInfos) != len(test.wantBackoffQPodNames) {
diff := cmp.Diff(test.wantBackoffQPodNames, rawPodInfos, cmpopts.SortSlices(func(a, b interface{}) bool {
podInfos := q.podBackoffQ.List()
if len(podInfos) != len(test.wantBackoffQPodNames) {
diff := cmp.Diff(test.wantBackoffQPodNames, podInfos, cmpopts.SortSlices(func(a, b interface{}) bool {
return a.(framework.PodInfo).Pod.Name < b.(framework.PodInfo).Pod.Name
}))
t.Fatalf("Length of backoffQ is not expected. Got %v, want %v.\n%s", len(rawPodInfos), len(test.wantBackoffQPodNames), diff)
t.Fatalf("Length of backoffQ is not expected. Got %v, want %v.\n%s", len(podInfos), len(test.wantBackoffQPodNames), diff)
}
wantPodNames := sets.New(test.wantBackoffQPodNames...)
for _, rawPodInfo := range rawPodInfos {
podGotFromBackoffQ := rawPodInfo.(*framework.QueuedPodInfo).Pod
for _, podInfo := range podInfos {
podGotFromBackoffQ := podInfo.Pod
if !wantPodNames.Has(podGotFromBackoffQ.Name) {
t.Fatalf("Pod %v was not expected to be in the backoffQ.", podGotFromBackoffQ.Name)
}
@ -935,7 +925,7 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent_Backoff(t *testing.T) {
// Since there was a move request at the same cycle as "oldCycle", these pods
// should be in the backoff queue.
for i := 1; i < totalNum; i++ {
if _, exists, _ := q.podBackoffQ.Get(newQueuedPodInfoForLookup(&expectedPods[i])); !exists {
if !q.podBackoffQ.Has(newQueuedPodInfoForLookup(&expectedPods[i])) {
t.Errorf("Expected %v to be added to podBackoffQ.", expectedPods[i].Name)
}
}
@ -1026,10 +1016,7 @@ func TestPriorityQueue_Update(t *testing.T) {
name: "When updating a pod that is already in activeQ, the pod should remain in activeQ after Update()",
wantQ: activeQ,
prepareFunc: func(t *testing.T, logger klog.Logger, q *PriorityQueue) (oldPod, newPod *v1.Pod) {
err := q.Update(logger, nil, highPriorityPodInfo.Pod)
if err != nil {
t.Errorf("add pod %s error: %v", highPriorityPodInfo.Pod.Name, err)
}
q.Update(logger, nil, highPriorityPodInfo.Pod)
return highPriorityPodInfo.Pod, highPriorityPodInfo.Pod
},
schedulingHintsEnablement: []bool{false, true},
@ -1039,9 +1026,7 @@ func TestPriorityQueue_Update(t *testing.T) {
wantQ: backoffQ,
prepareFunc: func(t *testing.T, logger klog.Logger, q *PriorityQueue) (oldPod, newPod *v1.Pod) {
podInfo := q.newQueuedPodInfo(medPriorityPodInfo.Pod)
if err := q.podBackoffQ.Add(podInfo); err != nil {
t.Errorf("adding pod to backoff queue error: %v", err)
}
q.podBackoffQ.AddOrUpdate(podInfo)
return podInfo.Pod, podInfo.Pod
},
schedulingHintsEnablement: []bool{false, true},
@ -1088,10 +1073,7 @@ func TestPriorityQueue_Update(t *testing.T) {
prepareFunc: func(t *testing.T, logger klog.Logger, q *PriorityQueue) (oldPod, newPod *v1.Pod) {
podInfo := q.newQueuedPodInfo(medPriorityPodInfo.Pod)
// We need to once add this Pod to activeQ and Pop() it so that this Pod is registered correctly in inFlightPods.
err := q.activeQ.Add(podInfo)
if err != nil {
t.Errorf("unexpected error from activeQ.Add: %v", err)
}
q.activeQ.AddOrUpdate(podInfo)
if p, err := q.Pop(logger); err != nil || p.Pod != medPriorityPodInfo.Pod {
t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPodInfo.Pod.Name, p.Pod.Name)
}
@ -1115,25 +1097,23 @@ func TestPriorityQueue_Update(t *testing.T) {
oldPod, newPod := tt.prepareFunc(t, logger, q)
if err := q.Update(logger, oldPod, newPod); err != nil {
t.Fatalf("unexpected error from Update: %v", err)
}
q.Update(logger, oldPod, newPod)
var pInfo *framework.QueuedPodInfo
// validate expected queue
if obj, exists, _ := q.podBackoffQ.Get(newQueuedPodInfoForLookup(newPod)); exists {
if pInfoFromBackoff, exists := q.podBackoffQ.Get(newQueuedPodInfoForLookup(newPod)); exists {
if tt.wantQ != backoffQ {
t.Errorf("expected pod %s not to be queued to backoffQ, but it was", newPod.Name)
}
pInfo = obj.(*framework.QueuedPodInfo)
pInfo = pInfoFromBackoff
}
if obj, exists, _ := q.activeQ.Get(newQueuedPodInfoForLookup(newPod)); exists {
if pInfoFromActive, exists := q.activeQ.Get(newQueuedPodInfoForLookup(newPod)); exists {
if tt.wantQ != activeQ {
t.Errorf("expected pod %s not to be queued to activeQ, but it was", newPod.Name)
}
pInfo = obj.(*framework.QueuedPodInfo)
pInfo = pInfoFromActive
}
if pInfoFromUnsched := q.unschedulablePods.get(newPod); pInfoFromUnsched != nil {
@ -1183,9 +1163,7 @@ func TestPriorityQueue_UpdateWhenInflight(t *testing.T) {
// test-pod is created and popped out from the queue
testPod := st.MakePod().Name("test-pod").Namespace("test-ns").UID("test-uid").Obj()
if err := q.Add(logger, testPod); err != nil {
t.Errorf("add failed: %v", err)
}
q.Add(logger, testPod)
if p, err := q.Pop(logger); err != nil || p.Pod != testPod {
t.Errorf("Expected: %v after Pop, but got: %v", testPod.Name, p.Pod.Name)
}
@ -1199,9 +1177,7 @@ func TestPriorityQueue_UpdateWhenInflight(t *testing.T) {
},
}
if err := q.Update(logger, testPod, updatedPod); err != nil {
t.Errorf("Error calling Update: %v", err)
}
q.Update(logger, testPod, updatedPod)
// test-pod got rejected by fakePlugin,
// but the update event that it just got may change this scheduling result,
// and hence we should put this pod to activeQ/backoffQ.
@ -1210,11 +1186,9 @@ func TestPriorityQueue_UpdateWhenInflight(t *testing.T) {
t.Fatalf("unexpected error from AddUnschedulableIfNotPresent: %v", err)
}
var pInfo *framework.QueuedPodInfo
if obj, exists, _ := q.podBackoffQ.Get(newQueuedPodInfoForLookup(updatedPod)); !exists {
pInfo, exists := q.podBackoffQ.Get(newQueuedPodInfoForLookup(updatedPod))
if !exists {
t.Fatalf("expected pod %s to be queued to backoffQ, but it wasn't.", updatedPod.Name)
} else {
pInfo = obj.(*framework.QueuedPodInfo)
}
if diff := cmp.Diff(updatedPod, pInfo.PodInfo.Pod); diff != "" {
t.Errorf("Unexpected updated pod diff (-want, +got): %s", diff)
@ -1229,21 +1203,17 @@ func TestPriorityQueue_Delete(t *testing.T) {
q := NewTestQueueWithObjects(ctx, newDefaultQueueSort(), objs)
q.Update(logger, highPriorityPodInfo.Pod, highPriNominatedPodInfo.Pod)
q.Add(logger, unschedulablePodInfo.Pod)
if err := q.Delete(highPriNominatedPodInfo.Pod); err != nil {
t.Errorf("delete failed: %v", err)
}
if _, exists, _ := q.activeQ.Get(newQueuedPodInfoForLookup(unschedulablePodInfo.Pod)); !exists {
q.Delete(highPriNominatedPodInfo.Pod)
if !q.activeQ.Has(newQueuedPodInfoForLookup(unschedulablePodInfo.Pod)) {
t.Errorf("Expected %v to be in activeQ.", unschedulablePodInfo.Pod.Name)
}
if _, exists, _ := q.activeQ.Get(newQueuedPodInfoForLookup(highPriNominatedPodInfo.Pod)); exists {
if q.activeQ.Has(newQueuedPodInfoForLookup(highPriNominatedPodInfo.Pod)) {
t.Errorf("Didn't expect %v to be in activeQ.", highPriorityPodInfo.Pod.Name)
}
if len(q.nominator.nominatedPods) != 1 {
t.Errorf("Expected nominatedPods to have only 'unschedulablePodInfo': %v", q.nominator.nominatedPods)
}
if err := q.Delete(unschedulablePodInfo.Pod); err != nil {
t.Errorf("delete failed: %v", err)
}
q.Delete(unschedulablePodInfo.Pod)
if len(q.nominator.nominatedPods) != 0 {
t.Errorf("Expected nominatedPods to be empty: %v", q.nominator)
}
@ -1293,7 +1263,7 @@ func TestPriorityQueue_Activate(t *testing.T) {
// Prepare activeQ/unschedulablePods/podBackoffQ according to the table
for _, qPodInfo := range tt.qPodInfoInActiveQ {
q.activeQ.Add(qPodInfo)
q.activeQ.AddOrUpdate(qPodInfo)
}
for _, qPodInfo := range tt.qPodInfoInUnschedulablePods {
@ -1301,7 +1271,7 @@ func TestPriorityQueue_Activate(t *testing.T) {
}
for _, qPodInfo := range tt.qPodInfoInPodBackoffQ {
q.podBackoffQ.Add(qPodInfo)
q.podBackoffQ.AddOrUpdate(qPodInfo)
}
// Activate specific pod according to the table
@ -1314,7 +1284,7 @@ func TestPriorityQueue_Activate(t *testing.T) {
// Check if the specific pod exists in activeQ
for _, want := range tt.want {
if _, exist, _ := q.activeQ.Get(newQueuedPodInfoForLookup(want.PodInfo.Pod)); !exist {
if !q.activeQ.Has(newQueuedPodInfoForLookup(want.PodInfo.Pod)) {
t.Errorf("podInfo not exist in activeQ: want %v", want.PodInfo.Pod.Name)
}
}
@ -1393,7 +1363,7 @@ func TestPriorityQueue_addToActiveQ(t *testing.T) {
m := map[string][]framework.PreEnqueuePlugin{"": tt.plugins}
q := NewTestQueueWithObjects(ctx, newDefaultQueueSort(), []runtime.Object{tt.pod}, WithPreEnqueuePluginMap(m),
WithPodInitialBackoffDuration(time.Second*30), WithPodMaxBackoffDuration(time.Second*60))
got, _ := q.moveToActiveQ(logger, q.newQueuedPodInfo(tt.pod), framework.PodAdd)
got := q.moveToActiveQ(logger, q.newQueuedPodInfo(tt.pod), framework.PodAdd)
if got != tt.wantSuccess {
t.Errorf("Unexpected result: want %v, but got %v", tt.wantSuccess, got)
}
@ -1600,7 +1570,7 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueueWithQueueingHint(t *testing.
}
cl := testingclock.NewFakeClock(now)
q := NewTestQueue(ctx, newDefaultQueueSort(), WithQueueingHintMapPerProfile(m), WithClock(cl))
q.activeQ.Add(q.newQueuedPodInfo(test.podInfo.Pod))
q.activeQ.AddOrUpdate(q.newQueuedPodInfo(test.podInfo.Pod))
if p, err := q.Pop(logger); err != nil || p.Pod != test.podInfo.Pod {
t.Errorf("Expected: %v after Pop, but got: %v", test.podInfo.Pod.Name, p.Pod.Name)
}
@ -1644,12 +1614,12 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) {
}
q := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(c), WithQueueingHintMapPerProfile(m))
// To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent()s below.
q.activeQ.Add(q.newQueuedPodInfo(unschedulablePodInfo.Pod))
q.activeQ.AddOrUpdate(q.newQueuedPodInfo(unschedulablePodInfo.Pod))
if p, err := q.Pop(logger); err != nil || p.Pod != unschedulablePodInfo.Pod {
t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePodInfo.Pod.Name, p.Pod.Name)
}
expectInFlightPods(t, q, unschedulablePodInfo.Pod.UID)
q.activeQ.Add(q.newQueuedPodInfo(highPriorityPodInfo.Pod))
q.activeQ.AddOrUpdate(q.newQueuedPodInfo(highPriorityPodInfo.Pod))
if p, err := q.Pop(logger); err != nil || p.Pod != highPriorityPodInfo.Pod {
t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPodInfo.Pod.Name, p.Pod.Name)
}
@ -1665,7 +1635,7 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) {
expectInFlightPods(t, q)
// Construct a Pod, but don't associate its scheduler failure to any plugin
hpp1 := clonePod(highPriorityPodInfo.Pod, "hpp1")
q.activeQ.Add(q.newQueuedPodInfo(hpp1))
q.activeQ.AddOrUpdate(q.newQueuedPodInfo(hpp1))
if p, err := q.Pop(logger); err != nil || p.Pod != hpp1 {
t.Errorf("Expected: %v after Pop, but got: %v", hpp1, p.Pod.Name)
}
@ -1678,7 +1648,7 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) {
expectInFlightPods(t, q)
// Construct another Pod, and associate its scheduler failure to plugin "barPlugin".
hpp2 := clonePod(highPriorityPodInfo.Pod, "hpp2")
q.activeQ.Add(q.newQueuedPodInfo(hpp2))
q.activeQ.AddOrUpdate(q.newQueuedPodInfo(hpp2))
if p, err := q.Pop(logger); err != nil || p.Pod != hpp2 {
t.Errorf("Expected: %v after Pop, but got: %v", hpp2, p.Pod.Name)
}
@ -1714,17 +1684,17 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) {
expectInFlightPods(t, q, medPriorityPodInfo.Pod.UID)
q.schedulingCycle++
q.activeQ.Add(q.newQueuedPodInfo(unschedulablePodInfo.Pod))
q.activeQ.AddOrUpdate(q.newQueuedPodInfo(unschedulablePodInfo.Pod))
if p, err := q.Pop(logger); err != nil || p.Pod != unschedulablePodInfo.Pod {
t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePodInfo.Pod.Name, p.Pod.Name)
}
expectInFlightPods(t, q, medPriorityPodInfo.Pod.UID, unschedulablePodInfo.Pod.UID)
q.activeQ.Add(q.newQueuedPodInfo(highPriorityPodInfo.Pod))
q.activeQ.AddOrUpdate(q.newQueuedPodInfo(highPriorityPodInfo.Pod))
if p, err := q.Pop(logger); err != nil || p.Pod != highPriorityPodInfo.Pod {
t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPodInfo.Pod.Name, p.Pod.Name)
}
expectInFlightPods(t, q, medPriorityPodInfo.Pod.UID, unschedulablePodInfo.Pod.UID, highPriorityPodInfo.Pod.UID)
q.activeQ.Add(q.newQueuedPodInfo(hpp1))
q.activeQ.AddOrUpdate(q.newQueuedPodInfo(hpp1))
if p, err := q.Pop(logger); err != nil || p.Pod != hpp1 {
t.Errorf("Expected: %v after Pop, but got: %v", hpp1, p.Pod.Name)
}
@ -1755,7 +1725,7 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) {
t.Errorf("Expected %v in the unschedulablePods", pod.Name)
}
}
if _, ok, _ := q.podBackoffQ.Get(hpp1QueuedPodInfo); !ok {
if !q.podBackoffQ.Has(hpp1QueuedPodInfo) {
t.Errorf("Expected %v in the podBackoffQ", hpp1.Name)
}
@ -1792,9 +1762,7 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueueWithOutQueueingHint(t *testi
}
q := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(c), WithQueueingHintMapPerProfile(m))
// To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent()s below.
if err := q.Add(logger, medPriorityPodInfo.Pod); err != nil {
t.Errorf("add failed: %v", err)
}
q.Add(logger, medPriorityPodInfo.Pod)
err := q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(unschedulablePodInfo.Pod, "fooPlugin"), q.SchedulingCycle())
if err != nil {
@ -1858,9 +1826,7 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueueWithOutQueueingHint(t *testi
if err != nil {
t.Fatalf("unexpected error from AddUnschedulableIfNotPresent: %v", err)
}
if err = q.Add(logger, medPriorityPodInfo.Pod); err != nil {
t.Errorf("add failed: %v", err)
}
q.Add(logger, medPriorityPodInfo.Pod)
// hpp1 will go to backoffQ because no failure plugin is associated with it.
// All plugins other than hpp1 are enqueued to the unschedulable Pod pool.
for _, pod := range []*v1.Pod{unschedulablePodInfo.Pod, highPriorityPodInfo.Pod, hpp2} {
@ -1868,9 +1834,7 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueueWithOutQueueingHint(t *testi
t.Errorf("Expected %v in the unschedulablePods", pod.Name)
}
}
if _, ok, _ := q.podBackoffQ.Get(hpp1QueuedPodInfo); !ok {
t.Errorf("Expected %v in the podBackoffQ", hpp1.Name)
}
q.podBackoffQ.Get(hpp1QueuedPodInfo)
// Move clock by podInitialBackoffDuration, so that pods in the unschedulablePods would pass the backing off,
// and the pods will be moved into activeQ.
@ -1990,9 +1954,7 @@ func TestPriorityQueue_AssignedPodAdded_(t *testing.T) {
q := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(c), WithQueueingHintMapPerProfile(m))
// To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent()s below.
if err := q.activeQ.Add(q.newQueuedPodInfo(tt.unschedPod)); err != nil {
t.Errorf("failed to add pod to activeQ: %v", err)
}
q.activeQ.AddOrUpdate(q.newQueuedPodInfo(tt.unschedPod))
if p, err := q.Pop(logger); err != nil || p.Pod != tt.unschedPod {
t.Errorf("Expected: %v after Pop, but got: %v", tt.unschedPod.Name, p.Pod.Name)
}
@ -2007,8 +1969,8 @@ func TestPriorityQueue_AssignedPodAdded_(t *testing.T) {
q.AssignedPodAdded(logger, tt.updatedAssignedPod)
if _, exists, _ := q.activeQ.Get(newQueuedPodInfoForLookup(tt.unschedPod)); exists != tt.wantToRequeue {
t.Fatalf("unexpected Pod move: Pod should be requeued: %v. Pod is actually requeued: %v", tt.wantToRequeue, exists)
if q.activeQ.Has(newQueuedPodInfoForLookup(tt.unschedPod)) != tt.wantToRequeue {
t.Fatalf("unexpected Pod move: Pod should be requeued: %v. Pod is actually requeued: %v", tt.wantToRequeue, !tt.wantToRequeue)
}
})
}
@ -2107,11 +2069,11 @@ func TestPriorityQueue_PendingPods(t *testing.T) {
defer cancel()
q := NewTestQueue(ctx, newDefaultQueueSort())
// To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent()s below.
q.activeQ.Add(q.newQueuedPodInfo(unschedulablePodInfo.Pod))
q.activeQ.AddOrUpdate(q.newQueuedPodInfo(unschedulablePodInfo.Pod))
if p, err := q.Pop(logger); err != nil || p.Pod != unschedulablePodInfo.Pod {
t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePodInfo.Pod.Name, p.Pod.Name)
}
q.activeQ.Add(q.newQueuedPodInfo(highPriorityPodInfo.Pod))
q.activeQ.AddOrUpdate(q.newQueuedPodInfo(highPriorityPodInfo.Pod))
if p, err := q.Pop(logger); err != nil || p.Pod != highPriorityPodInfo.Pod {
t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPodInfo.Pod.Name, p.Pod.Name)
}
@ -2150,9 +2112,7 @@ func TestPriorityQueue_UpdateNominatedPodForNode(t *testing.T) {
defer cancel()
objs := []runtime.Object{medPriorityPodInfo.Pod, unschedulablePodInfo.Pod, highPriorityPodInfo.Pod, scheduledPodInfo.Pod}
q := NewTestQueueWithObjects(ctx, newDefaultQueueSort(), objs)
if err := q.Add(logger, medPriorityPodInfo.Pod); err != nil {
t.Errorf("add failed: %v", err)
}
q.Add(logger, medPriorityPodInfo.Pod)
// Update unschedulablePodInfo on a different node than specified in the pod.
q.AddNominatedPod(logger, mustNewTestPodInfo(t, unschedulablePodInfo.Pod),
&framework.NominatingInfo{NominatingMode: framework.ModeOverride, NominatedNodeName: "node5"})
@ -2459,7 +2419,7 @@ func TestPodFailedSchedulingMultipleTimesDoesNotBlockNewerPod(t *testing.T) {
})
// To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent() below.
q.activeQ.Add(q.newQueuedPodInfo(unschedulablePod))
q.activeQ.AddOrUpdate(q.newQueuedPodInfo(unschedulablePod))
if p, err := q.Pop(logger); err != nil || p.Pod != unschedulablePod {
t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePod.Name, p.Pod.Name)
}
@ -2597,11 +2557,11 @@ func TestHighPriorityFlushUnschedulablePodsLeftover(t *testing.T) {
})
// To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent()s below.
q.activeQ.Add(q.newQueuedPodInfo(highPod))
q.activeQ.AddOrUpdate(q.newQueuedPodInfo(highPod))
if p, err := q.Pop(logger); err != nil || p.Pod != highPod {
t.Errorf("Expected: %v after Pop, but got: %v", highPod.Name, p.Pod.Name)
}
q.activeQ.Add(q.newQueuedPodInfo(midPod))
q.activeQ.AddOrUpdate(q.newQueuedPodInfo(midPod))
if p, err := q.Pop(logger); err != nil || p.Pod != midPod {
t.Errorf("Expected: %v after Pop, but got: %v", midPod.Name, p.Pod.Name)
}
@ -2698,7 +2658,7 @@ func TestPriorityQueue_initPodMaxInUnschedulablePodsDuration(t *testing.T) {
if pInfo, err := queue.activeQ.Pop(); err != nil {
t.Errorf("Error while popping the head of the queue: %v", err)
} else {
podInfoList = append(podInfoList, pInfo.(*framework.QueuedPodInfo))
podInfoList = append(podInfoList, pInfo)
}
}
@ -2713,17 +2673,13 @@ type operation func(t *testing.T, logger klog.Logger, queue *PriorityQueue, pInf
var (
add = func(t *testing.T, logger klog.Logger, queue *PriorityQueue, pInfo *framework.QueuedPodInfo) {
if err := queue.Add(logger, pInfo.Pod); err != nil {
t.Fatalf("Unexpected error during Add: %v", err)
}
queue.Add(logger, pInfo.Pod)
}
popAndRequeueAsUnschedulable = func(t *testing.T, logger klog.Logger, queue *PriorityQueue, pInfo *framework.QueuedPodInfo) {
// To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent() below.
// UnschedulablePlugins will get cleared by Pop, so make a copy first.
unschedulablePlugins := pInfo.UnschedulablePlugins.Clone()
if err := queue.activeQ.Add(queue.newQueuedPodInfo(pInfo.Pod)); err != nil {
t.Fatalf("Unexpected error during Add: %v", err)
}
queue.activeQ.AddOrUpdate(queue.newQueuedPodInfo(pInfo.Pod))
p, err := queue.Pop(logger)
if err != nil {
t.Fatalf("Unexpected error during Pop: %v", err)
@ -2739,9 +2695,7 @@ var (
}
popAndRequeueAsBackoff = func(t *testing.T, logger klog.Logger, queue *PriorityQueue, pInfo *framework.QueuedPodInfo) {
// To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent() below.
if err := queue.activeQ.Add(queue.newQueuedPodInfo(pInfo.Pod)); err != nil {
t.Fatalf("Unexpected error during Add: %v", err)
}
queue.activeQ.AddOrUpdate(queue.newQueuedPodInfo(pInfo.Pod))
p, err := queue.Pop(logger)
if err != nil {
t.Fatalf("Unexpected error during Pop: %v", err)
@ -2755,10 +2709,7 @@ var (
}
}
addPodActiveQ = func(t *testing.T, logger klog.Logger, queue *PriorityQueue, pInfo *framework.QueuedPodInfo) {
queue.activeQ.Add(pInfo)
}
updatePodActiveQ = func(t *testing.T, logger klog.Logger, queue *PriorityQueue, pInfo *framework.QueuedPodInfo) {
queue.activeQ.Update(pInfo)
queue.activeQ.AddOrUpdate(pInfo)
}
addPodUnschedulablePods = func(t *testing.T, logger klog.Logger, queue *PriorityQueue, pInfo *framework.QueuedPodInfo) {
if !pInfo.Gated {
@ -2781,7 +2732,7 @@ var (
queue.Update(logger, pInfo.Pod, newPod)
}
addPodBackoffQ = func(t *testing.T, logger klog.Logger, queue *PriorityQueue, pInfo *framework.QueuedPodInfo) {
queue.podBackoffQ.Add(pInfo)
queue.podBackoffQ.AddOrUpdate(pInfo)
}
moveAllToActiveOrBackoffQ = func(t *testing.T, logger klog.Logger, queue *PriorityQueue, _ *framework.QueuedPodInfo) {
queue.MoveAllToActiveOrBackoffQueue(logger, framework.UnschedulableTimeout, nil, nil, nil)
@ -2829,15 +2780,6 @@ func TestPodTimestamp(t *testing.T) {
operands: []*framework.QueuedPodInfo{pInfo2, pInfo1},
expected: []*framework.QueuedPodInfo{pInfo1, pInfo2},
},
{
name: "update two pod to activeQ and sort them by the timestamp",
operations: []operation{
updatePodActiveQ,
updatePodActiveQ,
},
operands: []*framework.QueuedPodInfo{pInfo2, pInfo1},
expected: []*framework.QueuedPodInfo{pInfo1, pInfo2},
},
{
name: "add two pod to unschedulablePods then move them to activeQ and sort them by the timestamp",
operations: []operation{
@ -2883,7 +2825,7 @@ func TestPodTimestamp(t *testing.T) {
if pInfo, err := queue.activeQ.Pop(); err != nil {
t.Errorf("Error while popping the head of the queue: %v", err)
} else {
podInfoList = append(podInfoList, pInfo.(*framework.QueuedPodInfo))
podInfoList = append(podInfoList, pInfo)
}
}
@ -3398,9 +3340,7 @@ func TestBackOffFlow(t *testing.T) {
Namespace: pod.Namespace,
Name: pod.Name,
}
if err := q.Add(logger, pod); err != nil {
t.Fatal(err)
}
q.Add(logger, pod)
for i, step := range steps {
t.Run(fmt.Sprintf("step %d", i), func(t *testing.T) {
@ -3421,7 +3361,7 @@ func TestBackOffFlow(t *testing.T) {
// An event happens.
q.MoveAllToActiveOrBackoffQueue(logger, framework.UnschedulableTimeout, nil, nil, nil)
if _, ok, _ := q.podBackoffQ.Get(podInfo); !ok {
if !q.podBackoffQ.Has(podInfo) {
t.Errorf("pod %v is not in the backoff queue", podID)
}
@ -3436,13 +3376,13 @@ func TestBackOffFlow(t *testing.T) {
cl.Step(time.Millisecond)
q.flushBackoffQCompleted(logger)
// Still in backoff queue after an early flush.
if _, ok, _ := q.podBackoffQ.Get(podInfo); !ok {
if !q.podBackoffQ.Has(podInfo) {
t.Errorf("pod %v is not in the backoff queue", podID)
}
// Moved out of the backoff queue after timeout.
cl.Step(backoff)
q.flushBackoffQCompleted(logger)
if _, ok, _ := q.podBackoffQ.Get(podInfo); ok {
if q.podBackoffQ.Has(podInfo) {
t.Errorf("pod %v is still in the backoff queue", podID)
}
})
@ -3513,7 +3453,7 @@ func TestMoveAllToActiveOrBackoffQueue_PreEnqueueChecks(t *testing.T) {
q := NewTestQueue(ctx, newDefaultQueueSort())
for i, podInfo := range tt.podInfos {
// To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent() below.
q.activeQ.Add(q.newQueuedPodInfo(podInfo.Pod))
q.activeQ.AddOrUpdate(q.newQueuedPodInfo(podInfo.Pod))
if p, err := q.Pop(logger); err != nil || p.Pod != podInfo.Pod {
t.Errorf("Expected: %v after Pop, but got: %v", podInfo.Pod.Name, p.Pod.Name)
}
@ -3532,14 +3472,10 @@ func TestMoveAllToActiveOrBackoffQueue_PreEnqueueChecks(t *testing.T) {
q.MoveAllToActiveOrBackoffQueue(logger, tt.event, nil, nil, tt.preEnqueueCheck)
var got []string
for q.podBackoffQ.Len() != 0 {
obj, err := q.podBackoffQ.Pop()
queuedPodInfo, err := q.podBackoffQ.Pop()
if err != nil {
t.Fatalf("Fail to pop pod from backoffQ: %v", err)
}
queuedPodInfo, ok := obj.(*framework.QueuedPodInfo)
if !ok {
t.Fatalf("Fail to convert popped obj (type %T) to *framework.QueuedPodInfo", obj)
}
got = append(got, queuedPodInfo.Pod.Name)
}
if diff := cmp.Diff(tt.want, got); diff != "" {
@ -3912,9 +3848,7 @@ func Test_queuedPodInfo_gatedSetUponCreationAndUnsetUponUpdate(t *testing.T) {
q := NewTestQueue(ctx, newDefaultQueueSort(), WithPreEnqueuePluginMap(m))
gatedPod := st.MakePod().SchedulingGates([]string{"hello world"}).Obj()
if err := q.Add(logger, gatedPod); err != nil {
t.Errorf("Error while adding gated pod: %v", err)
}
q.Add(logger, gatedPod)
if !q.unschedulablePods.get(gatedPod).Gated {
t.Error("Expected pod to be gated")
@ -3922,9 +3856,7 @@ func Test_queuedPodInfo_gatedSetUponCreationAndUnsetUponUpdate(t *testing.T) {
ungatedPod := gatedPod.DeepCopy()
ungatedPod.Spec.SchedulingGates = nil
if err := q.Update(logger, gatedPod, ungatedPod); err != nil {
t.Errorf("Error while updating pod to ungated: %v", err)
}
q.Update(logger, gatedPod, ungatedPod)
ungatedPodInfo, _ := q.Pop(logger)
if ungatedPodInfo.Gated {

View File

@ -287,9 +287,7 @@ func TestFailureHandler(t *testing.T) {
queue := internalqueue.NewPriorityQueue(nil, informerFactory, internalqueue.WithClock(testingclock.NewFakeClock(time.Now())))
schedulerCache := internalcache.New(ctx, 30*time.Second)
if err := queue.Add(logger, testPod); err != nil {
t.Fatalf("Add failed: %v", err)
}
queue.Add(logger, testPod)
if _, err := queue.Pop(logger); err != nil {
t.Fatalf("Pop failed: %v", err)

View File

@ -314,13 +314,7 @@ func TestCoreResourceEnqueue(t *testing.T) {
newPod = oldPod.DeepCopy()
newPod.Status.Conditions[0].Message = "injected message"
if err := testCtx.Scheduler.SchedulingQueue.Update(
klog.FromContext(testCtx.Ctx),
oldPod,
newPod,
); err != nil {
return fmt.Errorf("failed to update the pod: %w", err)
}
testCtx.Scheduler.SchedulingQueue.Update(klog.FromContext(testCtx.Ctx), oldPod, newPod)
return nil
},
wantRequeuedPods: sets.Set[string]{},