From 5def93b10a782c40035e7cf3bf5156616b1ce62e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maciej=20Skocze=C5=84?= Date: Mon, 1 Jul 2024 08:59:26 +0000 Subject: [PATCH] Add a separate lock for pod nominator in scheduling queue --- pkg/scheduler/extender_test.go | 2 +- .../default_preemption_test.go | 8 +- .../framework/preemption/preemption_test.go | 4 +- .../framework/runtime/framework_test.go | 2 +- .../internal/queue/scheduling_queue.go | 166 ++++++++++++------ .../internal/queue/scheduling_queue_test.go | 32 ++-- pkg/scheduler/internal/queue/testing.go | 15 ++ pkg/scheduler/schedule_one_test.go | 18 +- 8 files changed, 162 insertions(+), 85 deletions(-) diff --git a/pkg/scheduler/extender_test.go b/pkg/scheduler/extender_test.go index 7382fab92ff..3d6898a4bc9 100644 --- a/pkg/scheduler/extender_test.go +++ b/pkg/scheduler/extender_test.go @@ -337,7 +337,7 @@ func TestSchedulerWithExtenders(t *testing.T) { test.registerPlugins, "", runtime.WithClientSet(client), runtime.WithInformerFactory(informerFactory), - runtime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())), + runtime.WithPodNominator(internalqueue.NewTestPodNominator(informerFactory.Core().V1().Pods().Lister())), runtime.WithLogger(logger), ) if err != nil { diff --git a/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go b/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go index 6842dbe6bf7..ce3590a33a7 100644 --- a/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go +++ b/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go @@ -365,7 +365,7 @@ func TestPostFilter(t *testing.T) { frameworkruntime.WithClientSet(cs), frameworkruntime.WithEventRecorder(&events.FakeRecorder{}), frameworkruntime.WithInformerFactory(informerFactory), - frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())), + 
frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(informerFactory.Core().V1().Pods().Lister())), frameworkruntime.WithExtenders(extenders), frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(tt.pods, tt.nodes)), frameworkruntime.WithLogger(logger), @@ -1102,7 +1102,7 @@ func TestDryRunPreemption(t *testing.T) { fwk, err := tf.NewFramework( ctx, registeredPlugins, "", - frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())), + frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(informerFactory.Core().V1().Pods().Lister())), frameworkruntime.WithSnapshotSharedLister(snapshot), frameworkruntime.WithInformerFactory(informerFactory), frameworkruntime.WithParallelism(parallelism), @@ -1361,7 +1361,7 @@ func TestSelectBestCandidate(t *testing.T) { tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), }, "", - frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())), + frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(informerFactory.Core().V1().Pods().Lister())), frameworkruntime.WithSnapshotSharedLister(snapshot), frameworkruntime.WithLogger(logger), ) @@ -1746,7 +1746,7 @@ func TestPreempt(t *testing.T) { frameworkruntime.WithClientSet(client), frameworkruntime.WithEventRecorder(&events.FakeRecorder{}), frameworkruntime.WithExtenders(extenders), - frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())), + frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(informerFactory.Core().V1().Pods().Lister())), frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(test.pods, nodes)), frameworkruntime.WithInformerFactory(informerFactory), frameworkruntime.WithWaitingPods(waitingPods), diff --git a/pkg/scheduler/framework/preemption/preemption_test.go b/pkg/scheduler/framework/preemption/preemption_test.go index 
79e71b49057..d61ee6f82f4 100644 --- a/pkg/scheduler/framework/preemption/preemption_test.go +++ b/pkg/scheduler/framework/preemption/preemption_test.go @@ -341,7 +341,7 @@ func TestDryRunPreemption(t *testing.T) { fwk, err := tf.NewFramework( ctx, registeredPlugins, "", - frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())), + frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(informerFactory.Core().V1().Pods().Lister())), frameworkruntime.WithInformerFactory(informerFactory), frameworkruntime.WithParallelism(parallelism), frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(tt.testPods, tt.nodes)), @@ -446,7 +446,7 @@ func TestSelectCandidate(t *testing.T) { ctx, registeredPlugins, "", - frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())), + frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(informerFactory.Core().V1().Pods().Lister())), frameworkruntime.WithSnapshotSharedLister(snapshot), frameworkruntime.WithLogger(logger), ) diff --git a/pkg/scheduler/framework/runtime/framework_test.go b/pkg/scheduler/framework/runtime/framework_test.go index 269907074f4..a20467defc2 100644 --- a/pkg/scheduler/framework/runtime/framework_test.go +++ b/pkg/scheduler/framework/runtime/framework_test.go @@ -2356,7 +2356,7 @@ func TestFilterPluginsWithNominatedPods(t *testing.T) { ) } - podNominator := internalqueue.NewPodNominator(nil) + podNominator := internalqueue.NewTestPodNominator(nil) if tt.nominatedPod != nil { podNominator.AddNominatedPod( logger, diff --git a/pkg/scheduler/internal/queue/scheduling_queue.go b/pkg/scheduler/internal/queue/scheduling_queue.go index 2c9043a8d03..d7731d8a11a 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue.go +++ b/pkg/scheduler/internal/queue/scheduling_queue.go @@ -32,10 +32,12 @@ import ( "fmt" "math/rand" "reflect" + "slices" "sync" "time" v1 "k8s.io/api/core/v1" + 
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" @@ -156,6 +158,11 @@ type PriorityQueue struct { stop chan struct{} clock clock.Clock + // lock takes precedence and should be taken first, + // before any other locks in the queue (activeQLock or nominator.nLock). + // Correct locking order is: lock > activeQLock > nominator.nLock. + lock sync.RWMutex + // pod initial backoff duration. podInitialBackoffDuration time.Duration // pod maximum backoff duration. @@ -169,9 +176,10 @@ type PriorityQueue struct { // activeQLock synchronizes all operations related to activeQ. // It protects activeQ, inFlightPods, inFlightEvents, schedulingCycle and closed fields. - // Caution: DO NOT take nominator.lock after taking activeQLock, - // you should take nominator.lock first if you need two locks, - // otherwise the queue could end up deadlock. + // Caution: DO NOT take "lock" after taking "activeQLock". + // You should always take "lock" first, otherwise the queue could end up in deadlock. + // "activeQLock" should not be taken after taking "nLock". + // Correct locking order is: lock > activeQLock > nominator.nLock. 
activeQLock sync.RWMutex // inFlightPods holds the UID of all pods which have been popped out for which Done @@ -381,7 +389,6 @@ func NewPriorityQueue( } pq := &PriorityQueue{ - nominator: newPodNominator(options.podLister), clock: options.clock, stop: make(chan struct{}), podInitialBackoffDuration: options.podInitialBackoffDuration, @@ -401,6 +408,7 @@ func NewPriorityQueue( pq.cond.L = &pq.activeQLock pq.podBackoffQ = heap.NewWithRecorder(podInfoKeyFunc, pq.podsCompareBackoffCompleted, metrics.NewBackoffPodsRecorder()) pq.nsLister = informerFactory.Core().V1().Namespaces().Lister() + pq.nominator = newPodNominator(options.podLister, pq.nominatedPodsToInfo) return pq } @@ -606,7 +614,7 @@ func (p *PriorityQueue) moveToActiveQ(logger klog.Logger, pInfo *framework.Queue logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", event, "queue", activeQ) metrics.SchedulerQueueIncomingPods.WithLabelValues("active", event).Inc() if event == PodAdd || event == PodUpdate { - p.addNominatedPodUnlocked(logger, pInfo.PodInfo, nil) + p.AddNominatedPod(logger, pInfo.PodInfo, nil) } return true, nil @@ -807,7 +815,7 @@ func (p *PriorityQueue) addUnschedulableWithoutQueueingHint(logger klog.Logger, metrics.SchedulerQueueIncomingPods.WithLabelValues("unschedulable", ScheduleAttemptFailure).Inc() } - p.addNominatedPodUnlocked(logger, pInfo.PodInfo, nil) + p.AddNominatedPod(logger, pInfo.PodInfo, nil) return nil } @@ -860,7 +868,7 @@ func (p *PriorityQueue) AddUnschedulableIfNotPresent(logger klog.Logger, pInfo * p.cond.Broadcast() } - p.addNominatedPodUnlocked(logger, pInfo.PodInfo, nil) + p.AddNominatedPod(logger, pInfo.PodInfo, nil) return nil } @@ -1022,7 +1030,7 @@ func (p *PriorityQueue) updateInActiveQueue(logger klog.Logger, oldPod, newPod * defer p.activeQLock.Unlock() if oldPodInfo, exists, _ := p.activeQ.Get(oldPodInfo); exists { pInfo := updatePod(oldPodInfo, newPod) - p.updateNominatedPodUnlocked(logger, oldPod, pInfo.PodInfo) + 
p.UpdateNominatedPod(logger, oldPod, pInfo.PodInfo) return true, p.activeQ.Update(pInfo) } return false, nil @@ -1068,7 +1076,7 @@ func (p *PriorityQueue) Update(logger klog.Logger, oldPod, newPod *v1.Pod) error // If the pod is in the backoff queue, update it there. if oldPodInfo, exists, _ := p.podBackoffQ.Get(oldPodInfo); exists { pInfo := updatePod(oldPodInfo, newPod) - p.updateNominatedPodUnlocked(logger, oldPod, pInfo.PodInfo) + p.UpdateNominatedPod(logger, oldPod, pInfo.PodInfo) return p.podBackoffQ.Update(pInfo) } } @@ -1076,7 +1084,7 @@ func (p *PriorityQueue) Update(logger klog.Logger, oldPod, newPod *v1.Pod) error // If the pod is in the unschedulable queue, updating it may make it schedulable. if usPodInfo := p.unschedulablePods.get(newPod); usPodInfo != nil { pInfo := updatePod(usPodInfo, newPod) - p.updateNominatedPodUnlocked(logger, oldPod, pInfo.PodInfo) + p.UpdateNominatedPod(logger, oldPod, pInfo.PodInfo) gated := usPodInfo.Gated if p.isSchedulingQueueHintEnabled { // When unscheduled Pods are updated, we check with QueueingHint @@ -1130,7 +1138,7 @@ func (p *PriorityQueue) Update(logger klog.Logger, oldPod, newPod *v1.Pod) error func (p *PriorityQueue) Delete(pod *v1.Pod) error { p.lock.Lock() defer p.lock.Unlock() - p.deleteNominatedPodIfExistsUnlocked(pod) + p.DeleteNominatedPodIfExists(pod) pInfo := newQueuedPodInfoForLookup(pod) p.activeQLock.Lock() defer p.activeQLock.Unlock() @@ -1378,6 +1386,43 @@ func (p *PriorityQueue) PendingPods() ([]*v1.Pod, string) { return result, fmt.Sprintf(pendingPodsSummary, activeQLen, p.podBackoffQ.Len(), len(p.unschedulablePods.podInfoMap)) } +// Note: this function assumes the caller holds both p.lock and p.activeQLock (read locks are sufficient). 
+func (p *PriorityQueue) nominatedPodToInfo(np PodRef) *framework.PodInfo { + pod := np.ToPod() + pInfoLookup := newQueuedPodInfoForLookup(pod) + + obj, exists, _ := p.activeQ.Get(pInfoLookup) + if exists { + queuedPodInfo := obj.(*framework.QueuedPodInfo) + return queuedPodInfo.PodInfo + } + + queuedPodInfo := p.unschedulablePods.get(pod) + if queuedPodInfo != nil { + return queuedPodInfo.PodInfo + } + + obj, exists, _ = p.podBackoffQ.Get(pInfoLookup) + if exists { + queuedPodInfo := obj.(*framework.QueuedPodInfo) + return queuedPodInfo.PodInfo + } + + return &framework.PodInfo{Pod: pod} +} + +func (p *PriorityQueue) nominatedPodsToInfo(nominatedPods []PodRef) []*framework.PodInfo { + p.lock.RLock() + defer p.lock.RUnlock() + p.activeQLock.RLock() + defer p.activeQLock.RUnlock() + pods := make([]*framework.PodInfo, len(nominatedPods)) + for i, np := range nominatedPods { + pods[i] = p.nominatedPodToInfo(np).DeepCopy() + } + return pods +} + // Close closes the priority queue. func (p *PriorityQueue) Close() { p.lock.Lock() @@ -1392,13 +1437,9 @@ func (p *PriorityQueue) Close() { // DeleteNominatedPodIfExists deletes from nominatedPods. func (npm *nominator) DeleteNominatedPodIfExists(pod *v1.Pod) { - npm.lock.Lock() - npm.deleteNominatedPodIfExistsUnlocked(pod) - npm.lock.Unlock() -} - -func (npm *nominator) deleteNominatedPodIfExistsUnlocked(pod *v1.Pod) { + npm.nLock.Lock() npm.delete(pod) + npm.nLock.Unlock() } // AddNominatedPod adds a pod to the nominated pods of the given node. @@ -1406,22 +1447,20 @@ func (npm *nominator) deleteNominatedPodIfExistsUnlocked(pod *v1.Pod) { // the pod. We update the structure before sending a request to update the pod // object to avoid races with the following scheduling cycles. 
func (npm *nominator) AddNominatedPod(logger klog.Logger, pi *framework.PodInfo, nominatingInfo *framework.NominatingInfo) { - npm.lock.Lock() + npm.nLock.Lock() npm.addNominatedPodUnlocked(logger, pi, nominatingInfo) - npm.lock.Unlock() + npm.nLock.Unlock() } // NominatedPodsForNode returns a copy of pods that are nominated to run on the given node, // but they are waiting for other pods to be removed from the node. +// CAUTION: Make sure you don't call this function while taking any lock in any scenario. func (npm *nominator) NominatedPodsForNode(nodeName string) []*framework.PodInfo { - npm.lock.RLock() - defer npm.lock.RUnlock() - // Make a copy of the nominated Pods so the caller can mutate safely. - pods := make([]*framework.PodInfo, len(npm.nominatedPods[nodeName])) - for i := 0; i < len(pods); i++ { - pods[i] = npm.nominatedPods[nodeName][i].DeepCopy() - } - return pods + npm.nLock.RLock() + nominatedPods := slices.Clone(npm.nominatedPods[nodeName]) + npm.nLock.RUnlock() + // Note that nominatedPodsToInfo takes SchedulingQueue.lock inside. + return npm.nominatedPodsToInfo(nominatedPods) } func (p *PriorityQueue) podsCompareBackoffCompleted(podInfo1, podInfo2 interface{}) bool { @@ -1542,22 +1581,55 @@ func newUnschedulablePods(unschedulableRecorder, gatedRecorder metrics.MetricRec } } +type PodRef struct { + Name string + Namespace string + UID types.UID +} + +func PodToRef(pod *v1.Pod) PodRef { + return PodRef{ + Name: pod.Name, + Namespace: pod.Namespace, + UID: pod.UID, + } +} + +func (np PodRef) ToPod() *v1.Pod { + return &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: np.Name, + Namespace: np.Namespace, + UID: np.UID, + }, + } +} + // nominator is a structure that stores pods nominated to run on nodes. // It exists because nominatedNodeName of pod objects stored in the structure // may be different than what scheduler has here. We should be able to find pods // by their UID and update/delete them. 
type nominator struct { + // nLock synchronizes all operations related to nominator. + // Caution: DO NOT take ("SchedulingQueue.lock" or "SchedulingQueue.activeQLock") after taking "nLock". + // You should always take "SchedulingQueue.lock" and "SchedulingQueue.activeQLock" first, + // otherwise the nominator could end up in deadlock. + // Correct locking order is: SchedulingQueue.lock > SchedulingQueue.activeQLock > nLock. + nLock sync.RWMutex + // podLister is used to verify if the given pod is alive. podLister listersv1.PodLister // nominatedPods is a map keyed by a node name and the value is a list of // pods which are nominated to run on the node. These are pods which can be in // the activeQ or unschedulablePods. - nominatedPods map[string][]*framework.PodInfo + nominatedPods map[string][]PodRef // nominatedPodToNode is map keyed by a Pod UID to the node name where it is // nominated. nominatedPodToNode map[types.UID]string - - lock sync.RWMutex + // nominatedPodsToInfo returns PodInfos cached in the queues for nominated PodRefs. + // Note: it takes SchedulingQueue.lock inside. + // Make sure you don't call this function while taking any lock in any scenario. + nominatedPodsToInfo func([]PodRef) []*framework.PodInfo } func (npm *nominator) addNominatedPodUnlocked(logger klog.Logger, pi *framework.PodInfo, nominatingInfo *framework.NominatingInfo) { @@ -1589,13 +1661,13 @@ func (npm *nominator) addNominatedPodUnlocked(logger klog.Logger, pi *framework. 
} npm.nominatedPodToNode[pi.Pod.UID] = nodeName - for _, npi := range npm.nominatedPods[nodeName] { - if npi.Pod.UID == pi.Pod.UID { - logger.V(4).Info("Pod already exists in the nominator", "pod", klog.KObj(npi.Pod)) + for _, np := range npm.nominatedPods[nodeName] { + if np.UID == pi.Pod.UID { + logger.V(4).Info("Pod already exists in the nominator", "pod", np.UID) return } } - npm.nominatedPods[nodeName] = append(npm.nominatedPods[nodeName], pi) + npm.nominatedPods[nodeName] = append(npm.nominatedPods[nodeName], PodToRef(pi.Pod)) } func (npm *nominator) delete(p *v1.Pod) { @@ -1604,7 +1676,7 @@ func (npm *nominator) delete(p *v1.Pod) { return } for i, np := range npm.nominatedPods[nnn] { - if np.Pod.UID == p.UID { + if np.UID == p.UID { npm.nominatedPods[nnn] = append(npm.nominatedPods[nnn][:i], npm.nominatedPods[nnn][i+1:]...) if len(npm.nominatedPods[nnn]) == 0 { delete(npm.nominatedPods, nnn) @@ -1617,12 +1689,8 @@ func (npm *nominator) delete(p *v1.Pod) { // UpdateNominatedPod updates the <oldPod> with <newPod>. func (npm *nominator) UpdateNominatedPod(logger klog.Logger, oldPod *v1.Pod, newPodInfo *framework.PodInfo) { - npm.lock.Lock() - defer npm.lock.Unlock() - npm.updateNominatedPodUnlocked(logger, oldPod, newPodInfo) -} - -func (npm *nominator) updateNominatedPodUnlocked(logger klog.Logger, oldPod *v1.Pod, newPodInfo *framework.PodInfo) { + npm.nLock.Lock() + defer npm.nLock.Unlock() // In some cases, an Update event with no "NominatedNode" present is received right // after a node("NominatedNode") is reserved for this pod in memory. // In this case, we need to keep reserving the NominatedNode when updating the pod pointer. @@ -1646,18 +1714,12 @@ func (npm *nominator) updateNominatedPodUnlocked(logger klog.Logger, oldPod *v1. npm.addNominatedPodUnlocked(logger, newPodInfo, nominatingInfo) } -// NewPodNominator creates a nominator as a backing of framework.PodNominator. 
-// A podLister is passed in so as to check if the pod exists -// before adding its nominatedNode info. -func NewPodNominator(podLister listersv1.PodLister) framework.PodNominator { - return newPodNominator(podLister) -} - -func newPodNominator(podLister listersv1.PodLister) *nominator { +func newPodNominator(podLister listersv1.PodLister, nominatedPodsToInfo func([]PodRef) []*framework.PodInfo) *nominator { return &nominator{ - podLister: podLister, - nominatedPods: make(map[string][]*framework.PodInfo), - nominatedPodToNode: make(map[types.UID]string), + podLister: podLister, + nominatedPods: make(map[string][]PodRef), + nominatedPodToNode: make(map[types.UID]string), + nominatedPodsToInfo: nominatedPodsToInfo, } } diff --git a/pkg/scheduler/internal/queue/scheduling_queue_test.go b/pkg/scheduler/internal/queue/scheduling_queue_test.go index edd57a61f39..fb7a5c29bdb 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue_test.go +++ b/pkg/scheduler/internal/queue/scheduling_queue_test.go @@ -85,7 +85,7 @@ var ( nominatorCmpOpts = []cmp.Option{ cmp.AllowUnexported(nominator{}), - cmpopts.IgnoreFields(nominator{}, "podLister", "lock"), + cmpopts.IgnoreFields(nominator{}, "podLister", "nLock", "nominatedPodsToInfo"), } queueHintReturnQueue = func(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) { @@ -136,8 +136,8 @@ func TestPriorityQueue_Add(t *testing.T) { medPriorityPodInfo.Pod.UID: "node1", unschedulablePodInfo.Pod.UID: "node1", }, - nominatedPods: map[string][]*framework.PodInfo{ - "node1": {medPriorityPodInfo, unschedulablePodInfo}, + nominatedPods: map[string][]PodRef{ + "node1": {PodToRef(medPriorityPodInfo.Pod), PodToRef(unschedulablePodInfo.Pod)}, }, } if diff := cmp.Diff(q.nominator, expectedNominatedPods, nominatorCmpOpts...); diff != "" { @@ -870,8 +870,8 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent(t *testing.T) { unschedulablePodInfo.Pod.UID: "node1", highPriNominatedPodInfo.Pod.UID: "node1", }, 
- nominatedPods: map[string][]*framework.PodInfo{ - "node1": {highPriNominatedPodInfo, unschedulablePodInfo}, + nominatedPods: map[string][]PodRef{ + "node1": {PodToRef(highPriNominatedPodInfo.Pod), PodToRef(unschedulablePodInfo.Pod)}, }, } if diff := cmp.Diff(q.nominator, expectedNominatedPods, nominatorCmpOpts...); diff != "" { @@ -2178,10 +2178,10 @@ func TestPriorityQueue_UpdateNominatedPodForNode(t *testing.T) { highPriorityPodInfo.Pod.UID: "node2", unschedulablePodInfo.Pod.UID: "node5", }, - nominatedPods: map[string][]*framework.PodInfo{ - "node1": {medPriorityPodInfo}, - "node2": {highPriorityPodInfo}, - "node5": {unschedulablePodInfo}, + nominatedPods: map[string][]PodRef{ + "node1": {PodToRef(medPriorityPodInfo.Pod)}, + "node2": {PodToRef(highPriorityPodInfo.Pod)}, + "node5": {PodToRef(unschedulablePodInfo.Pod)}, }, } if diff := cmp.Diff(q.nominator, expectedNominatedPods, nominatorCmpOpts...); diff != "" { @@ -2203,10 +2203,10 @@ func TestPriorityQueue_UpdateNominatedPodForNode(t *testing.T) { highPriorityPodInfo.Pod.UID: "node4", unschedulablePodInfo.Pod.UID: "node5", }, - nominatedPods: map[string][]*framework.PodInfo{ - "node1": {medPriorityPodInfo}, - "node4": {highPriorityPodInfo}, - "node5": {unschedulablePodInfo}, + nominatedPods: map[string][]PodRef{ + "node1": {PodToRef(medPriorityPodInfo.Pod)}, + "node4": {PodToRef(highPriorityPodInfo.Pod)}, + "node5": {PodToRef(unschedulablePodInfo.Pod)}, }, } if diff := cmp.Diff(q.nominator, expectedNominatedPods, nominatorCmpOpts...); diff != "" { @@ -2236,9 +2236,9 @@ func TestPriorityQueue_UpdateNominatedPodForNode(t *testing.T) { medPriorityPodInfo.Pod.UID: "node1", unschedulablePodInfo.Pod.UID: "node5", }, - nominatedPods: map[string][]*framework.PodInfo{ - "node1": {medPriorityPodInfo}, - "node5": {unschedulablePodInfo}, + nominatedPods: map[string][]PodRef{ + "node1": {PodToRef(medPriorityPodInfo.Pod)}, + "node5": {PodToRef(unschedulablePodInfo.Pod)}, }, } if diff := cmp.Diff(q.nominator, 
expectedNominatedPods, nominatorCmpOpts...); diff != "" { diff --git a/pkg/scheduler/internal/queue/testing.go b/pkg/scheduler/internal/queue/testing.go index e0ee28312e2..e8ef51acee5 100644 --- a/pkg/scheduler/internal/queue/testing.go +++ b/pkg/scheduler/internal/queue/testing.go @@ -22,6 +22,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" + listersv1 "k8s.io/client-go/listers/core/v1" "k8s.io/kubernetes/pkg/scheduler/framework" ) @@ -53,3 +54,17 @@ func NewTestQueueWithInformerFactory( informerFactory.WaitForCacheSync(ctx.Done()) return pq } + +// NewTestPodNominator creates a nominator as a backing of framework.PodNominator. +// A podLister is passed in so as to check if the pod exists +// before adding its nominatedNode info. +func NewTestPodNominator(podLister listersv1.PodLister) framework.PodNominator { + nominatedPodsToInfo := func(nominatedPods []PodRef) []*framework.PodInfo { + pods := make([]*framework.PodInfo, len(nominatedPods)) + for i, np := range nominatedPods { + pods[i] = &framework.PodInfo{Pod: np.ToPod()} + } + return pods + } + return newPodNominator(podLister, nominatedPodsToInfo) +} diff --git a/pkg/scheduler/schedule_one_test.go b/pkg/scheduler/schedule_one_test.go index 2af9ab6e32d..675c7d7fee6 100644 --- a/pkg/scheduler/schedule_one_test.go +++ b/pkg/scheduler/schedule_one_test.go @@ -2483,7 +2483,7 @@ func TestSchedulerSchedulePod(t *testing.T) { test.registerPlugins, "", frameworkruntime.WithSnapshotSharedLister(snapshot), frameworkruntime.WithInformerFactory(informerFactory), - frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())), + frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(informerFactory.Core().V1().Pods().Lister())), ) if err != nil { t.Fatal(err) @@ -2546,7 +2546,7 @@ func TestFindFitAllError(t *testing.T) { tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), }, "", - 
frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(nil)), + frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(nil)), ) if err != nil { t.Fatal(err) @@ -2586,7 +2586,7 @@ func TestFindFitSomeError(t *testing.T) { tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), }, "", - frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(nil)), + frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(nil)), ) if err != nil { t.Fatal(err) @@ -2663,7 +2663,7 @@ func TestFindFitPredicateCallCounts(t *testing.T) { fwk, err := tf.NewFramework( ctx, registerPlugins, "", - frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(nil)), + frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(nil)), ) if err != nil { t.Fatal(err) @@ -2804,7 +2804,7 @@ func TestZeroRequest(t *testing.T) { frameworkruntime.WithInformerFactory(informerFactory), frameworkruntime.WithSnapshotSharedLister(snapshot), frameworkruntime.WithClientSet(client), - frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())), + frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(informerFactory.Core().V1().Pods().Lister())), ) if err != nil { t.Fatalf("error creating framework: %+v", err) @@ -3207,7 +3207,7 @@ func Test_prioritizeNodes(t *testing.T) { frameworkruntime.WithInformerFactory(informerFactory), frameworkruntime.WithSnapshotSharedLister(snapshot), frameworkruntime.WithClientSet(client), - frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())), + frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(informerFactory.Core().V1().Pods().Lister())), ) if err != nil { t.Fatalf("error creating framework: %+v", err) @@ -3325,7 +3325,7 @@ func TestFairEvaluationForNodes(t *testing.T) { tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), }, "", - 
frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(nil)), + frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(nil)), ) if err != nil { t.Fatal(err) @@ -3407,7 +3407,7 @@ func TestPreferNominatedNodeFilterCallCounts(t *testing.T) { ctx, registerPlugins, "", frameworkruntime.WithClientSet(client), - frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())), + frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(informerFactory.Core().V1().Pods().Lister())), ) if err != nil { t.Fatal(err) @@ -3565,7 +3565,7 @@ func setupTestScheduler(ctx context.Context, t *testing.T, queuedPodStore *clien frameworkruntime.WithClientSet(client), frameworkruntime.WithEventRecorder(recorder), frameworkruntime.WithInformerFactory(informerFactory), - frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())), + frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(informerFactory.Core().V1().Pods().Lister())), frameworkruntime.WithWaitingPods(waitingPods), )