diff --git a/pkg/scheduler/eventhandlers.go b/pkg/scheduler/eventhandlers.go index efda92053fe..a68db516b68 100644 --- a/pkg/scheduler/eventhandlers.go +++ b/pkg/scheduler/eventhandlers.go @@ -147,9 +147,7 @@ func (sched *Scheduler) addPodToSchedulingQueue(obj interface{}) { logger := sched.logger pod := obj.(*v1.Pod) logger.V(3).Info("Add event for unscheduled pod", "pod", klog.KObj(pod)) - if err := sched.SchedulingQueue.Add(logger, pod); err != nil { - utilruntime.HandleError(fmt.Errorf("unable to queue %T: %v", obj, err)) - } + sched.SchedulingQueue.Add(logger, pod) } func (sched *Scheduler) updatePodInSchedulingQueue(oldObj, newObj interface{}) { @@ -172,9 +170,7 @@ func (sched *Scheduler) updatePodInSchedulingQueue(oldObj, newObj interface{}) { } logger.V(4).Info("Update event for unscheduled pod", "pod", klog.KObj(newPod)) - if err := sched.SchedulingQueue.Update(logger, oldPod, newPod); err != nil { - utilruntime.HandleError(fmt.Errorf("unable to update %T: %v", newObj, err)) - } + sched.SchedulingQueue.Update(logger, oldPod, newPod) } func (sched *Scheduler) deletePodFromSchedulingQueue(obj interface{}) { @@ -199,9 +195,7 @@ func (sched *Scheduler) deletePodFromSchedulingQueue(obj interface{}) { } logger.V(3).Info("Delete event for unscheduled pod", "pod", klog.KObj(pod)) - if err := sched.SchedulingQueue.Delete(pod); err != nil { - utilruntime.HandleError(fmt.Errorf("unable to dequeue %T: %v", obj, err)) - } + sched.SchedulingQueue.Delete(pod) fwk, err := sched.frameworkForPod(pod) if err != nil { // This shouldn't happen, because we only accept for scheduling the pods diff --git a/pkg/scheduler/internal/heap/heap.go b/pkg/scheduler/internal/heap/heap.go index 8bc387aaffc..df72db1e13e 100644 --- a/pkg/scheduler/internal/heap/heap.go +++ b/pkg/scheduler/internal/heap/heap.go @@ -24,29 +24,28 @@ import ( "container/heap" "fmt" - "k8s.io/client-go/tools/cache" "k8s.io/kubernetes/pkg/scheduler/metrics" ) // KeyFunc is a function type to get the key from an object. -type KeyFunc func(obj interface{}) (string, error) +type KeyFunc[T any] func(obj T) string -type heapItem struct { - obj interface{} // The object which is stored in the heap. - index int // The index of the object's key in the Heap.queue. +type heapItem[T any] struct { + obj T // The object which is stored in the heap. + index int // The index of the object's key in the Heap.queue. } -type itemKeyValue struct { +type itemKeyValue[T any] struct { key string - obj interface{} + obj T } // data is an internal struct that implements the standard heap interface // and keeps the data stored in the heap. -type data struct { +type data[T any] struct { // items is a map from key of the objects to the objects and their index. // We depend on the property that items in the map are in the queue and vice versa. - items map[string]*heapItem + items map[string]*heapItem[T] // queue implements a heap data structure and keeps the order of elements // according to the heap invariant. The queue keeps the keys of objects stored // in "items". @@ -54,18 +53,18 @@ type data struct { // keyFunc is used to make the key used for queued item insertion and retrieval, and // should be deterministic. - keyFunc KeyFunc + keyFunc KeyFunc[T] // lessFunc is used to compare two objects in the heap. - lessFunc lessFunc + lessFunc LessFunc[T] } var ( - _ = heap.Interface(&data{}) // heapData is a standard heap + _ = heap.Interface(&data[any]{}) // heapData is a standard heap ) // Less compares two objects and returns true if the first one should go // in front of the second one in the heap. -func (h *data) Less(i, j int) bool { +func (h *data[T]) Less(i, j int) bool { if i > len(h.queue) || j > len(h.queue) { return false } @@ -81,11 +80,11 @@ func (h *data) Less(i, j int) bool { } // Len returns the number of items in the Heap. -func (h *data) Len() int { return len(h.queue) } +func (h *data[T]) Len() int { return len(h.queue) } // Swap implements swapping of two elements in the heap. This is a part of standard // heap interface and should never be called directly. -func (h *data) Swap(i, j int) { +func (h *data[T]) Swap(i, j int) { h.queue[i], h.queue[j] = h.queue[j], h.queue[i] item := h.items[h.queue[i]] item.index = i @@ -93,16 +92,16 @@ func (h *data) Swap(i, j int) { item.index = j } -// Push is supposed to be called by heap.Push only. -func (h *data) Push(kv interface{}) { - keyValue := kv.(*itemKeyValue) +// Push is supposed to be called by container/heap.Push only. +func (h *data[T]) Push(kv interface{}) { + keyValue := kv.(*itemKeyValue[T]) n := len(h.queue) - h.items[keyValue.key] = &heapItem{keyValue.obj, n} + h.items[keyValue.key] = &heapItem[T]{keyValue.obj, n} h.queue = append(h.queue, keyValue.key) } -// Pop is supposed to be called by heap.Pop only. -func (h *data) Pop() interface{} { +// Pop is supposed to be called by container/heap.Pop only. +func (h *data[T]) Pop() interface{} { key := h.queue[len(h.queue)-1] h.queue = h.queue[0 : len(h.queue)-1] item, ok := h.items[key] @@ -114,56 +113,44 @@ func (h *data) Pop() interface{} { return item.obj } -// Peek is supposed to be called by heap.Peek only. -func (h *data) Peek() interface{} { +// Peek returns the head of the heap without removing it. +func (h *data[T]) Peek() (T, bool) { if len(h.queue) > 0 { - return h.items[h.queue[0]].obj + return h.items[h.queue[0]].obj, true } - return nil + var zero T + return zero, false } // Heap is a producer/consumer queue that implements a heap data structure. // It can be used to implement priority queues and similar data structures. -type Heap struct { +type Heap[T any] struct { // data stores objects and has a queue that keeps their ordering according // to the heap invariant. - data *data + data *data[T] // metricRecorder updates the counter when elements of a heap get added or // removed, and it does nothing if it's nil metricRecorder metrics.MetricRecorder } -// Add inserts an item, and puts it in the queue. The item is updated if it +// AddOrUpdate inserts an item, and puts it in the queue. The item is updated if it // already exists. -func (h *Heap) Add(obj interface{}) error { - key, err := h.data.keyFunc(obj) - if err != nil { - return cache.KeyError{Obj: obj, Err: err} - } +func (h *Heap[T]) AddOrUpdate(obj T) { + key := h.data.keyFunc(obj) if _, exists := h.data.items[key]; exists { h.data.items[key].obj = obj heap.Fix(h.data, h.data.items[key].index) } else { - heap.Push(h.data, &itemKeyValue{key, obj}) + heap.Push(h.data, &itemKeyValue[T]{key, obj}) if h.metricRecorder != nil { h.metricRecorder.Inc() } } - return nil -} - -// Update is the same as Add in this implementation. When the item does not -// exist, it is added. -func (h *Heap) Update(obj interface{}) error { - return h.Add(obj) } // Delete removes an item. -func (h *Heap) Delete(obj interface{}) error { - key, err := h.data.keyFunc(obj) - if err != nil { - return cache.KeyError{Obj: obj, Err: err} - } +func (h *Heap[T]) Delete(obj T) error { + key := h.data.keyFunc(obj) if item, ok := h.data.items[key]; ok { heap.Remove(h.data, item.index) if h.metricRecorder != nil { @@ -175,43 +162,48 @@ func (h *Heap) Delete(obj interface{}) error { } // Peek returns the head of the heap without removing it. -func (h *Heap) Peek() interface{} { +func (h *Heap[T]) Peek() (T, bool) { return h.data.Peek() } // Pop returns the head of the heap and removes it. -func (h *Heap) Pop() (interface{}, error) { +func (h *Heap[T]) Pop() (T, error) { obj := heap.Pop(h.data) if obj != nil { if h.metricRecorder != nil { h.metricRecorder.Dec() } - return obj, nil + return obj.(T), nil } - return nil, fmt.Errorf("object was removed from heap data") + var zero T + return zero, fmt.Errorf("heap is empty") } // Get returns the requested item, or sets exists=false. -func (h *Heap) Get(obj interface{}) (interface{}, bool, error) { - key, err := h.data.keyFunc(obj) - if err != nil { - return nil, false, cache.KeyError{Obj: obj, Err: err} - } +func (h *Heap[T]) Get(obj T) (T, bool) { + key := h.data.keyFunc(obj) return h.GetByKey(key) } // GetByKey returns the requested item, or sets exists=false. -func (h *Heap) GetByKey(key string) (interface{}, bool, error) { +func (h *Heap[T]) GetByKey(key string) (T, bool) { item, exists := h.data.items[key] if !exists { - return nil, false, nil + var zero T + return zero, false } - return item.obj, true, nil + return item.obj, true +} + +func (h *Heap[T]) Has(obj T) bool { + key := h.data.keyFunc(obj) + _, ok := h.GetByKey(key) + return ok } // List returns a list of all the items. -func (h *Heap) List() []interface{} { - list := make([]interface{}, 0, len(h.data.items)) +func (h *Heap[T]) List() []T { + list := make([]T, 0, len(h.data.items)) for _, item := range h.data.items { list = append(list, item.obj) } @@ -219,20 +211,20 @@ func (h *Heap) List() []interface{} { } // Len returns the number of items in the heap. -func (h *Heap) Len() int { +func (h *Heap[T]) Len() int { return len(h.data.queue) } // New returns a Heap which can be used to queue up items to process. -func New(keyFn KeyFunc, lessFn lessFunc) *Heap { +func New[T any](keyFn KeyFunc[T], lessFn LessFunc[T]) *Heap[T] { return NewWithRecorder(keyFn, lessFn, nil) } // NewWithRecorder wraps an optional metricRecorder to compose a Heap object. -func NewWithRecorder(keyFn KeyFunc, lessFn lessFunc, metricRecorder metrics.MetricRecorder) *Heap { - return &Heap{ - data: &data{ - items: map[string]*heapItem{}, +func NewWithRecorder[T any](keyFn KeyFunc[T], lessFn LessFunc[T], metricRecorder metrics.MetricRecorder) *Heap[T] { + return &Heap[T]{ + data: &data[T]{ + items: map[string]*heapItem[T]{}, queue: []string{}, keyFunc: keyFn, lessFunc: lessFn, @@ -241,6 +233,6 @@ func NewWithRecorder(keyFn KeyFunc, lessFn lessFunc, metricRecorder metrics.Metr } } -// lessFunc is a function that receives two items and returns true if the first +// LessFunc is a function that receives two items and returns true if the first // item should be placed before the second one when the list is sorted. -type lessFunc = func(item1, item2 interface{}) bool +type LessFunc[T any] func(item1, item2 T) bool diff --git a/pkg/scheduler/internal/heap/heap_test.go b/pkg/scheduler/internal/heap/heap_test.go index 7b853efc27d..35e010747d1 100644 --- a/pkg/scheduler/internal/heap/heap_test.go +++ b/pkg/scheduler/internal/heap/heap_test.go @@ -23,8 +23,8 @@ import ( "testing" ) -func testHeapObjectKeyFunc(obj interface{}) (string, error) { - return obj.(testHeapObject).name, nil +func testHeapObjectKeyFunc(obj testHeapObject) string { + return obj.name } type testHeapObject struct { @@ -56,9 +56,9 @@ func mkHeapObj(name string, val interface{}) testHeapObject { return testHeapObject{name: name, val: val} } -func compareInts(val1 interface{}, val2 interface{}) bool { - first := val1.(testHeapObject).val.(int) - second := val2.(testHeapObject).val.(int) +func compareInts(val1 testHeapObject, val2 testHeapObject) bool { + first := val1.val.(int) + second := val2.val.(int) return first < second } @@ -67,17 +67,18 @@ func TestHeapBasic(t *testing.T) { h := New(testHeapObjectKeyFunc, compareInts) const amount = 500 var i int + var zero testHeapObject // empty queue - if item := h.Peek(); item != nil { + if item, ok := h.Peek(); ok || item != zero { t.Errorf("expected nil object but got %v", item) } for i = amount; i > 0; i-- { - h.Add(mkHeapObj(string([]rune{'a', rune(i)}), i)) + h.AddOrUpdate(mkHeapObj(string([]rune{'a', rune(i)}), i)) // Retrieve head without removing it - head := h.Peek() - if e, a := i, head.(testHeapObject).val; a != e { + head, ok := h.Peek() + if e, a := i, head.val; !ok || a != e { t.Errorf("expected %d, got %d", e, a) } } @@ -85,41 +86,44 @@ func TestHeapBasic(t *testing.T) { // Make sure that the numbers are popped in ascending order. prevNum := 0 for i := 0; i < amount; i++ { - obj, err := h.Pop() - num := obj.(testHeapObject).val.(int) + item, err := h.Pop() + num := item.val.(int) // All the items must be sorted. if err != nil || prevNum > num { - t.Errorf("got %v out of order, last was %v", obj, prevNum) + t.Errorf("got %v out of order, last was %v", item, prevNum) } prevNum = num } } -// Tests Heap.Add and ensures that heap invariant is preserved after adding items. -func TestHeap_Add(t *testing.T) { +// TestHeap_AddOrUpdate_Add tests add capabilities of Heap.AddOrUpdate +// and ensures that heap invariant is preserved after adding items. +func TestHeap_AddOrUpdate_Add(t *testing.T) { h := New(testHeapObjectKeyFunc, compareInts) - h.Add(mkHeapObj("foo", 10)) - h.Add(mkHeapObj("bar", 1)) - h.Add(mkHeapObj("baz", 11)) - h.Add(mkHeapObj("zab", 30)) - h.Add(mkHeapObj("foo", 13)) // This updates "foo". + h.AddOrUpdate(mkHeapObj("foo", 10)) + h.AddOrUpdate(mkHeapObj("bar", 1)) + h.AddOrUpdate(mkHeapObj("baz", 11)) + h.AddOrUpdate(mkHeapObj("zab", 30)) + h.AddOrUpdate(mkHeapObj("foo", 13)) // This updates "foo". item, err := h.Pop() - if e, a := 1, item.(testHeapObject).val; err != nil || a != e { + if e, a := 1, item.val; err != nil || a != e { t.Fatalf("expected %d, got %d", e, a) } item, err = h.Pop() - if e, a := 11, item.(testHeapObject).val; err != nil || a != e { + if e, a := 11, item.val; err != nil || a != e { t.Fatalf("expected %d, got %d", e, a) } - h.Delete(mkHeapObj("baz", 11)) // Nothing is deleted. - h.Add(mkHeapObj("foo", 14)) // foo is updated. + if err := h.Delete(mkHeapObj("baz", 11)); err == nil { // Nothing is deleted. + t.Fatalf("nothing should be deleted from the heap") + } + h.AddOrUpdate(mkHeapObj("foo", 14)) // foo is updated. item, err = h.Pop() - if e, a := 14, item.(testHeapObject).val; err != nil || a != e { + if e, a := 14, item.val; err != nil || a != e { t.Fatalf("expected %d, got %d", e, a) } item, err = h.Pop() - if e, a := 30, item.(testHeapObject).val; err != nil || a != e { + if e, a := 30, item.val; err != nil || a != e { t.Fatalf("expected %d, got %d", e, a) } } @@ -128,21 +132,21 @@ func TestHeap_Add(t *testing.T) { // preserved after deleting items. func TestHeap_Delete(t *testing.T) { h := New(testHeapObjectKeyFunc, compareInts) - h.Add(mkHeapObj("foo", 10)) - h.Add(mkHeapObj("bar", 1)) - h.Add(mkHeapObj("bal", 31)) - h.Add(mkHeapObj("baz", 11)) + h.AddOrUpdate(mkHeapObj("foo", 10)) + h.AddOrUpdate(mkHeapObj("bar", 1)) + h.AddOrUpdate(mkHeapObj("bal", 31)) + h.AddOrUpdate(mkHeapObj("baz", 11)) // Delete head. Delete should work with "key" and doesn't care about the value. if err := h.Delete(mkHeapObj("bar", 200)); err != nil { t.Fatalf("Failed to delete head.") } item, err := h.Pop() - if e, a := 10, item.(testHeapObject).val; err != nil || a != e { + if e, a := 10, item.val; err != nil || a != e { t.Fatalf("expected %d, got %d", e, a) } - h.Add(mkHeapObj("zab", 30)) - h.Add(mkHeapObj("faz", 30)) + h.AddOrUpdate(mkHeapObj("zab", 30)) + h.AddOrUpdate(mkHeapObj("faz", 30)) len := h.data.Len() // Delete non-existing item. if err = h.Delete(mkHeapObj("non-existent", 10)); err == nil || len != h.data.Len() { @@ -157,11 +161,11 @@ func TestHeap_Delete(t *testing.T) { t.Fatalf("Failed to delete item.") } item, err = h.Pop() - if e, a := 11, item.(testHeapObject).val; err != nil || a != e { + if e, a := 11, item.val; err != nil || a != e { t.Fatalf("expected %d, got %d", e, a) } item, err = h.Pop() - if e, a := 30, item.(testHeapObject).val; err != nil || a != e { + if e, a := 30, item.val; err != nil || a != e { t.Fatalf("expected %d, got %d", e, a) } if h.data.Len() != 0 { @@ -169,26 +173,26 @@ func TestHeap_Delete(t *testing.T) { } } -// TestHeap_Update tests Heap.Update and ensures that heap invariant is -// preserved after adding items. -func TestHeap_Update(t *testing.T) { +// TestHeap_AddOrUpdate_Update tests update capabilities of Heap.Update +// and ensures that heap invariant is preserved after adding items. +func TestHeap_AddOrUpdate_Update(t *testing.T) { h := New(testHeapObjectKeyFunc, compareInts) - h.Add(mkHeapObj("foo", 10)) - h.Add(mkHeapObj("bar", 1)) - h.Add(mkHeapObj("bal", 31)) - h.Add(mkHeapObj("baz", 11)) + h.AddOrUpdate(mkHeapObj("foo", 10)) + h.AddOrUpdate(mkHeapObj("bar", 1)) + h.AddOrUpdate(mkHeapObj("bal", 31)) + h.AddOrUpdate(mkHeapObj("baz", 11)) // Update an item to a value that should push it to the head. - h.Update(mkHeapObj("baz", 0)) + h.AddOrUpdate(mkHeapObj("baz", 0)) if h.data.queue[0] != "baz" || h.data.items["baz"].index != 0 { t.Fatalf("expected baz to be at the head") } item, err := h.Pop() - if e, a := 0, item.(testHeapObject).val; err != nil || a != e { + if e, a := 0, item.val; err != nil || a != e { t.Fatalf("expected %d, got %d", e, a) } // Update bar to push it farther back in the queue. - h.Update(mkHeapObj("bar", 100)) + h.AddOrUpdate(mkHeapObj("bar", 100)) if h.data.queue[0] != "foo" || h.data.items["foo"].index != 0 { t.Fatalf("expected foo to be at the head") } @@ -197,19 +201,19 @@ func TestHeap_Update(t *testing.T) { // TestHeap_Get tests Heap.Get. func TestHeap_Get(t *testing.T) { h := New(testHeapObjectKeyFunc, compareInts) - h.Add(mkHeapObj("foo", 10)) - h.Add(mkHeapObj("bar", 1)) - h.Add(mkHeapObj("bal", 31)) - h.Add(mkHeapObj("baz", 11)) + h.AddOrUpdate(mkHeapObj("foo", 10)) + h.AddOrUpdate(mkHeapObj("bar", 1)) + h.AddOrUpdate(mkHeapObj("bal", 31)) + h.AddOrUpdate(mkHeapObj("baz", 11)) // Get works with the key. - obj, exists, err := h.Get(mkHeapObj("baz", 0)) - if err != nil || exists == false || obj.(testHeapObject).val != 11 { + item, exists := h.Get(mkHeapObj("baz", 0)) + if !exists || item.val != 11 { t.Fatalf("unexpected error in getting element") } // Get non-existing object. - _, exists, err = h.Get(mkHeapObj("non-existing", 0)) - if err != nil || exists { + _, exists = h.Get(mkHeapObj("non-existing", 0)) + if exists { t.Fatalf("didn't expect to get any object") } } @@ -217,18 +221,18 @@ func TestHeap_Get(t *testing.T) { // TestHeap_GetByKey tests Heap.GetByKey and is very similar to TestHeap_Get. func TestHeap_GetByKey(t *testing.T) { h := New(testHeapObjectKeyFunc, compareInts) - h.Add(mkHeapObj("foo", 10)) - h.Add(mkHeapObj("bar", 1)) - h.Add(mkHeapObj("bal", 31)) - h.Add(mkHeapObj("baz", 11)) + h.AddOrUpdate(mkHeapObj("foo", 10)) + h.AddOrUpdate(mkHeapObj("bar", 1)) + h.AddOrUpdate(mkHeapObj("bal", 31)) + h.AddOrUpdate(mkHeapObj("baz", 11)) - obj, exists, err := h.GetByKey("baz") - if err != nil || !exists || obj.(testHeapObject).val != 11 { + item, exists := h.GetByKey("baz") + if !exists || item.val != 11 { t.Fatalf("unexpected error in getting element") } // Get non-existing object. - _, exists, err = h.GetByKey("non-existing") - if err != nil || exists { + _, exists = h.GetByKey("non-existing") + if exists { t.Fatalf("didn't expect to get any object") } } @@ -249,14 +253,13 @@ func TestHeap_List(t *testing.T) { "faz": 30, } for k, v := range items { - h.Add(mkHeapObj(k, v)) + h.AddOrUpdate(mkHeapObj(k, v)) } list = h.List() if len(list) != len(items) { t.Errorf("expected %d items, got %d", len(items), len(list)) } - for _, obj := range list { - heapObj := obj.(testHeapObject) + for _, heapObj := range list { v, ok := items[heapObj.name] if !ok || v != heapObj.val { t.Errorf("unexpected item in the list: %v", heapObj) @@ -267,10 +270,10 @@ func TestHeap_List(t *testing.T) { func TestHeapWithRecorder(t *testing.T) { metricRecorder := new(testMetricRecorder) h := NewWithRecorder(testHeapObjectKeyFunc, compareInts, metricRecorder) - h.Add(mkHeapObj("foo", 10)) - h.Add(mkHeapObj("bar", 1)) - h.Add(mkHeapObj("baz", 100)) - h.Add(mkHeapObj("qux", 11)) + h.AddOrUpdate(mkHeapObj("foo", 10)) + h.AddOrUpdate(mkHeapObj("bar", 1)) + h.AddOrUpdate(mkHeapObj("baz", 100)) + h.AddOrUpdate(mkHeapObj("qux", 11)) if *metricRecorder != 4 { t.Errorf("expected count to be 4 but got %d", *metricRecorder) diff --git a/pkg/scheduler/internal/queue/scheduling_queue.go b/pkg/scheduler/internal/queue/scheduling_queue.go index daf9164553c..e4b6f895d77 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue.go +++ b/pkg/scheduler/internal/queue/scheduling_queue.go @@ -93,7 +93,7 @@ type PreEnqueueCheck func(pod *v1.Pod) bool // makes it easy to use those data structures as a SchedulingQueue. type SchedulingQueue interface { framework.PodNominator - Add(logger klog.Logger, pod *v1.Pod) error + Add(logger klog.Logger, pod *v1.Pod) // Activate moves the given pods to activeQ iff they're in unschedulablePods or backoffQ. // The passed-in pods are originally compiled from plugins that want to activate Pods, // by injecting the pods through a reserved CycleState struct (PodsToActivate). @@ -112,8 +112,8 @@ type SchedulingQueue interface { // Done must be called for pod returned by Pop. This allows the queue to // keep track of which pods are currently being processed. Done(types.UID) - Update(logger klog.Logger, oldPod, newPod *v1.Pod) error - Delete(pod *v1.Pod) error + Update(logger klog.Logger, oldPod, newPod *v1.Pod) + Delete(pod *v1.Pod) // TODO(sanposhiho): move all PreEnqueueCheck to Requeue and delete it from this parameter eventually. // Some PreEnqueueCheck include event filtering logic based on some in-tree plugins // and it affect badly to other plugins. @@ -212,10 +212,10 @@ type PriorityQueue struct { // activeQ is heap structure that scheduler actively looks at to find pods to // schedule. Head of heap is the highest priority pod. It should be protected by activeQLock. - activeQ *heap.Heap + activeQ *heap.Heap[*framework.QueuedPodInfo] // podBackoffQ is a heap ordered by backoff expiry. Pods which have completed backoff // are popped from this heap before the scheduler looks at activeQ - podBackoffQ *heap.Heap + podBackoffQ *heap.Heap[*framework.QueuedPodInfo] // unschedulablePods holds pods that have been tried and determined unschedulable. unschedulablePods *UnschedulablePods // schedulingCycle represents sequence number of scheduling cycle and is incremented @@ -382,19 +382,13 @@ func NewPriorityQueue( opt(&options) } - comp := func(podInfo1, podInfo2 interface{}) bool { - pInfo1 := podInfo1.(*framework.QueuedPodInfo) - pInfo2 := podInfo2.(*framework.QueuedPodInfo) - return lessFn(pInfo1, pInfo2) - } - pq := &PriorityQueue{ clock: options.clock, stop: make(chan struct{}), podInitialBackoffDuration: options.podInitialBackoffDuration, podMaxBackoffDuration: options.podMaxBackoffDuration, podMaxInUnschedulablePodsDuration: options.podMaxInUnschedulablePodsDuration, - activeQ: heap.NewWithRecorder(podInfoKeyFunc, comp, metrics.NewActivePodsRecorder()), + activeQ: heap.NewWithRecorder(podInfoKeyFunc, heap.LessFunc[*framework.QueuedPodInfo](lessFn), metrics.NewActivePodsRecorder()), unschedulablePods: newUnschedulablePods(metrics.NewUnschedulablePodsRecorder(), metrics.NewGatedPodsRecorder()), inFlightPods: make(map[types.UID]*list.Element), inFlightEvents: list.New(), @@ -603,7 +597,7 @@ func (p *PriorityQueue) runPreEnqueuePlugin(ctx context.Context, pl framework.Pr // It returns 2 parameters: // 1. a boolean flag to indicate whether the pod is added successfully. // 2. an error for the caller to act on. -func (p *PriorityQueue) moveToActiveQ(logger klog.Logger, pInfo *framework.QueuedPodInfo, event string) (bool, error) { +func (p *PriorityQueue) moveToActiveQ(logger klog.Logger, pInfo *framework.QueuedPodInfo, event string) bool { gatedBefore := pInfo.Gated pInfo.Gated = !p.runPreEnqueuePlugins(context.Background(), pInfo) @@ -611,24 +605,21 @@ func (p *PriorityQueue) moveToActiveQ(logger klog.Logger, pInfo *framework.Queue defer p.activeQLock.Unlock() if pInfo.Gated { // Add the Pod to unschedulablePods if it's not passing PreEnqueuePlugins. - if _, exists, _ := p.activeQ.Get(pInfo); exists { - return false, nil + if p.activeQ.Has(pInfo) { + return false } - if _, exists, _ := p.podBackoffQ.Get(pInfo); exists { - return false, nil + if p.podBackoffQ.Has(pInfo) { + return false } p.unschedulablePods.addOrUpdate(pInfo) - return false, nil + return false } if pInfo.InitialAttemptTimestamp == nil { now := p.clock.Now() pInfo.InitialAttemptTimestamp = &now } - if err := p.activeQ.Add(pInfo); err != nil { - logger.Error(err, "Error adding pod to the active queue", "pod", klog.KObj(pInfo.Pod)) - return false, err - } + p.activeQ.AddOrUpdate(pInfo) p.unschedulablePods.delete(pInfo.Pod, gatedBefore) _ = p.podBackoffQ.Delete(pInfo) // Don't need to react when pInfo is not found. @@ -638,22 +629,19 @@ func (p *PriorityQueue) moveToActiveQ(logger klog.Logger, pInfo *framework.Queue p.AddNominatedPod(logger, pInfo.PodInfo, nil) } - return true, nil + return true } // Add adds a pod to the active queue. It should be called only when a new pod // is added so there is no chance the pod is already in active/unschedulable/backoff queues -func (p *PriorityQueue) Add(logger klog.Logger, pod *v1.Pod) error { +func (p *PriorityQueue) Add(logger klog.Logger, pod *v1.Pod) { p.lock.Lock() defer p.lock.Unlock() pInfo := p.newQueuedPodInfo(pod) - if added, err := p.moveToActiveQ(logger, pInfo, framework.PodAdd); !added { - return err + if added := p.moveToActiveQ(logger, pInfo, framework.PodAdd); added { + p.cond.Broadcast() } - p.cond.Broadcast() - - return nil } // Activate moves the given pods to activeQ iff they're in unschedulablePods or backoffQ. @@ -676,8 +664,7 @@ func (p *PriorityQueue) Activate(logger klog.Logger, pods map[string]*v1.Pod) { func (p *PriorityQueue) existsInActiveQ(pInfo *framework.QueuedPodInfo) bool { p.activeQLock.RLock() defer p.activeQLock.RUnlock() - _, exists, _ := p.activeQ.Get(pInfo) - return exists + return p.activeQ.Has(pInfo) } func (p *PriorityQueue) activate(logger klog.Logger, pod *v1.Pod) bool { @@ -686,10 +673,10 @@ func (p *PriorityQueue) activate(logger klog.Logger, pod *v1.Pod) bool { if pInfo = p.unschedulablePods.get(pod); pInfo == nil { // If the pod doesn't belong to unschedulablePods or backoffQ, don't activate it. // The pod can be already in activeQ. - if obj, exists, _ := p.podBackoffQ.Get(newQueuedPodInfoForLookup(pod)); !exists { + var exists bool + pInfo, exists = p.podBackoffQ.Get(newQueuedPodInfoForLookup(pod)) + if !exists { return false - } else { - pInfo = obj.(*framework.QueuedPodInfo) } } @@ -699,8 +686,7 @@ func (p *PriorityQueue) activate(logger klog.Logger, pod *v1.Pod) bool { return false } - added, _ := p.moveToActiveQ(logger, pInfo, framework.ForceActivate) - return added + return p.moveToActiveQ(logger, pInfo, framework.ForceActivate) } // isPodBackingoff returns true if a pod is still waiting for its backoff timer. @@ -820,9 +806,7 @@ func (p *PriorityQueue) addUnschedulableWithoutQueueingHint(logger klog.Logger, // - No unschedulable plugins are associated with this Pod, // meaning something unusual (a temporal failure on kube-apiserver, etc) happened and this Pod gets moved back to the queue. // In this case, we should retry scheduling it because this Pod may not be retried until the next flush. - if err := p.podBackoffQ.Add(pInfo); err != nil { - return fmt.Errorf("error adding pod %v to the backoff queue: %v", klog.KObj(pod), err) - } + p.podBackoffQ.AddOrUpdate(pInfo) logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pod), "event", framework.ScheduleAttemptFailure, "queue", backoffQ) metrics.SchedulerQueueIncomingPods.WithLabelValues("backoff", framework.ScheduleAttemptFailure).Inc() } else { @@ -853,7 +837,7 @@ func (p *PriorityQueue) AddUnschedulableIfNotPresent(logger klog.Logger, pInfo * if p.existsInActiveQ(pInfo) { return fmt.Errorf("Pod %v is already present in the active queue", klog.KObj(pod)) } - if _, exists, _ := p.podBackoffQ.Get(pInfo); exists { + if p.podBackoffQ.Has(pInfo) { return fmt.Errorf("Pod %v is already present in the backoff queue", klog.KObj(pod)) } @@ -892,11 +876,10 @@ func (p *PriorityQueue) flushBackoffQCompleted(logger klog.Logger) { defer p.lock.Unlock() activated := false for { - rawPodInfo := p.podBackoffQ.Peek() - if rawPodInfo == nil { + pInfo, ok := p.podBackoffQ.Peek() + if !ok || pInfo == nil { break } - pInfo := rawPodInfo.(*framework.QueuedPodInfo) pod := pInfo.Pod if p.isPodBackingoff(pInfo) { break @@ -906,7 +889,7 @@ func (p *PriorityQueue) flushBackoffQCompleted(logger klog.Logger) { logger.Error(err, "Unable to pop pod from backoff queue despite backoff completion", "pod", klog.KObj(pod)) break } - if added, _ := p.moveToActiveQ(logger, pInfo, framework.BackoffComplete); added { + if added := p.moveToActiveQ(logger, pInfo, framework.BackoffComplete); added { activated = true } } @@ -954,11 +937,10 @@ func (p *PriorityQueue) Pop(logger klog.Logger) (*framework.QueuedPodInfo, error } p.cond.Wait() } - obj, err := p.activeQ.Pop() + pInfo, err := p.activeQ.Pop() if err != nil { return nil, err } - pInfo := obj.(*framework.QueuedPodInfo) pInfo.Attempts++ p.schedulingCycle++ // In flight, no concurrent events yet. @@ -1039,22 +1021,23 @@ func isPodUpdated(oldPod, newPod *v1.Pod) bool { return !reflect.DeepEqual(strip(oldPod), strip(newPod)) } -func (p *PriorityQueue) updateInActiveQueue(logger klog.Logger, oldPod, newPod *v1.Pod, oldPodInfo *framework.QueuedPodInfo) (bool, error) { +func (p *PriorityQueue) updateInActiveQueue(logger klog.Logger, oldPod, newPod *v1.Pod, oldPodInfo *framework.QueuedPodInfo) bool { p.activeQLock.Lock() defer p.activeQLock.Unlock() - if oldPodInfo, exists, _ := p.activeQ.Get(oldPodInfo); exists { - pInfo := updatePod(oldPodInfo, newPod) + if pInfo, exists := p.activeQ.Get(oldPodInfo); exists { + _ = pInfo.Update(newPod) p.UpdateNominatedPod(logger, oldPod, pInfo.PodInfo) - return true, p.activeQ.Update(pInfo) + p.activeQ.AddOrUpdate(pInfo) + return true } - return false, nil + return false } // Update updates a pod in the active or backoff queue if present. Otherwise, it removes // the item from the unschedulable queue if pod is updated in a way that it may // become schedulable and adds the updated one to the active queue. // If pod is not present in any of the queues, it is added to the active queue. -func (p *PriorityQueue) Update(logger klog.Logger, oldPod, newPod *v1.Pod) error { +func (p *PriorityQueue) Update(logger klog.Logger, oldPod, newPod *v1.Pod) { p.lock.Lock() defer p.lock.Unlock() @@ -1075,7 +1058,7 @@ func (p *PriorityQueue) Update(logger klog.Logger, oldPod, newPod *v1.Pod) error }) p.activeQLock.Unlock() - return nil + return } p.activeQLock.Unlock() } @@ -1083,79 +1066,72 @@ func (p *PriorityQueue) Update(logger klog.Logger, oldPod, newPod *v1.Pod) error if oldPod != nil { oldPodInfo := newQueuedPodInfoForLookup(oldPod) // If the pod is already in the active queue, just update it there. - if exists, err := p.updateInActiveQueue(logger, oldPod, newPod, oldPodInfo); exists { - return err + if exists := p.updateInActiveQueue(logger, oldPod, newPod, oldPodInfo); exists { + return } // If the pod is in the backoff queue, update it there. - if oldPodInfo, exists, _ := p.podBackoffQ.Get(oldPodInfo); exists { - pInfo := updatePod(oldPodInfo, newPod) + if pInfo, exists := p.podBackoffQ.Get(oldPodInfo); exists { + _ = pInfo.Update(newPod) p.UpdateNominatedPod(logger, oldPod, pInfo.PodInfo) - return p.podBackoffQ.Update(pInfo) + p.podBackoffQ.AddOrUpdate(pInfo) + return } } // If the pod is in the unschedulable queue, updating it may make it schedulable. - if usPodInfo := p.unschedulablePods.get(newPod); usPodInfo != nil { - pInfo := updatePod(usPodInfo, newPod) + if pInfo := p.unschedulablePods.get(newPod); pInfo != nil { + _ = pInfo.Update(newPod) p.UpdateNominatedPod(logger, oldPod, pInfo.PodInfo) - gated := usPodInfo.Gated + gated := pInfo.Gated if p.isSchedulingQueueHintEnabled { // When unscheduled Pods are updated, we check with QueueingHint // whether the update may make the pods schedulable. // Plugins have to implement a QueueingHint for Pod/Update event // if the rejection from them could be resolved by updating unscheduled Pods itself. - events := framework.PodSchedulingPropertiesChange(newPod, oldPod) for _, evt := range events { hint := p.isPodWorthRequeuing(logger, pInfo, evt, oldPod, newPod) queue := p.requeuePodViaQueueingHint(logger, pInfo, hint, framework.UnscheduledPodUpdate.Label) if queue != unschedulablePods { logger.V(5).Info("Pod moved to an internal scheduling queue because the Pod is updated", "pod", klog.KObj(newPod), "event", framework.PodUpdate, "queue", queue) - p.unschedulablePods.delete(usPodInfo.Pod, gated) + p.unschedulablePods.delete(pInfo.Pod, gated) } if queue == activeQ { p.cond.Broadcast() break } } - - return nil + return } if isPodUpdated(oldPod, newPod) { - - if p.isPodBackingoff(usPodInfo) { - if err := p.podBackoffQ.Add(pInfo); err != nil { - return err - } - p.unschedulablePods.delete(usPodInfo.Pod, gated) + if p.isPodBackingoff(pInfo) { + p.podBackoffQ.AddOrUpdate(pInfo) + p.unschedulablePods.delete(pInfo.Pod, gated) logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", framework.PodUpdate, "queue", backoffQ) - return nil + return } - if added, err := p.moveToActiveQ(logger, pInfo, framework.BackoffComplete); !added { - return err + if added := p.moveToActiveQ(logger, pInfo, framework.BackoffComplete); added { + p.cond.Broadcast() } - p.cond.Broadcast() - return nil + return } // Pod update didn't make it schedulable, keep it in the unschedulable queue. p.unschedulablePods.addOrUpdate(pInfo) - return nil + return } // If pod is not in any of the queues, we put it in the active queue. pInfo := p.newQueuedPodInfo(newPod) - if added, err := p.moveToActiveQ(logger, pInfo, framework.PodUpdate); !added { - return err + if added := p.moveToActiveQ(logger, pInfo, framework.PodUpdate); added { + p.cond.Broadcast() } - p.cond.Broadcast() - return nil } // Delete deletes the item from either of the two queues. It assumes the pod is // only in one queue. -func (p *PriorityQueue) Delete(pod *v1.Pod) error { +func (p *PriorityQueue) Delete(pod *v1.Pod) { p.lock.Lock() defer p.lock.Unlock() p.DeleteNominatedPodIfExists(pod) @@ -1169,7 +1145,6 @@ func (p *PriorityQueue) Delete(pod *v1.Pod) error { p.unschedulablePods.delete(pod, pInfo.Gated) } } - return nil } // AssignedPodAdded is called when a bound pod is added. Creation of this pod @@ -1241,25 +1216,14 @@ func (p *PriorityQueue) requeuePodViaQueueingHint(logger klog.Logger, pInfo *fra return unschedulablePods } - pod := pInfo.Pod if strategy == queueAfterBackoff && p.isPodBackingoff(pInfo) { - if err := p.podBackoffQ.Add(pInfo); err != nil { - logger.Error(err, "Error adding pod to the backoff queue, queue this Pod to unschedulable pod pool", "pod", klog.KObj(pod)) - p.unschedulablePods.addOrUpdate(pInfo) - return unschedulablePods - } - + p.podBackoffQ.AddOrUpdate(pInfo) metrics.SchedulerQueueIncomingPods.WithLabelValues("backoff", event).Inc() return backoffQ } // Reach here if schedulingHint is QueueImmediately, or schedulingHint is Queue but the pod is not backing off. - - added, err := p.moveToActiveQ(logger, pInfo, event) - if err != nil { - logger.Error(err, "Error adding pod to the active queue, queue this Pod to unschedulable pod pool", "pod", klog.KObj(pod)) - } - if added { + if added := p.moveToActiveQ(logger, pInfo, event); added { return activeQ } if pInfo.Gated { @@ -1368,7 +1332,7 @@ func (p *PriorityQueue) PodsInActiveQ() []*v1.Pod { defer p.activeQLock.RUnlock() var result []*v1.Pod for _, pInfo := range p.activeQ.List() { - result = append(result, pInfo.(*framework.QueuedPodInfo).Pod) + result = append(result, pInfo.Pod) } return result } @@ -1384,7 +1348,7 @@ func (p *PriorityQueue) PendingPods() ([]*v1.Pod, string) { result := p.PodsInActiveQ() activeQLen := len(result) for _, pInfo := range p.podBackoffQ.List() { - result = append(result, pInfo.(*framework.QueuedPodInfo).Pod) + result = append(result, pInfo.Pod) } for _, pInfo := range p.unschedulablePods.podInfoMap { result = append(result, pInfo.Pod) @@ -1397,20 +1361,18 @@ func (p *PriorityQueue) nominatedPodToInfo(np PodRef) *framework.PodInfo { pod := np.ToPod() pInfoLookup := newQueuedPodInfoForLookup(pod) - obj, exists, _ := p.activeQ.Get(pInfoLookup) + queuedPodInfo, exists := p.activeQ.Get(pInfoLookup) if exists { - queuedPodInfo := obj.(*framework.QueuedPodInfo) return queuedPodInfo.PodInfo } - queuedPodInfo := p.unschedulablePods.get(pod) + queuedPodInfo = p.unschedulablePods.get(pod) if queuedPodInfo != nil { return queuedPodInfo.PodInfo } - obj, exists, _ = p.podBackoffQ.Get(pInfoLookup) + queuedPodInfo, exists = p.podBackoffQ.Get(pInfoLookup) if exists { - queuedPodInfo := obj.(*framework.QueuedPodInfo) return queuedPodInfo.PodInfo } @@ -1469,9 +1431,7 @@ func (npm *nominator) NominatedPodsForNode(nodeName string) []*framework.PodInfo return npm.nominatedPodsToInfo(nominatedPods) } -func (p *PriorityQueue) podsCompareBackoffCompleted(podInfo1, podInfo2 interface{}) bool { - pInfo1 := podInfo1.(*framework.QueuedPodInfo) - pInfo2 := podInfo2.(*framework.QueuedPodInfo) +func (p *PriorityQueue) podsCompareBackoffCompleted(pInfo1, pInfo2 *framework.QueuedPodInfo) bool { bo1 := p.getBackoffTime(pInfo1) bo2 := p.getBackoffTime(pInfo2) return bo1.Before(bo2) @@ -1512,12 +1472,6 @@ func (p *PriorityQueue) calculateBackoffDuration(podInfo *framework.QueuedPodInf return duration } -func updatePod(oldPodInfo interface{}, newPod *v1.Pod) *framework.QueuedPodInfo { - pInfo := oldPodInfo.(*framework.QueuedPodInfo) - pInfo.Update(newPod) - return pInfo -} - // UnschedulablePods holds pods that cannot be scheduled. This data structure // is used to implement unschedulablePods. type UnschedulablePods struct { @@ -1729,6 +1683,6 @@ func newPodNominator(podLister listersv1.PodLister, nominatedPodsToInfo func([]P } } -func podInfoKeyFunc(obj interface{}) (string, error) { - return cache.MetaNamespaceKeyFunc(obj.(*framework.QueuedPodInfo).Pod) +func podInfoKeyFunc(pInfo *framework.QueuedPodInfo) string { + return cache.NewObjectName(pInfo.Pod.Namespace, pInfo.Pod.Name).String() } diff --git a/pkg/scheduler/internal/queue/scheduling_queue_test.go b/pkg/scheduler/internal/queue/scheduling_queue_test.go index 07c321bcb38..0033482371e 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue_test.go +++ b/pkg/scheduler/internal/queue/scheduling_queue_test.go @@ -122,15 +122,9 @@ func TestPriorityQueue_Add(t *testing.T) { ctx, cancel := context.WithCancel(ctx) defer cancel() q := NewTestQueueWithObjects(ctx, newDefaultQueueSort(), objs) - if err := q.Add(logger, medPriorityPodInfo.Pod); err != nil { - t.Errorf("add failed: %v", err) - } - if err := q.Add(logger, unschedulablePodInfo.Pod); err != nil { - t.Errorf("add failed: %v", err) - } - if err := q.Add(logger, highPriorityPodInfo.Pod); err != nil { - t.Errorf("add failed: %v", err) - } + q.Add(logger, medPriorityPodInfo.Pod) + q.Add(logger, unschedulablePodInfo.Pod) + q.Add(logger, highPriorityPodInfo.Pod) expectedNominatedPods := &nominator{ nominatedPodToNode: map[types.UID]string{ medPriorityPodInfo.Pod.UID: "node1", @@ -168,12 +162,8 @@ func TestPriorityQueue_AddWithReversePriorityLessFunc(t *testing.T) { ctx, cancel := context.WithCancel(ctx) defer cancel() q := NewTestQueueWithObjects(ctx, newDefaultQueueSort(), objs) - if err := q.Add(logger, medPriorityPodInfo.Pod); err != nil { - t.Errorf("add failed: %v", err) - } - if err := q.Add(logger, highPriorityPodInfo.Pod); err != nil { - t.Errorf("add failed: %v", err) - } + q.Add(logger, medPriorityPodInfo.Pod) + q.Add(logger, highPriorityPodInfo.Pod) if p, err := q.Pop(logger); err != nil || p.Pod != highPriorityPodInfo.Pod { t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPodInfo.Pod.Name, p.Pod.Name) } @@ -748,17 +738,17 @@ func Test_InFlightPods(t *testing.T) { } if test.wantActiveQPodNames != nil { - rawPodInfos := q.activeQ.List() - if len(rawPodInfos) != len(test.wantActiveQPodNames) { - diff := cmp.Diff(test.wantActiveQPodNames, rawPodInfos, cmpopts.SortSlices(func(a, b interface{}) bool { + podInfos := q.activeQ.List() + if len(podInfos) != len(test.wantActiveQPodNames) { + diff := cmp.Diff(test.wantActiveQPodNames, podInfos, cmpopts.SortSlices(func(a, b interface{}) bool { return a.(framework.PodInfo).Pod.Name < b.(framework.PodInfo).Pod.Name })) - t.Fatalf("Length of activeQ is not expected. Got %v, want %v.\n%s", len(rawPodInfos), len(test.wantActiveQPodNames), diff) + t.Fatalf("Length of activeQ is not expected. Got %v, want %v.\n%s", len(podInfos), len(test.wantActiveQPodNames), diff) } wantPodNames := sets.New(test.wantActiveQPodNames...) - for _, rawPodInfo := range rawPodInfos { - podGotFromActiveQ := rawPodInfo.(*framework.QueuedPodInfo).Pod + for _, podInfo := range podInfos { + podGotFromActiveQ := podInfo.Pod if !wantPodNames.Has(podGotFromActiveQ.Name) { t.Fatalf("Pod %v was not expected to be in the activeQ.", podGotFromActiveQ.Name) } @@ -766,17 +756,17 @@ func Test_InFlightPods(t *testing.T) { } if test.wantBackoffQPodNames != nil { - rawPodInfos := q.podBackoffQ.List() - if len(rawPodInfos) != len(test.wantBackoffQPodNames) { - diff := cmp.Diff(test.wantBackoffQPodNames, rawPodInfos, cmpopts.SortSlices(func(a, b interface{}) bool { + podInfos := q.podBackoffQ.List() + if len(podInfos) != len(test.wantBackoffQPodNames) { + diff := cmp.Diff(test.wantBackoffQPodNames, podInfos, cmpopts.SortSlices(func(a, b interface{}) bool { return a.(framework.PodInfo).Pod.Name < b.(framework.PodInfo).Pod.Name })) - t.Fatalf("Length of backoffQ is not expected. Got %v, want %v.\n%s", len(rawPodInfos), len(test.wantBackoffQPodNames), diff) + t.Fatalf("Length of backoffQ is not expected. Got %v, want %v.\n%s", len(podInfos), len(test.wantBackoffQPodNames), diff) } wantPodNames := sets.New(test.wantBackoffQPodNames...) - for _, rawPodInfo := range rawPodInfos { - podGotFromBackoffQ := rawPodInfo.(*framework.QueuedPodInfo).Pod + for _, podInfo := range podInfos { + podGotFromBackoffQ := podInfo.Pod if !wantPodNames.Has(podGotFromBackoffQ.Name) { t.Fatalf("Pod %v was not expected to be in the backoffQ.", podGotFromBackoffQ.Name) } @@ -935,7 +925,7 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent_Backoff(t *testing.T) { // Since there was a move request at the same cycle as "oldCycle", these pods // should be in the backoff queue. for i := 1; i < totalNum; i++ { - if _, exists, _ := q.podBackoffQ.Get(newQueuedPodInfoForLookup(&expectedPods[i])); !exists { + if !q.podBackoffQ.Has(newQueuedPodInfoForLookup(&expectedPods[i])) { t.Errorf("Expected %v to be added to podBackoffQ.", expectedPods[i].Name) } } @@ -1026,10 +1016,7 @@ func TestPriorityQueue_Update(t *testing.T) { name: "When updating a pod that is already in activeQ, the pod should remain in activeQ after Update()", wantQ: activeQ, prepareFunc: func(t *testing.T, logger klog.Logger, q *PriorityQueue) (oldPod, newPod *v1.Pod) { - err := q.Update(logger, nil, highPriorityPodInfo.Pod) - if err != nil { - t.Errorf("add pod %s error: %v", highPriorityPodInfo.Pod.Name, err) - } + q.Update(logger, nil, highPriorityPodInfo.Pod) return highPriorityPodInfo.Pod, highPriorityPodInfo.Pod }, schedulingHintsEnablement: []bool{false, true}, @@ -1039,9 +1026,7 @@ func TestPriorityQueue_Update(t *testing.T) { wantQ: backoffQ, prepareFunc: func(t *testing.T, logger klog.Logger, q *PriorityQueue) (oldPod, newPod *v1.Pod) { podInfo := q.newQueuedPodInfo(medPriorityPodInfo.Pod) - if err := q.podBackoffQ.Add(podInfo); err != nil { - t.Errorf("adding pod to backoff queue error: %v", err) - } + q.podBackoffQ.AddOrUpdate(podInfo) return podInfo.Pod, podInfo.Pod }, schedulingHintsEnablement: []bool{false, true}, @@ -1088,10 +1073,7 @@ func TestPriorityQueue_Update(t *testing.T) { prepareFunc: func(t *testing.T, logger klog.Logger, q *PriorityQueue) (oldPod, newPod *v1.Pod) { podInfo := q.newQueuedPodInfo(medPriorityPodInfo.Pod) // We need to once add this Pod to activeQ and Pop() it so that this Pod is registered correctly in inFlightPods. - err := q.activeQ.Add(podInfo) - if err != nil { - t.Errorf("unexpected error from activeQ.Add: %v", err) - } + q.activeQ.AddOrUpdate(podInfo) if p, err := q.Pop(logger); err != nil || p.Pod != medPriorityPodInfo.Pod { t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPodInfo.Pod.Name, p.Pod.Name) } @@ -1115,25 +1097,23 @@ func TestPriorityQueue_Update(t *testing.T) { oldPod, newPod := tt.prepareFunc(t, logger, q) - if err := q.Update(logger, oldPod, newPod); err != nil { - t.Fatalf("unexpected error from Update: %v", err) - } + q.Update(logger, oldPod, newPod) var pInfo *framework.QueuedPodInfo // validate expected queue - if obj, exists, _ := q.podBackoffQ.Get(newQueuedPodInfoForLookup(newPod)); exists { + if pInfoFromBackoff, exists := q.podBackoffQ.Get(newQueuedPodInfoForLookup(newPod)); exists { if tt.wantQ != backoffQ { t.Errorf("expected pod %s not to be queued to backoffQ, but it was", newPod.Name) } - pInfo = obj.(*framework.QueuedPodInfo) + pInfo = pInfoFromBackoff } - if obj, exists, _ := q.activeQ.Get(newQueuedPodInfoForLookup(newPod)); exists { + if pInfoFromActive, exists := q.activeQ.Get(newQueuedPodInfoForLookup(newPod)); exists { if tt.wantQ != activeQ { t.Errorf("expected pod %s not to be queued to activeQ, but it was", newPod.Name) } - pInfo = obj.(*framework.QueuedPodInfo) + pInfo = pInfoFromActive } if pInfoFromUnsched := q.unschedulablePods.get(newPod); pInfoFromUnsched != nil { @@ -1183,9 +1163,7 @@ func TestPriorityQueue_UpdateWhenInflight(t *testing.T) { // test-pod is created and popped out from the queue testPod := st.MakePod().Name("test-pod").Namespace("test-ns").UID("test-uid").Obj() - if err := q.Add(logger, testPod); err != nil { - t.Errorf("add failed: %v", err) - } + q.Add(logger, testPod) if p, err := q.Pop(logger); err != nil || p.Pod != testPod { t.Errorf("Expected: %v after Pop, but got: %v", testPod.Name, p.Pod.Name) } @@ -1199,9 +1177,7 @@ func TestPriorityQueue_UpdateWhenInflight(t *testing.T) { }, } - if err := q.Update(logger, testPod, updatedPod); err != nil { - t.Errorf("Error calling Update: %v", err) - } + q.Update(logger, testPod, updatedPod) // test-pod got rejected by fakePlugin, // but the update event that it just got may change this scheduling result, // and hence we should put this pod to activeQ/backoffQ. @@ -1210,11 +1186,9 @@ func TestPriorityQueue_UpdateWhenInflight(t *testing.T) { t.Fatalf("unexpected error from AddUnschedulableIfNotPresent: %v", err) } - var pInfo *framework.QueuedPodInfo - if obj, exists, _ := q.podBackoffQ.Get(newQueuedPodInfoForLookup(updatedPod)); !exists { + pInfo, exists := q.podBackoffQ.Get(newQueuedPodInfoForLookup(updatedPod)) + if !exists { t.Fatalf("expected pod %s to be queued to backoffQ, but it wasn't.", updatedPod.Name) - } else { - pInfo = obj.(*framework.QueuedPodInfo) } if diff := cmp.Diff(updatedPod, pInfo.PodInfo.Pod); diff != "" { t.Errorf("Unexpected updated pod diff (-want, +got): %s", diff) @@ -1229,21 +1203,17 @@ func TestPriorityQueue_Delete(t *testing.T) { q := NewTestQueueWithObjects(ctx, newDefaultQueueSort(), objs) q.Update(logger, highPriorityPodInfo.Pod, highPriNominatedPodInfo.Pod) q.Add(logger, unschedulablePodInfo.Pod) - if err := q.Delete(highPriNominatedPodInfo.Pod); err != nil { - t.Errorf("delete failed: %v", err) - } - if _, exists, _ := q.activeQ.Get(newQueuedPodInfoForLookup(unschedulablePodInfo.Pod)); !exists { + q.Delete(highPriNominatedPodInfo.Pod) + if !q.activeQ.Has(newQueuedPodInfoForLookup(unschedulablePodInfo.Pod)) { t.Errorf("Expected %v to be in activeQ.", unschedulablePodInfo.Pod.Name) } - if _, exists, _ := q.activeQ.Get(newQueuedPodInfoForLookup(highPriNominatedPodInfo.Pod)); exists { + if q.activeQ.Has(newQueuedPodInfoForLookup(highPriNominatedPodInfo.Pod)) { t.Errorf("Didn't expect %v to be in activeQ.", highPriorityPodInfo.Pod.Name) } if len(q.nominator.nominatedPods) != 1 { t.Errorf("Expected nominatedPods to have only 'unschedulablePodInfo': %v", q.nominator.nominatedPods) } - if err := q.Delete(unschedulablePodInfo.Pod); err != nil { - t.Errorf("delete failed: %v", err) - } + q.Delete(unschedulablePodInfo.Pod) if len(q.nominator.nominatedPods) != 0 { t.Errorf("Expected nominatedPods to be empty: %v", q.nominator) } @@ -1293,7 +1263,7 @@ func TestPriorityQueue_Activate(t *testing.T) { // Prepare activeQ/unschedulablePods/podBackoffQ according to the table for _, qPodInfo := range tt.qPodInfoInActiveQ { - q.activeQ.Add(qPodInfo) + q.activeQ.AddOrUpdate(qPodInfo) } for _, qPodInfo := range tt.qPodInfoInUnschedulablePods { @@ -1301,7 +1271,7 @@ func TestPriorityQueue_Activate(t *testing.T) { } for _, qPodInfo := range tt.qPodInfoInPodBackoffQ { - q.podBackoffQ.Add(qPodInfo) + q.podBackoffQ.AddOrUpdate(qPodInfo) } // Activate specific pod according to the table @@ -1314,7 +1284,7 @@ func TestPriorityQueue_Activate(t *testing.T) { // Check if the specific pod exists in activeQ for _, want := range tt.want { - if _, exist, _ := q.activeQ.Get(newQueuedPodInfoForLookup(want.PodInfo.Pod)); !exist { + if !q.activeQ.Has(newQueuedPodInfoForLookup(want.PodInfo.Pod)) { t.Errorf("podInfo not exist in activeQ: want %v", want.PodInfo.Pod.Name) } } @@ -1393,7 +1363,7 @@ func TestPriorityQueue_addToActiveQ(t *testing.T) { m := map[string][]framework.PreEnqueuePlugin{"": tt.plugins} q := NewTestQueueWithObjects(ctx, newDefaultQueueSort(), []runtime.Object{tt.pod}, WithPreEnqueuePluginMap(m), WithPodInitialBackoffDuration(time.Second*30), WithPodMaxBackoffDuration(time.Second*60)) - got, _ := q.moveToActiveQ(logger, q.newQueuedPodInfo(tt.pod), framework.PodAdd) + got := q.moveToActiveQ(logger, q.newQueuedPodInfo(tt.pod), framework.PodAdd) if got != tt.wantSuccess { t.Errorf("Unexpected result: want %v, but got %v", tt.wantSuccess, got) } @@ -1600,7 +1570,7 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueueWithQueueingHint(t *testing. } cl := testingclock.NewFakeClock(now) q := NewTestQueue(ctx, newDefaultQueueSort(), WithQueueingHintMapPerProfile(m), WithClock(cl)) - q.activeQ.Add(q.newQueuedPodInfo(test.podInfo.Pod)) + q.activeQ.AddOrUpdate(q.newQueuedPodInfo(test.podInfo.Pod)) if p, err := q.Pop(logger); err != nil || p.Pod != test.podInfo.Pod { t.Errorf("Expected: %v after Pop, but got: %v", test.podInfo.Pod.Name, p.Pod.Name) } @@ -1644,12 +1614,12 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) { } q := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(c), WithQueueingHintMapPerProfile(m)) // To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent()s below. - q.activeQ.Add(q.newQueuedPodInfo(unschedulablePodInfo.Pod)) + q.activeQ.AddOrUpdate(q.newQueuedPodInfo(unschedulablePodInfo.Pod)) if p, err := q.Pop(logger); err != nil || p.Pod != unschedulablePodInfo.Pod { t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePodInfo.Pod.Name, p.Pod.Name) } expectInFlightPods(t, q, unschedulablePodInfo.Pod.UID) - q.activeQ.Add(q.newQueuedPodInfo(highPriorityPodInfo.Pod)) + q.activeQ.AddOrUpdate(q.newQueuedPodInfo(highPriorityPodInfo.Pod)) if p, err := q.Pop(logger); err != nil || p.Pod != highPriorityPodInfo.Pod { t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPodInfo.Pod.Name, p.Pod.Name) } @@ -1665,7 +1635,7 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) { expectInFlightPods(t, q) // Construct a Pod, but don't associate its scheduler failure to any plugin hpp1 := clonePod(highPriorityPodInfo.Pod, "hpp1") - q.activeQ.Add(q.newQueuedPodInfo(hpp1)) + q.activeQ.AddOrUpdate(q.newQueuedPodInfo(hpp1)) if p, err := q.Pop(logger); err != nil || p.Pod != hpp1 { t.Errorf("Expected: %v after Pop, but got: %v", hpp1, p.Pod.Name) } @@ -1678,7 +1648,7 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) { expectInFlightPods(t, q) // Construct another Pod, and associate its scheduler failure to plugin "barPlugin". hpp2 := clonePod(highPriorityPodInfo.Pod, "hpp2") - q.activeQ.Add(q.newQueuedPodInfo(hpp2)) + q.activeQ.AddOrUpdate(q.newQueuedPodInfo(hpp2)) if p, err := q.Pop(logger); err != nil || p.Pod != hpp2 { t.Errorf("Expected: %v after Pop, but got: %v", hpp2, p.Pod.Name) } @@ -1714,17 +1684,17 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) { expectInFlightPods(t, q, medPriorityPodInfo.Pod.UID) q.schedulingCycle++ - q.activeQ.Add(q.newQueuedPodInfo(unschedulablePodInfo.Pod)) + q.activeQ.AddOrUpdate(q.newQueuedPodInfo(unschedulablePodInfo.Pod)) if p, err := q.Pop(logger); err != nil || p.Pod != unschedulablePodInfo.Pod { t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePodInfo.Pod.Name, p.Pod.Name) } expectInFlightPods(t, q, medPriorityPodInfo.Pod.UID, unschedulablePodInfo.Pod.UID) - q.activeQ.Add(q.newQueuedPodInfo(highPriorityPodInfo.Pod)) + q.activeQ.AddOrUpdate(q.newQueuedPodInfo(highPriorityPodInfo.Pod)) if p, err := q.Pop(logger); err != nil || p.Pod != highPriorityPodInfo.Pod { t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPodInfo.Pod.Name, p.Pod.Name) } expectInFlightPods(t, q, medPriorityPodInfo.Pod.UID, unschedulablePodInfo.Pod.UID, highPriorityPodInfo.Pod.UID) - q.activeQ.Add(q.newQueuedPodInfo(hpp1)) + q.activeQ.AddOrUpdate(q.newQueuedPodInfo(hpp1)) if p, err := q.Pop(logger); err != nil || p.Pod != hpp1 { t.Errorf("Expected: %v after Pop, but got: %v", hpp1, p.Pod.Name) } @@ -1755,7 +1725,7 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) { t.Errorf("Expected %v in the unschedulablePods", pod.Name) } } - if _, ok, _ := q.podBackoffQ.Get(hpp1QueuedPodInfo); !ok { + if !q.podBackoffQ.Has(hpp1QueuedPodInfo) { t.Errorf("Expected %v in the podBackoffQ", hpp1.Name) } @@ -1792,9 +1762,7 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueueWithOutQueueingHint(t *testi } q := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(c), WithQueueingHintMapPerProfile(m)) // To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent()s below. - if err := q.Add(logger, medPriorityPodInfo.Pod); err != nil { - t.Errorf("add failed: %v", err) - } + q.Add(logger, medPriorityPodInfo.Pod) err := q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(unschedulablePodInfo.Pod, "fooPlugin"), q.SchedulingCycle()) if err != nil { @@ -1858,9 +1826,7 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueueWithOutQueueingHint(t *testi if err != nil { t.Fatalf("unexpected error from AddUnschedulableIfNotPresent: %v", err) } - if err = q.Add(logger, medPriorityPodInfo.Pod); err != nil { - t.Errorf("add failed: %v", err) - } + q.Add(logger, medPriorityPodInfo.Pod) // hpp1 will go to backoffQ because no failure plugin is associated with it. // All plugins other than hpp1 are enqueued to the unschedulable Pod pool. for _, pod := range []*v1.Pod{unschedulablePodInfo.Pod, highPriorityPodInfo.Pod, hpp2} { @@ -1868,9 +1834,7 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueueWithOutQueueingHint(t *testi t.Errorf("Expected %v in the unschedulablePods", pod.Name) } } - if _, ok, _ := q.podBackoffQ.Get(hpp1QueuedPodInfo); !ok { - t.Errorf("Expected %v in the podBackoffQ", hpp1.Name) - } + q.podBackoffQ.Get(hpp1QueuedPodInfo) // Move clock by podInitialBackoffDuration, so that pods in the unschedulablePods would pass the backing off, // and the pods will be moved into activeQ. @@ -1990,9 +1954,7 @@ func TestPriorityQueue_AssignedPodAdded_(t *testing.T) { q := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(c), WithQueueingHintMapPerProfile(m)) // To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent()s below. - if err := q.activeQ.Add(q.newQueuedPodInfo(tt.unschedPod)); err != nil { - t.Errorf("failed to add pod to activeQ: %v", err) - } + q.activeQ.AddOrUpdate(q.newQueuedPodInfo(tt.unschedPod)) if p, err := q.Pop(logger); err != nil || p.Pod != tt.unschedPod { t.Errorf("Expected: %v after Pop, but got: %v", tt.unschedPod.Name, p.Pod.Name) } @@ -2007,8 +1969,8 @@ func TestPriorityQueue_AssignedPodAdded_(t *testing.T) { q.AssignedPodAdded(logger, tt.updatedAssignedPod) - if _, exists, _ := q.activeQ.Get(newQueuedPodInfoForLookup(tt.unschedPod)); exists != tt.wantToRequeue { - t.Fatalf("unexpected Pod move: Pod should be requeued: %v. Pod is actually requeued: %v", tt.wantToRequeue, exists) + if q.activeQ.Has(newQueuedPodInfoForLookup(tt.unschedPod)) != tt.wantToRequeue { + t.Fatalf("unexpected Pod move: Pod should be requeued: %v. Pod is actually requeued: %v", tt.wantToRequeue, !tt.wantToRequeue) } }) } @@ -2107,11 +2069,11 @@ func TestPriorityQueue_PendingPods(t *testing.T) { defer cancel() q := NewTestQueue(ctx, newDefaultQueueSort()) // To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent()s below. - q.activeQ.Add(q.newQueuedPodInfo(unschedulablePodInfo.Pod)) + q.activeQ.AddOrUpdate(q.newQueuedPodInfo(unschedulablePodInfo.Pod)) if p, err := q.Pop(logger); err != nil || p.Pod != unschedulablePodInfo.Pod { t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePodInfo.Pod.Name, p.Pod.Name) } - q.activeQ.Add(q.newQueuedPodInfo(highPriorityPodInfo.Pod)) + q.activeQ.AddOrUpdate(q.newQueuedPodInfo(highPriorityPodInfo.Pod)) if p, err := q.Pop(logger); err != nil || p.Pod != highPriorityPodInfo.Pod { t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPodInfo.Pod.Name, p.Pod.Name) } @@ -2150,9 +2112,7 @@ func TestPriorityQueue_UpdateNominatedPodForNode(t *testing.T) { defer cancel() objs := []runtime.Object{medPriorityPodInfo.Pod, unschedulablePodInfo.Pod, highPriorityPodInfo.Pod, scheduledPodInfo.Pod} q := NewTestQueueWithObjects(ctx, newDefaultQueueSort(), objs) - if err := q.Add(logger, medPriorityPodInfo.Pod); err != nil { - t.Errorf("add failed: %v", err) - } + q.Add(logger, medPriorityPodInfo.Pod) // Update unschedulablePodInfo on a different node than specified in the pod. q.AddNominatedPod(logger, mustNewTestPodInfo(t, unschedulablePodInfo.Pod), &framework.NominatingInfo{NominatingMode: framework.ModeOverride, NominatedNodeName: "node5"}) @@ -2459,7 +2419,7 @@ func TestPodFailedSchedulingMultipleTimesDoesNotBlockNewerPod(t *testing.T) { }) // To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent() below. - q.activeQ.Add(q.newQueuedPodInfo(unschedulablePod)) + q.activeQ.AddOrUpdate(q.newQueuedPodInfo(unschedulablePod)) if p, err := q.Pop(logger); err != nil || p.Pod != unschedulablePod { t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePod.Name, p.Pod.Name) } @@ -2597,11 +2557,11 @@ func TestHighPriorityFlushUnschedulablePodsLeftover(t *testing.T) { }) // To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent()s below. - q.activeQ.Add(q.newQueuedPodInfo(highPod)) + q.activeQ.AddOrUpdate(q.newQueuedPodInfo(highPod)) if p, err := q.Pop(logger); err != nil || p.Pod != highPod { t.Errorf("Expected: %v after Pop, but got: %v", highPod.Name, p.Pod.Name) } - q.activeQ.Add(q.newQueuedPodInfo(midPod)) + q.activeQ.AddOrUpdate(q.newQueuedPodInfo(midPod)) if p, err := q.Pop(logger); err != nil || p.Pod != midPod { t.Errorf("Expected: %v after Pop, but got: %v", midPod.Name, p.Pod.Name) } @@ -2698,7 +2658,7 @@ func TestPriorityQueue_initPodMaxInUnschedulablePodsDuration(t *testing.T) { if pInfo, err := queue.activeQ.Pop(); err != nil { t.Errorf("Error while popping the head of the queue: %v", err) } else { - podInfoList = append(podInfoList, pInfo.(*framework.QueuedPodInfo)) + podInfoList = append(podInfoList, pInfo) } } @@ -2713,17 +2673,13 @@ type operation func(t *testing.T, logger klog.Logger, queue *PriorityQueue, pInf var ( add = func(t *testing.T, logger klog.Logger, queue *PriorityQueue, pInfo *framework.QueuedPodInfo) { - if err := queue.Add(logger, pInfo.Pod); err != nil { - t.Fatalf("Unexpected error during Add: %v", err) - } + queue.Add(logger, pInfo.Pod) } popAndRequeueAsUnschedulable = func(t *testing.T, logger klog.Logger, queue *PriorityQueue, pInfo *framework.QueuedPodInfo) { // To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent() below. // UnschedulablePlugins will get cleared by Pop, so make a copy first. unschedulablePlugins := pInfo.UnschedulablePlugins.Clone() - if err := queue.activeQ.Add(queue.newQueuedPodInfo(pInfo.Pod)); err != nil { - t.Fatalf("Unexpected error during Add: %v", err) - } + queue.activeQ.AddOrUpdate(queue.newQueuedPodInfo(pInfo.Pod)) p, err := queue.Pop(logger) if err != nil { t.Fatalf("Unexpected error during Pop: %v", err) @@ -2739,9 +2695,7 @@ var ( } popAndRequeueAsBackoff = func(t *testing.T, logger klog.Logger, queue *PriorityQueue, pInfo *framework.QueuedPodInfo) { // To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent() below. - if err := queue.activeQ.Add(queue.newQueuedPodInfo(pInfo.Pod)); err != nil { - t.Fatalf("Unexpected error during Add: %v", err) - } + queue.activeQ.AddOrUpdate(queue.newQueuedPodInfo(pInfo.Pod)) p, err := queue.Pop(logger) if err != nil { t.Fatalf("Unexpected error during Pop: %v", err) @@ -2755,10 +2709,7 @@ var ( } } addPodActiveQ = func(t *testing.T, logger klog.Logger, queue *PriorityQueue, pInfo *framework.QueuedPodInfo) { - queue.activeQ.Add(pInfo) - } - updatePodActiveQ = func(t *testing.T, logger klog.Logger, queue *PriorityQueue, pInfo *framework.QueuedPodInfo) { - queue.activeQ.Update(pInfo) + queue.activeQ.AddOrUpdate(pInfo) } addPodUnschedulablePods = func(t *testing.T, logger klog.Logger, queue *PriorityQueue, pInfo *framework.QueuedPodInfo) { if !pInfo.Gated { @@ -2781,7 +2732,7 @@ var ( queue.Update(logger, pInfo.Pod, newPod) } addPodBackoffQ = func(t *testing.T, logger klog.Logger, queue *PriorityQueue, pInfo *framework.QueuedPodInfo) { - queue.podBackoffQ.Add(pInfo) + queue.podBackoffQ.AddOrUpdate(pInfo) } moveAllToActiveOrBackoffQ = func(t *testing.T, logger klog.Logger, queue *PriorityQueue, _ *framework.QueuedPodInfo) { queue.MoveAllToActiveOrBackoffQueue(logger, framework.UnschedulableTimeout, nil, nil, nil) @@ -2829,15 +2780,6 @@ func TestPodTimestamp(t *testing.T) { operands: []*framework.QueuedPodInfo{pInfo2, pInfo1}, expected: []*framework.QueuedPodInfo{pInfo1, pInfo2}, }, - { - name: "update two pod to activeQ and sort them by the timestamp", - operations: []operation{ - updatePodActiveQ, - updatePodActiveQ, - }, - operands: []*framework.QueuedPodInfo{pInfo2, pInfo1}, - expected: []*framework.QueuedPodInfo{pInfo1, pInfo2}, - }, { name: "add two pod to unschedulablePods then move them to activeQ and sort them by the timestamp", operations: []operation{ @@ -2883,7 +2825,7 @@ func TestPodTimestamp(t *testing.T) { if pInfo, err := queue.activeQ.Pop(); err != nil { t.Errorf("Error while popping the head of the queue: %v", err) } else { - podInfoList = append(podInfoList, pInfo.(*framework.QueuedPodInfo)) + podInfoList = append(podInfoList, pInfo) } } @@ -3398,9 +3340,7 @@ func TestBackOffFlow(t *testing.T) { Namespace: pod.Namespace, Name: pod.Name, } - if err := q.Add(logger, pod); err != nil { - t.Fatal(err) - } + q.Add(logger, pod) for i, step := range steps { t.Run(fmt.Sprintf("step %d", i), func(t *testing.T) { @@ -3421,7 +3361,7 @@ func TestBackOffFlow(t *testing.T) { // An event happens. q.MoveAllToActiveOrBackoffQueue(logger, framework.UnschedulableTimeout, nil, nil, nil) - if _, ok, _ := q.podBackoffQ.Get(podInfo); !ok { + if !q.podBackoffQ.Has(podInfo) { t.Errorf("pod %v is not in the backoff queue", podID) } @@ -3436,13 +3376,13 @@ func TestBackOffFlow(t *testing.T) { cl.Step(time.Millisecond) q.flushBackoffQCompleted(logger) // Still in backoff queue after an early flush. - if _, ok, _ := q.podBackoffQ.Get(podInfo); !ok { + if !q.podBackoffQ.Has(podInfo) { t.Errorf("pod %v is not in the backoff queue", podID) } // Moved out of the backoff queue after timeout. cl.Step(backoff) q.flushBackoffQCompleted(logger) - if _, ok, _ := q.podBackoffQ.Get(podInfo); ok { + if q.podBackoffQ.Has(podInfo) { t.Errorf("pod %v is still in the backoff queue", podID) } }) @@ -3513,7 +3453,7 @@ func TestMoveAllToActiveOrBackoffQueue_PreEnqueueChecks(t *testing.T) { q := NewTestQueue(ctx, newDefaultQueueSort()) for i, podInfo := range tt.podInfos { // To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent() below. - q.activeQ.Add(q.newQueuedPodInfo(podInfo.Pod)) + q.activeQ.AddOrUpdate(q.newQueuedPodInfo(podInfo.Pod)) if p, err := q.Pop(logger); err != nil || p.Pod != podInfo.Pod { t.Errorf("Expected: %v after Pop, but got: %v", podInfo.Pod.Name, p.Pod.Name) } @@ -3532,14 +3472,10 @@ func TestMoveAllToActiveOrBackoffQueue_PreEnqueueChecks(t *testing.T) { q.MoveAllToActiveOrBackoffQueue(logger, tt.event, nil, nil, tt.preEnqueueCheck) var got []string for q.podBackoffQ.Len() != 0 { - obj, err := q.podBackoffQ.Pop() + queuedPodInfo, err := q.podBackoffQ.Pop() if err != nil { t.Fatalf("Fail to pop pod from backoffQ: %v", err) } - queuedPodInfo, ok := obj.(*framework.QueuedPodInfo) - if !ok { - t.Fatalf("Fail to convert popped obj (type %T) to *framework.QueuedPodInfo", obj) - } got = append(got, queuedPodInfo.Pod.Name) } if diff := cmp.Diff(tt.want, got); diff != "" { @@ -3912,9 +3848,7 @@ func Test_queuedPodInfo_gatedSetUponCreationAndUnsetUponUpdate(t *testing.T) { q := NewTestQueue(ctx, newDefaultQueueSort(), WithPreEnqueuePluginMap(m)) gatedPod := st.MakePod().SchedulingGates([]string{"hello world"}).Obj() - if err := q.Add(logger, gatedPod); err != nil { - t.Errorf("Error while adding gated pod: %v", err) - } + q.Add(logger, gatedPod) if !q.unschedulablePods.get(gatedPod).Gated { t.Error("Expected pod to be gated") @@ -3922,9 +3856,7 @@ func Test_queuedPodInfo_gatedSetUponCreationAndUnsetUponUpdate(t *testing.T) { ungatedPod := gatedPod.DeepCopy() ungatedPod.Spec.SchedulingGates = nil - if err := q.Update(logger, gatedPod, ungatedPod); err != nil { - t.Errorf("Error while updating pod to ungated: %v", err) - } + q.Update(logger, gatedPod, ungatedPod) ungatedPodInfo, _ := q.Pop(logger) if ungatedPodInfo.Gated { diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index d6351d64509..e3986bf4193 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -287,9 +287,7 @@ func TestFailureHandler(t *testing.T) { queue := internalqueue.NewPriorityQueue(nil, informerFactory, internalqueue.WithClock(testingclock.NewFakeClock(time.Now()))) schedulerCache := internalcache.New(ctx, 30*time.Second) - if err := queue.Add(logger, testPod); err != nil { - t.Fatalf("Add failed: %v", err) - } + queue.Add(logger, testPod) if _, err := queue.Pop(logger); err != nil { t.Fatalf("Pop failed: %v", err) diff --git a/test/integration/scheduler/queue_test.go b/test/integration/scheduler/queue_test.go index a483f2ab0ad..bb9316419ac 100644 --- a/test/integration/scheduler/queue_test.go +++ b/test/integration/scheduler/queue_test.go @@ -314,13 +314,7 @@ func TestCoreResourceEnqueue(t *testing.T) { newPod = oldPod.DeepCopy() newPod.Status.Conditions[0].Message = "injected message" - if err := testCtx.Scheduler.SchedulingQueue.Update( - klog.FromContext(testCtx.Ctx), - oldPod, - newPod, - ); err != nil { - return fmt.Errorf("failed to update the pod: %w", err) - } + testCtx.Scheduler.SchedulingQueue.Update(klog.FromContext(testCtx.Ctx), oldPod, newPod) return nil }, wantRequeuedPods: sets.Set[string]{},