diff --git a/pkg/scheduler/extender_test.go b/pkg/scheduler/extender_test.go
index 8e9db9953e3..c474087757c 100644
--- a/pkg/scheduler/extender_test.go
+++ b/pkg/scheduler/extender_test.go
@@ -337,7 +337,7 @@ func TestSchedulerWithExtenders(t *testing.T) {
 				test.registerPlugins, "",
 				runtime.WithClientSet(client),
 				runtime.WithInformerFactory(informerFactory),
-				runtime.WithPodNominator(internalqueue.NewTestPodNominator(informerFactory.Core().V1().Pods().Lister())),
+				runtime.WithPodNominator(internalqueue.NewSchedulingQueue(nil, informerFactory)),
 				runtime.WithLogger(logger),
 			)
 			if err != nil {
diff --git a/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go b/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go
index 2512d659601..7563085e694 100644
--- a/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go
+++ b/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go
@@ -380,7 +380,7 @@ func TestPostFilter(t *testing.T) {
 				frameworkruntime.WithClientSet(cs),
 				frameworkruntime.WithEventRecorder(&events.FakeRecorder{}),
 				frameworkruntime.WithInformerFactory(informerFactory),
-				frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(informerFactory.Core().V1().Pods().Lister())),
+				frameworkruntime.WithPodNominator(internalqueue.NewSchedulingQueue(nil, informerFactory)),
 				frameworkruntime.WithExtenders(extenders),
 				frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(tt.pods, tt.nodes)),
 				frameworkruntime.WithLogger(logger),
@@ -1117,7 +1117,7 @@ func TestDryRunPreemption(t *testing.T) {
 			fwk, err := tf.NewFramework(
 				ctx,
 				registeredPlugins, "",
-				frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(informerFactory.Core().V1().Pods().Lister())),
+				frameworkruntime.WithPodNominator(internalqueue.NewSchedulingQueue(nil, informerFactory)),
 				frameworkruntime.WithSnapshotSharedLister(snapshot),
 				frameworkruntime.WithInformerFactory(informerFactory),
 				frameworkruntime.WithParallelism(parallelism),
@@ -1376,7 +1376,7 @@ func TestSelectBestCandidate(t *testing.T) {
 					tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
 				},
 				"",
-				frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(informerFactory.Core().V1().Pods().Lister())),
+				frameworkruntime.WithPodNominator(internalqueue.NewSchedulingQueue(nil, informerFactory)),
 				frameworkruntime.WithSnapshotSharedLister(snapshot),
 				frameworkruntime.WithLogger(logger),
 			)
@@ -1760,7 +1760,7 @@ func TestPreempt(t *testing.T) {
 				frameworkruntime.WithClientSet(client),
 				frameworkruntime.WithEventRecorder(&events.FakeRecorder{}),
 				frameworkruntime.WithExtenders(extenders),
-				frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(informerFactory.Core().V1().Pods().Lister())),
+				frameworkruntime.WithPodNominator(internalqueue.NewSchedulingQueue(nil, informerFactory)),
 				frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(test.pods, nodes)),
 				frameworkruntime.WithInformerFactory(informerFactory),
 				frameworkruntime.WithWaitingPods(waitingPods),
diff --git a/pkg/scheduler/framework/preemption/preemption_test.go b/pkg/scheduler/framework/preemption/preemption_test.go
index 25647d2ff78..04d8c92c50c 100644
--- a/pkg/scheduler/framework/preemption/preemption_test.go
+++ b/pkg/scheduler/framework/preemption/preemption_test.go
@@ -208,7 +208,7 @@ func TestDryRunPreemption(t *testing.T) {
 			fwk, err := tf.NewFramework(
 				ctx,
 				registeredPlugins, "",
-				frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(informerFactory.Core().V1().Pods().Lister())),
+				frameworkruntime.WithPodNominator(internalqueue.NewSchedulingQueue(nil, informerFactory)),
 				frameworkruntime.WithInformerFactory(informerFactory),
 				frameworkruntime.WithParallelism(parallelism),
 				frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(tt.testPods, tt.nodes)),
@@ -313,7 +313,7 @@ func TestSelectCandidate(t *testing.T) {
 				ctx,
 				registeredPlugins,
 				"",
-				frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(informerFactory.Core().V1().Pods().Lister())),
+				frameworkruntime.WithPodNominator(internalqueue.NewSchedulingQueue(nil, informerFactory)),
 				frameworkruntime.WithSnapshotSharedLister(snapshot),
 				frameworkruntime.WithLogger(logger),
 			)
diff --git a/pkg/scheduler/framework/runtime/framework_test.go b/pkg/scheduler/framework/runtime/framework_test.go
index 94021bf2b9f..9208f8bd575 100644
--- a/pkg/scheduler/framework/runtime/framework_test.go
+++ b/pkg/scheduler/framework/runtime/framework_test.go
@@ -31,6 +31,8 @@ import (
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/sets"
+	"k8s.io/client-go/informers"
+	clientsetfake "k8s.io/client-go/kubernetes/fake"
 	"k8s.io/component-base/metrics/testutil"
 	"k8s.io/klog/v2/ktesting"
 	"k8s.io/kubernetes/pkg/scheduler/apis/config"
@@ -2385,7 +2387,20 @@ func TestFilterPluginsWithNominatedPods(t *testing.T) {
 				)
 			}
 
-			podNominator := internalqueue.NewTestPodNominator(nil)
+			informerFactory := informers.NewSharedInformerFactory(clientsetfake.NewClientset(), 0)
+			podInformer := informerFactory.Core().V1().Pods().Informer()
+			err := podInformer.GetStore().Add(tt.pod)
+			if err != nil {
+				t.Fatalf("Error adding pod to podInformer: %s", err)
+			}
+			if tt.nominatedPod != nil {
+				err = podInformer.GetStore().Add(tt.nominatedPod)
+				if err != nil {
+					t.Fatalf("Error adding nominated pod to podInformer: %s", err)
+				}
+			}
+
+			podNominator := internalqueue.NewSchedulingQueue(nil, informerFactory)
 			if tt.nominatedPod != nil {
 				podNominator.AddNominatedPod(
 					logger,
diff --git a/pkg/scheduler/internal/queue/nominator.go b/pkg/scheduler/internal/queue/nominator.go
new file mode 100644
index 00000000000..59f7eaa4802
--- /dev/null
+++ b/pkg/scheduler/internal/queue/nominator.go
@@ -0,0 +1,195 @@
+/*
+Copyright 2024 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package queue
+
+import (
+	"slices"
+	"sync"
+
+	v1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/types"
+	listersv1 "k8s.io/client-go/listers/core/v1"
+	"k8s.io/klog/v2"
+	"k8s.io/kubernetes/pkg/scheduler/framework"
+)
+
+// nominator is a structure that stores pods nominated to run on nodes.
+// It exists because nominatedNodeName of pod objects stored in the structure
+// may be different than what scheduler has here. We should be able to find pods
+// by their UID and update/delete them.
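+// Only lightweight podRef entries (name, namespace and UID) are stored here; the
+// scheduling queue resolves them back into cached PodInfos when nominated pods are read.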
+type nominator struct {
+	// nLock synchronizes all operations related to nominator.
+	// It should not be used anywhere else.
+	// Caution: DO NOT take ("SchedulingQueue.lock" or "activeQueue.lock") after taking "nLock".
+	// You should always take "SchedulingQueue.lock" and "activeQueue.lock" first,
+	// otherwise the nominator could end up in deadlock.
+	// Correct locking order is: SchedulingQueue.lock > activeQueue.lock > nLock.
+	nLock sync.RWMutex
+
+	// podLister is used to verify if the given pod is alive.
+	podLister listersv1.PodLister
+	// nominatedPods is a map keyed by a node name and the value is a list of
+	// pods which are nominated to run on the node. These are pods which can be in
+	// the activeQ or unschedulablePods.
+	nominatedPods map[string][]podRef
+	// nominatedPodToNode is map keyed by a Pod UID to the node name where it is
+	// nominated.
+	nominatedPodToNode map[types.UID]string
+}
+
+func newPodNominator(podLister listersv1.PodLister) *nominator {
+	return &nominator{
+		podLister:          podLister,
+		nominatedPods:      make(map[string][]podRef),
+		nominatedPodToNode: make(map[types.UID]string),
+	}
+}
+
+// AddNominatedPod adds a pod to the nominated pods of the given node.
+// This is called during the preemption process after a node is nominated to run
+// the pod. We update the structure before sending a request to update the pod
+// object to avoid races with the following scheduling cycles.
+func (npm *nominator) AddNominatedPod(logger klog.Logger, pi *framework.PodInfo, nominatingInfo *framework.NominatingInfo) {
+	npm.nLock.Lock()
+	npm.addNominatedPodUnlocked(logger, pi, nominatingInfo)
+	npm.nLock.Unlock()
+}
+
+func (npm *nominator) addNominatedPodUnlocked(logger klog.Logger, pi *framework.PodInfo, nominatingInfo *framework.NominatingInfo) {
+	// Always delete the pod if it already exists, to ensure we never store more than
+	// one instance of the pod.
+	npm.deleteUnlocked(pi.Pod)
+
+	var nodeName string
+	if nominatingInfo.Mode() == framework.ModeOverride {
+		nodeName = nominatingInfo.NominatedNodeName
+	} else if nominatingInfo.Mode() == framework.ModeNoop {
+		if pi.Pod.Status.NominatedNodeName == "" {
+			return
+		}
+		nodeName = pi.Pod.Status.NominatedNodeName
+	}
+
+	if npm.podLister != nil {
+		// If the pod was removed or if it was already scheduled, don't nominate it.
+		updatedPod, err := npm.podLister.Pods(pi.Pod.Namespace).Get(pi.Pod.Name)
+		if err != nil {
+			logger.V(4).Info("Pod doesn't exist in podLister, aborted adding it to the nominator", "pod", klog.KObj(pi.Pod))
+			return
+		}
+		if updatedPod.Spec.NodeName != "" {
+			logger.V(4).Info("Pod is already scheduled to a node, aborted adding it to the nominator", "pod", klog.KObj(pi.Pod), "node", updatedPod.Spec.NodeName)
+			return
+		}
+	}
+
+	npm.nominatedPodToNode[pi.Pod.UID] = nodeName
+	for _, np := range npm.nominatedPods[nodeName] {
+		if np.uid == pi.Pod.UID {
+			logger.V(4).Info("Pod already exists in the nominator", "pod", np.uid)
+			return
+		}
+	}
+	npm.nominatedPods[nodeName] = append(npm.nominatedPods[nodeName], podToRef(pi.Pod))
+}
+
+// UpdateNominatedPod updates the <oldPod> with <newPod>.
+func (npm *nominator) UpdateNominatedPod(logger klog.Logger, oldPod *v1.Pod, newPodInfo *framework.PodInfo) {
+	npm.nLock.Lock()
+	defer npm.nLock.Unlock()
+	// In some cases, an Update event with no "NominatedNode" present is received right
+	// after a node("NominatedNode") is reserved for this pod in memory.
+	// In this case, we need to keep reserving the NominatedNode when updating the pod pointer.
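+	// A nil nominatingInfo below is treated as ModeNoop by addNominatedPodUnlocked,
+	// which then falls back to the pod's own Status.NominatedNodeName; it is only set
+	// to ModeOverride when the previously reserved node has to be carried over.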
+	var nominatingInfo *framework.NominatingInfo
+	// We won't fall into below `if` block if the Update event represents:
+	// (1) NominatedNode info is added
+	// (2) NominatedNode info is updated
+	// (3) NominatedNode info is removed
+	if nominatedNodeName(oldPod) == "" && nominatedNodeName(newPodInfo.Pod) == "" {
+		if nnn, ok := npm.nominatedPodToNode[oldPod.UID]; ok {
+			// This is the only case we should continue reserving the NominatedNode
+			nominatingInfo = &framework.NominatingInfo{
+				NominatingMode:    framework.ModeOverride,
+				NominatedNodeName: nnn,
+			}
+		}
+	}
+	// We update irrespective of the nominatedNodeName changed or not, to ensure
+	// that pod pointer is updated.
+	npm.deleteUnlocked(oldPod)
+	npm.addNominatedPodUnlocked(logger, newPodInfo, nominatingInfo)
+}
+
+// DeleteNominatedPodIfExists deletes <pod> from nominatedPods.
+func (npm *nominator) DeleteNominatedPodIfExists(pod *v1.Pod) {
+	npm.nLock.Lock()
+	npm.deleteUnlocked(pod)
+	npm.nLock.Unlock()
+}
+
+func (npm *nominator) deleteUnlocked(p *v1.Pod) {
+	nnn, ok := npm.nominatedPodToNode[p.UID]
+	if !ok {
+		return
+	}
+	for i, np := range npm.nominatedPods[nnn] {
+		if np.uid == p.UID {
+			npm.nominatedPods[nnn] = append(npm.nominatedPods[nnn][:i], npm.nominatedPods[nnn][i+1:]...)
+			if len(npm.nominatedPods[nnn]) == 0 {
+				delete(npm.nominatedPods, nnn)
+			}
+			break
+		}
+	}
+	delete(npm.nominatedPodToNode, p.UID)
+}
+
+func (npm *nominator) nominatedPodsForNode(nodeName string) []podRef {
+	npm.nLock.RLock()
+	defer npm.nLock.RUnlock()
+	return slices.Clone(npm.nominatedPods[nodeName])
+}
+
+// nominatedNodeName returns nominated node name of a Pod.
+func nominatedNodeName(pod *v1.Pod) string {
+	return pod.Status.NominatedNodeName
+}
+
+type podRef struct {
+	name      string
+	namespace string
+	uid       types.UID
+}
+
+func podToRef(pod *v1.Pod) podRef {
+	return podRef{
+		name:      pod.Name,
+		namespace: pod.Namespace,
+		uid:       pod.UID,
+	}
+}
+
+func (np podRef) toPod() *v1.Pod {
+	return &v1.Pod{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      np.name,
+			Namespace: np.namespace,
+			UID:       np.uid,
+		},
+	}
+}
diff --git a/pkg/scheduler/internal/queue/scheduling_queue.go b/pkg/scheduler/internal/queue/scheduling_queue.go
index 60309da1e30..b4fb251f742 100644
--- a/pkg/scheduler/internal/queue/scheduling_queue.go
+++ b/pkg/scheduler/internal/queue/scheduling_queue.go
@@ -31,12 +31,10 @@ import (
 	"fmt"
 	"math/rand"
 	"reflect"
-	"slices"
 	"sync"
 	"time"
 
 	v1 "k8s.io/api/core/v1"
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/apimachinery/pkg/util/wait"
@@ -137,11 +135,6 @@ func NewSchedulingQueue(
 	return NewPriorityQueue(lessFn, informerFactory, opts...)
 }
 
-// NominatedNodeName returns nominated node name of a Pod.
-func NominatedNodeName(pod *v1.Pod) string {
-	return pod.Status.NominatedNodeName
-}
-
 // PriorityQueue implements a scheduling queue.
 // The head of PriorityQueue is the highest priority pending pod. This structure
 // has two sub queues and an additional data structure, namely: activeQ,
@@ -349,7 +342,7 @@ func NewPriorityQueue(
 	}
 	pq.podBackoffQ = heap.NewWithRecorder(podInfoKeyFunc, pq.podsCompareBackoffCompleted, metrics.NewBackoffPodsRecorder())
 	pq.nsLister = informerFactory.Core().V1().Namespaces().Lister()
-	pq.nominator = newPodNominator(options.podLister, pq.nominatedPodsToInfo)
+	pq.nominator = newPodNominator(options.podLister)
 
 	return pq
 }
@@ -1181,8 +1174,8 @@ func (p *PriorityQueue) PendingPods() ([]*v1.Pod, string) {
 }
 
 // Note: this function assumes the caller locks both p.lock.RLock and p.activeQ.getLock().RLock.
-func (p *PriorityQueue) nominatedPodToInfo(np PodRef) *framework.PodInfo {
-	pod := np.ToPod()
+func (p *PriorityQueue) nominatedPodToInfo(np podRef) *framework.PodInfo {
+	pod := np.toPod()
 	pInfoLookup := newQueuedPodInfoForLookup(pod)
 
 	queuedPodInfo, exists := p.activeQ.unlocked().Get(pInfoLookup)
@@ -1203,18 +1196,6 @@ func (p *PriorityQueue) nominatedPodToInfo(np PodRef) *framework.PodInfo {
 	return &framework.PodInfo{Pod: pod}
 }
 
-func (p *PriorityQueue) nominatedPodsToInfo(nominatedPods []PodRef) []*framework.PodInfo {
-	p.lock.RLock()
-	defer p.lock.RUnlock()
-	p.activeQ.getLock().RLock()
-	defer p.activeQ.getLock().RUnlock()
-	pods := make([]*framework.PodInfo, len(nominatedPods))
-	for i, np := range nominatedPods {
-		pods[i] = p.nominatedPodToInfo(np).DeepCopy()
-	}
-	return pods
-}
-
 // Close closes the priority queue.
 func (p *PriorityQueue) Close() {
 	p.lock.Lock()
@@ -1224,32 +1205,21 @@ func (p *PriorityQueue) Close() {
 	p.activeQ.broadcast()
 }
 
-// DeleteNominatedPodIfExists deletes <pod> from nominatedPods.
-func (npm *nominator) DeleteNominatedPodIfExists(pod *v1.Pod) {
-	npm.nLock.Lock()
-	npm.delete(pod)
-	npm.nLock.Unlock()
-}
-
-// AddNominatedPod adds a pod to the nominated pods of the given node.
-// This is called during the preemption process after a node is nominated to run
-// the pod. We update the structure before sending a request to update the pod
-// object to avoid races with the following scheduling cycles.
-func (npm *nominator) AddNominatedPod(logger klog.Logger, pi *framework.PodInfo, nominatingInfo *framework.NominatingInfo) {
-	npm.nLock.Lock()
-	npm.addNominatedPodUnlocked(logger, pi, nominatingInfo)
-	npm.nLock.Unlock()
-}
-
 // NominatedPodsForNode returns a copy of pods that are nominated to run on the given node,
 // but they are waiting for other pods to be removed from the node.
-// CAUTION: Make sure you don't call this function while taking any lock in any scenario.
-func (npm *nominator) NominatedPodsForNode(nodeName string) []*framework.PodInfo {
-	npm.nLock.RLock()
-	nominatedPods := slices.Clone(npm.nominatedPods[nodeName])
-	npm.nLock.RUnlock()
-	// Note that nominatedPodsToInfo takes SchedulingQueue.lock inside.
-	return npm.nominatedPodsToInfo(nominatedPods)
+// CAUTION: Make sure you don't call this function while taking any queue's lock in any scenario.
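+// It takes SchedulingQueue.lock and then activeQueue's lock itself while resolving the
+// nominated podRefs into cached PodInfos, so calling it with either lock already held
+// would break the documented lock ordering and can deadlock.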
+func (p *PriorityQueue) NominatedPodsForNode(nodeName string) []*framework.PodInfo {
+	p.lock.RLock()
+	defer p.lock.RUnlock()
+	nominatedPods := p.nominator.nominatedPodsForNode(nodeName)
+
+	p.activeQ.getLock().RLock()
+	defer p.activeQ.getLock().RUnlock()
+	pods := make([]*framework.PodInfo, len(nominatedPods))
+	for i, np := range nominatedPods {
+		pods[i] = p.nominatedPodToInfo(np).DeepCopy()
+	}
+	return pods
 }
 
 func (p *PriorityQueue) podsCompareBackoffCompleted(pInfo1, pInfo2 *framework.QueuedPodInfo) bool {
@@ -1362,148 +1332,6 @@ func newUnschedulablePods(unschedulableRecorder, gatedRecorder metrics.MetricRec
 	}
 }
 
-type PodRef struct {
-	Name      string
-	Namespace string
-	UID       types.UID
-}
-
-func PodToRef(pod *v1.Pod) PodRef {
-	return PodRef{
-		Name:      pod.Name,
-		Namespace: pod.Namespace,
-		UID:       pod.UID,
-	}
-}
-
-func (np PodRef) ToPod() *v1.Pod {
-	return &v1.Pod{
-		ObjectMeta: metav1.ObjectMeta{
-			Name:      np.Name,
-			Namespace: np.Namespace,
-			UID:       np.UID,
-		},
-	}
-}
-
-// nominator is a structure that stores pods nominated to run on nodes.
-// It exists because nominatedNodeName of pod objects stored in the structure
-// may be different than what scheduler has here. We should be able to find pods
-// by their UID and update/delete them.
-type nominator struct {
-	// nLock synchronizes all operations related to nominator.
-	// Caution: DO NOT take ("SchedulingQueue.lock" or "activeQueue.lock") after taking "nLock".
-	// You should always take "SchedulingQueue.lock" and "activeQueue.lock" first,
-	// otherwise the nominator could end up in deadlock.
-	// Correct locking order is: SchedulingQueue.lock > activeQueue.lock > nLock.
-	nLock sync.RWMutex
-
-	// podLister is used to verify if the given pod is alive.
-	podLister listersv1.PodLister
-	// nominatedPods is a map keyed by a node name and the value is a list of
-	// pods which are nominated to run on the node. These are pods which can be in
-	// the activeQ or unschedulablePods.
-	nominatedPods map[string][]PodRef
-	// nominatedPodToNode is map keyed by a Pod UID to the node name where it is
-	// nominated.
-	nominatedPodToNode map[types.UID]string
-	// nominatedPodsToInfo returns PodInfos cached in the queues for nominated PodRefs.
-	// Note: it takes SchedulingQueue.lock inside.
-	// Make sure you don't call this function while taking any lock in any scenario.
-	nominatedPodsToInfo func([]PodRef) []*framework.PodInfo
-}
-
-func (npm *nominator) addNominatedPodUnlocked(logger klog.Logger, pi *framework.PodInfo, nominatingInfo *framework.NominatingInfo) {
-	// Always delete the pod if it already exists, to ensure we never store more than
-	// one instance of the pod.
-	npm.delete(pi.Pod)
-
-	var nodeName string
-	if nominatingInfo.Mode() == framework.ModeOverride {
-		nodeName = nominatingInfo.NominatedNodeName
-	} else if nominatingInfo.Mode() == framework.ModeNoop {
-		if pi.Pod.Status.NominatedNodeName == "" {
-			return
-		}
-		nodeName = pi.Pod.Status.NominatedNodeName
-	}
-
-	if npm.podLister != nil {
-		// If the pod was removed or if it was already scheduled, don't nominate it.
-		updatedPod, err := npm.podLister.Pods(pi.Pod.Namespace).Get(pi.Pod.Name)
-		if err != nil {
-			logger.V(4).Info("Pod doesn't exist in podLister, aborted adding it to the nominator", "pod", klog.KObj(pi.Pod))
-			return
-		}
-		if updatedPod.Spec.NodeName != "" {
-			logger.V(4).Info("Pod is already scheduled to a node, aborted adding it to the nominator", "pod", klog.KObj(pi.Pod), "node", updatedPod.Spec.NodeName)
-			return
-		}
-	}
-
-	npm.nominatedPodToNode[pi.Pod.UID] = nodeName
-	for _, np := range npm.nominatedPods[nodeName] {
-		if np.UID == pi.Pod.UID {
-			logger.V(4).Info("Pod already exists in the nominator", "pod", np.UID)
-			return
-		}
-	}
-	npm.nominatedPods[nodeName] = append(npm.nominatedPods[nodeName], PodToRef(pi.Pod))
-}
-
-func (npm *nominator) delete(p *v1.Pod) {
-	nnn, ok := npm.nominatedPodToNode[p.UID]
-	if !ok {
-		return
-	}
-	for i, np := range npm.nominatedPods[nnn] {
-		if np.UID == p.UID {
-			npm.nominatedPods[nnn] = append(npm.nominatedPods[nnn][:i], npm.nominatedPods[nnn][i+1:]...)
-			if len(npm.nominatedPods[nnn]) == 0 {
-				delete(npm.nominatedPods, nnn)
-			}
-			break
-		}
-	}
-	delete(npm.nominatedPodToNode, p.UID)
-}
-
-// UpdateNominatedPod updates the <oldPod> with <newPod>.
-func (npm *nominator) UpdateNominatedPod(logger klog.Logger, oldPod *v1.Pod, newPodInfo *framework.PodInfo) {
-	npm.nLock.Lock()
-	defer npm.nLock.Unlock()
-	// In some cases, an Update event with no "NominatedNode" present is received right
-	// after a node("NominatedNode") is reserved for this pod in memory.
-	// In this case, we need to keep reserving the NominatedNode when updating the pod pointer.
-	var nominatingInfo *framework.NominatingInfo
-	// We won't fall into below `if` block if the Update event represents:
-	// (1) NominatedNode info is added
-	// (2) NominatedNode info is updated
-	// (3) NominatedNode info is removed
-	if NominatedNodeName(oldPod) == "" && NominatedNodeName(newPodInfo.Pod) == "" {
-		if nnn, ok := npm.nominatedPodToNode[oldPod.UID]; ok {
-			// This is the only case we should continue reserving the NominatedNode
-			nominatingInfo = &framework.NominatingInfo{
-				NominatingMode:    framework.ModeOverride,
-				NominatedNodeName: nnn,
-			}
-		}
-	}
-	// We update irrespective of the nominatedNodeName changed or not, to ensure
-	// that pod pointer is updated.
-	npm.delete(oldPod)
-	npm.addNominatedPodUnlocked(logger, newPodInfo, nominatingInfo)
-}
-
-func newPodNominator(podLister listersv1.PodLister, nominatedPodsToInfo func([]PodRef) []*framework.PodInfo) *nominator {
-	return &nominator{
-		podLister:           podLister,
-		nominatedPods:       make(map[string][]PodRef),
-		nominatedPodToNode:  make(map[types.UID]string),
-		nominatedPodsToInfo: nominatedPodsToInfo,
-	}
-}
-
 func podInfoKeyFunc(pInfo *framework.QueuedPodInfo) string {
 	return cache.NewObjectName(pInfo.Pod.Namespace, pInfo.Pod.Name).String()
 }
diff --git a/pkg/scheduler/internal/queue/scheduling_queue_test.go b/pkg/scheduler/internal/queue/scheduling_queue_test.go
index e78be3d2163..9a6e4cc1e9b 100644
--- a/pkg/scheduler/internal/queue/scheduling_queue_test.go
+++ b/pkg/scheduler/internal/queue/scheduling_queue_test.go
@@ -83,8 +83,8 @@ var (
 	)
 
 	nominatorCmpOpts = []cmp.Option{
-		cmp.AllowUnexported(nominator{}),
-		cmpopts.IgnoreFields(nominator{}, "podLister", "nLock", "nominatedPodsToInfo"),
+		cmp.AllowUnexported(nominator{}, podRef{}),
+		cmpopts.IgnoreFields(nominator{}, "podLister", "nLock"),
 	}
 
 	queueHintReturnQueue = func(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
@@ -129,8 +129,8 @@ func TestPriorityQueue_Add(t *testing.T) {
 			medPriorityPodInfo.Pod.UID:   "node1",
 			unschedulablePodInfo.Pod.UID: "node1",
 		},
-		nominatedPods: map[string][]PodRef{
-			"node1": {PodToRef(medPriorityPodInfo.Pod), PodToRef(unschedulablePodInfo.Pod)},
+		nominatedPods: map[string][]podRef{
+			"node1": {podToRef(medPriorityPodInfo.Pod), podToRef(unschedulablePodInfo.Pod)},
 		},
 	}
 	if diff := cmp.Diff(q.nominator, expectedNominatedPods, nominatorCmpOpts...); diff != "" {
@@ -2118,10 +2118,10 @@ func TestPriorityQueue_UpdateNominatedPodForNode(t *testing.T) {
 			medPriorityPodInfo.Pod.UID:   "node1",
 			highPriorityPodInfo.Pod.UID:  "node2",
 			unschedulablePodInfo.Pod.UID: "node5",
 		},
-		nominatedPods: map[string][]PodRef{
-			"node1": {PodToRef(medPriorityPodInfo.Pod)},
-			"node2": {PodToRef(highPriorityPodInfo.Pod)},
-			"node5": {PodToRef(unschedulablePodInfo.Pod)},
+		nominatedPods: map[string][]podRef{
+			"node1": {podToRef(medPriorityPodInfo.Pod)},
+			"node2": {podToRef(highPriorityPodInfo.Pod)},
+			"node5": {podToRef(unschedulablePodInfo.Pod)},
 		},
 	}
 	if diff := cmp.Diff(q.nominator, expectedNominatedPods, nominatorCmpOpts...); diff != "" {
@@ -2143,10 +2143,10 @@ func TestPriorityQueue_UpdateNominatedPodForNode(t *testing.T) {
 			medPriorityPodInfo.Pod.UID:   "node1",
 			highPriorityPodInfo.Pod.UID:  "node4",
 			unschedulablePodInfo.Pod.UID: "node5",
 		},
-		nominatedPods: map[string][]PodRef{
-			"node1": {PodToRef(medPriorityPodInfo.Pod)},
-			"node4": {PodToRef(highPriorityPodInfo.Pod)},
-			"node5": {PodToRef(unschedulablePodInfo.Pod)},
+		nominatedPods: map[string][]podRef{
+			"node1": {podToRef(medPriorityPodInfo.Pod)},
+			"node4": {podToRef(highPriorityPodInfo.Pod)},
+			"node5": {podToRef(unschedulablePodInfo.Pod)},
 		},
 	}
 	if diff := cmp.Diff(q.nominator, expectedNominatedPods, nominatorCmpOpts...); diff != "" {
@@ -2176,9 +2176,9 @@ func TestPriorityQueue_UpdateNominatedPodForNode(t *testing.T) {
 			medPriorityPodInfo.Pod.UID:   "node1",
 			unschedulablePodInfo.Pod.UID: "node5",
 		},
-		nominatedPods: map[string][]PodRef{
-			"node1": {PodToRef(medPriorityPodInfo.Pod)},
-			"node5": {PodToRef(unschedulablePodInfo.Pod)},
+		nominatedPods: map[string][]podRef{
+			"node1": {podToRef(medPriorityPodInfo.Pod)},
+			"node5": {podToRef(unschedulablePodInfo.Pod)},
 		},
 	}
 	if diff := cmp.Diff(q.nominator, expectedNominatedPods, nominatorCmpOpts...); diff != "" {
diff --git a/pkg/scheduler/internal/queue/testing.go b/pkg/scheduler/internal/queue/testing.go
index 0e6a138d89b..e620fc1cab9 100644
--- a/pkg/scheduler/internal/queue/testing.go
+++ b/pkg/scheduler/internal/queue/testing.go
@@ -22,7 +22,6 @@ import (
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/client-go/informers"
 	"k8s.io/client-go/kubernetes/fake"
-	listersv1 "k8s.io/client-go/listers/core/v1"
 	"k8s.io/kubernetes/pkg/scheduler/framework"
 )
 
@@ -54,17 +53,3 @@ func NewTestQueueWithInformerFactory(
 	informerFactory.WaitForCacheSync(ctx.Done())
 	return pq
 }
-
-// NewPodNominator creates a nominator as a backing of framework.PodNominator.
-// A podLister is passed in so as to check if the pod exists
-// before adding its nominatedNode info.
-func NewTestPodNominator(podLister listersv1.PodLister) framework.PodNominator {
-	nominatedPodsToInfo := func(nominatedPods []PodRef) []*framework.PodInfo {
-		pods := make([]*framework.PodInfo, len(nominatedPods))
-		for i, np := range nominatedPods {
-			pods[i] = &framework.PodInfo{Pod: np.ToPod()}
-		}
-		return pods
-	}
-	return newPodNominator(podLister, nominatedPodsToInfo)
-}
diff --git a/pkg/scheduler/schedule_one_test.go b/pkg/scheduler/schedule_one_test.go
index 9d924b5c0a2..a762fba3d92 100644
--- a/pkg/scheduler/schedule_one_test.go
+++ b/pkg/scheduler/schedule_one_test.go
@@ -793,13 +793,14 @@ func TestSchedulerScheduleOne(t *testing.T) {
 				t.Fatal(err)
 			}
 
+			informerFactory := informers.NewSharedInformerFactory(client, 0)
 			sched := &Scheduler{
 				Cache:  cache,
 				client: client,
 				NextPod: func(logger klog.Logger) (*framework.QueuedPodInfo, error) {
 					return &framework.QueuedPodInfo{PodInfo: mustNewPodInfo(t, item.sendPod)}, nil
 				},
-				SchedulingQueue: internalqueue.NewTestQueue(ctx, nil),
+				SchedulingQueue: internalqueue.NewSchedulingQueue(nil, informerFactory),
 				Profiles:        profile.Map{testSchedulerName: fwk},
 			}
 
@@ -2472,7 +2473,7 @@ func TestSchedulerSchedulePod(t *testing.T) {
 				test.registerPlugins, "",
 				frameworkruntime.WithSnapshotSharedLister(snapshot),
 				frameworkruntime.WithInformerFactory(informerFactory),
-				frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(informerFactory.Core().V1().Pods().Lister())),
+				frameworkruntime.WithPodNominator(internalqueue.NewSchedulingQueue(nil, informerFactory)),
 			)
 			if err != nil {
 				t.Fatal(err)
@@ -2538,7 +2539,7 @@ func TestFindFitAllError(t *testing.T) {
 			tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
 		},
 		"",
-		frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(nil)),
+		frameworkruntime.WithPodNominator(internalqueue.NewTestQueue(ctx, nil)),
 	)
 	if err != nil {
 		t.Fatal(err)
@@ -2581,7 +2582,7 @@ func TestFindFitSomeError(t *testing.T) {
 			tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
 		},
 		"",
-		frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(nil)),
+		frameworkruntime.WithPodNominator(internalqueue.NewTestQueue(ctx, nil)),
 	)
 	if err != nil {
 		t.Fatal(err)
@@ -2652,10 +2653,18 @@ func TestFindFitPredicateCallCounts(t *testing.T) {
 			logger, ctx := ktesting.NewTestContext(t)
 			ctx, cancel := context.WithCancel(ctx)
 			defer cancel()
+
+			informerFactory := informers.NewSharedInformerFactory(clientsetfake.NewClientset(), 0)
+			podInformer := informerFactory.Core().V1().Pods().Informer()
+			err := podInformer.GetStore().Add(test.pod)
+			if err != nil {
+				t.Fatalf("Error adding pod to podInformer: %s", err)
+			}
+
 			fwk, err := tf.NewFramework(
 				ctx,
 				registerPlugins, "",
-				frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(nil)),
+				frameworkruntime.WithPodNominator(internalqueue.NewSchedulingQueue(nil, informerFactory)),
 			)
 			if err != nil {
 				t.Fatal(err)
@@ -2669,6 +2678,10 @@ func TestFindFitPredicateCallCounts(t *testing.T) {
 			if err != nil {
 				t.Fatal(err)
 			}
+			err = podInformer.GetStore().Add(podinfo.Pod)
+			if err != nil {
+				t.Fatalf("Error adding nominated pod to podInformer: %s", err)
+			}
 			fwk.AddNominatedPod(logger, podinfo, &framework.NominatingInfo{NominatingMode: framework.ModeOverride, NominatedNodeName: "1"})
 
 			_, _, err = scheduler.findNodesThatFitPod(ctx, fwk, framework.NewCycleState(), test.pod)
@@ -2796,7 +2809,7 @@ func TestZeroRequest(t *testing.T) {
 				frameworkruntime.WithInformerFactory(informerFactory),
 				frameworkruntime.WithSnapshotSharedLister(snapshot),
 				frameworkruntime.WithClientSet(client),
-				frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(informerFactory.Core().V1().Pods().Lister())),
+				frameworkruntime.WithPodNominator(internalqueue.NewSchedulingQueue(nil, informerFactory)),
 			)
 			if err != nil {
 				t.Fatalf("error creating framework: %+v", err)
@@ -3199,7 +3212,7 @@ func Test_prioritizeNodes(t *testing.T) {
 				frameworkruntime.WithInformerFactory(informerFactory),
 				frameworkruntime.WithSnapshotSharedLister(snapshot),
 				frameworkruntime.WithClientSet(client),
-				frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(informerFactory.Core().V1().Pods().Lister())),
+				frameworkruntime.WithPodNominator(internalqueue.NewSchedulingQueue(nil, informerFactory)),
 			)
 			if err != nil {
 				t.Fatalf("error creating framework: %+v", err)
@@ -3317,7 +3330,7 @@ func TestFairEvaluationForNodes(t *testing.T) {
 			tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
 		},
 		"",
-		frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(nil)),
+		frameworkruntime.WithPodNominator(internalqueue.NewTestQueue(ctx, nil)),
 	)
 	if err != nil {
 		t.Fatal(err)
@@ -3399,7 +3412,7 @@ func TestPreferNominatedNodeFilterCallCounts(t *testing.T) {
 				ctx,
 				registerPlugins, "",
 				frameworkruntime.WithClientSet(client),
-				frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(informerFactory.Core().V1().Pods().Lister())),
+				frameworkruntime.WithPodNominator(internalqueue.NewSchedulingQueue(nil, informerFactory)),
 			)
 			if err != nil {
 				t.Fatal(err)
@@ -3557,7 +3570,7 @@ func setupTestScheduler(ctx context.Context, t *testing.T, queuedPodStore *clien
 		frameworkruntime.WithClientSet(client),
 		frameworkruntime.WithEventRecorder(recorder),
 		frameworkruntime.WithInformerFactory(informerFactory),
-		frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(informerFactory.Core().V1().Pods().Lister())),
+		frameworkruntime.WithPodNominator(schedulingQueue),
 		frameworkruntime.WithWaitingPods(waitingPods),
 	)
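Reviewer note: with NewTestPodNominator gone, every test touched by this PR obtains its framework.PodNominator from a real scheduling queue. A minimal sketch of the wiring the tests converge on (illustrative only; testPod and registeredPlugins stand in for each test's own fixtures, and error handling is shortened):

	// The nominator built this way verifies pods through the informer's lister before
	// recording a nomination, which is why the tests above seed the informer store;
	// an unknown pod is dropped by AddNominatedPod (logged only at V(4)).
	informerFactory := informers.NewSharedInformerFactory(clientsetfake.NewClientset(), 0)
	podInformer := informerFactory.Core().V1().Pods().Informer()
	if err := podInformer.GetStore().Add(testPod); err != nil {
		t.Fatalf("Error adding pod to podInformer: %s", err)
	}
	fwk, err := tf.NewFramework(ctx, registeredPlugins, "",
		frameworkruntime.WithInformerFactory(informerFactory),
		frameworkruntime.WithPodNominator(internalqueue.NewSchedulingQueue(nil, informerFactory)),
	)

Tests that never exercise nominated pods keep a lighter setup by passing internalqueue.NewTestQueue(ctx, nil) as the nominator, as the TestFindFitAllError, TestFindFitSomeError, and TestFairEvaluationForNodes hunks show.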