diff --git a/pkg/scheduler/extender_test.go b/pkg/scheduler/extender_test.go index 8e9db9953e3..c474087757c 100644 --- a/pkg/scheduler/extender_test.go +++ b/pkg/scheduler/extender_test.go @@ -337,7 +337,7 @@ func TestSchedulerWithExtenders(t *testing.T) { test.registerPlugins, "", runtime.WithClientSet(client), runtime.WithInformerFactory(informerFactory), - runtime.WithPodNominator(internalqueue.NewTestPodNominator(informerFactory.Core().V1().Pods().Lister())), + runtime.WithPodNominator(internalqueue.NewSchedulingQueue(nil, informerFactory)), runtime.WithLogger(logger), ) if err != nil { diff --git a/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go b/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go index 2512d659601..7563085e694 100644 --- a/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go +++ b/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go @@ -380,7 +380,7 @@ func TestPostFilter(t *testing.T) { frameworkruntime.WithClientSet(cs), frameworkruntime.WithEventRecorder(&events.FakeRecorder{}), frameworkruntime.WithInformerFactory(informerFactory), - frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(informerFactory.Core().V1().Pods().Lister())), + frameworkruntime.WithPodNominator(internalqueue.NewSchedulingQueue(nil, informerFactory)), frameworkruntime.WithExtenders(extenders), frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(tt.pods, tt.nodes)), frameworkruntime.WithLogger(logger), @@ -1117,7 +1117,7 @@ func TestDryRunPreemption(t *testing.T) { fwk, err := tf.NewFramework( ctx, registeredPlugins, "", - frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(informerFactory.Core().V1().Pods().Lister())), + frameworkruntime.WithPodNominator(internalqueue.NewSchedulingQueue(nil, informerFactory)), frameworkruntime.WithSnapshotSharedLister(snapshot), frameworkruntime.WithInformerFactory(informerFactory), frameworkruntime.WithParallelism(parallelism), @@ -1376,7 +1376,7 @@ func TestSelectBestCandidate(t *testing.T) { tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), }, "", - frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(informerFactory.Core().V1().Pods().Lister())), + frameworkruntime.WithPodNominator(internalqueue.NewSchedulingQueue(nil, informerFactory)), frameworkruntime.WithSnapshotSharedLister(snapshot), frameworkruntime.WithLogger(logger), ) @@ -1760,7 +1760,7 @@ func TestPreempt(t *testing.T) { frameworkruntime.WithClientSet(client), frameworkruntime.WithEventRecorder(&events.FakeRecorder{}), frameworkruntime.WithExtenders(extenders), - frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(informerFactory.Core().V1().Pods().Lister())), + frameworkruntime.WithPodNominator(internalqueue.NewSchedulingQueue(nil, informerFactory)), frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(test.pods, nodes)), frameworkruntime.WithInformerFactory(informerFactory), frameworkruntime.WithWaitingPods(waitingPods), diff --git a/pkg/scheduler/framework/preemption/preemption_test.go b/pkg/scheduler/framework/preemption/preemption_test.go index 25647d2ff78..04d8c92c50c 100644 --- a/pkg/scheduler/framework/preemption/preemption_test.go +++ b/pkg/scheduler/framework/preemption/preemption_test.go @@ -208,7 +208,7 @@ func TestDryRunPreemption(t *testing.T) { fwk, err := tf.NewFramework( ctx, registeredPlugins, "", - frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(informerFactory.Core().V1().Pods().Lister())), + frameworkruntime.WithPodNominator(internalqueue.NewSchedulingQueue(nil, informerFactory)), frameworkruntime.WithInformerFactory(informerFactory), frameworkruntime.WithParallelism(parallelism), frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(tt.testPods, tt.nodes)), @@ -313,7 +313,7 @@ func TestSelectCandidate(t *testing.T) { ctx, registeredPlugins, "", - frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(informerFactory.Core().V1().Pods().Lister())), + frameworkruntime.WithPodNominator(internalqueue.NewSchedulingQueue(nil, informerFactory)), frameworkruntime.WithSnapshotSharedLister(snapshot), frameworkruntime.WithLogger(logger), ) diff --git a/pkg/scheduler/framework/runtime/framework_test.go b/pkg/scheduler/framework/runtime/framework_test.go index 94021bf2b9f..9208f8bd575 100644 --- a/pkg/scheduler/framework/runtime/framework_test.go +++ b/pkg/scheduler/framework/runtime/framework_test.go @@ -31,6 +31,8 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/client-go/informers" + clientsetfake "k8s.io/client-go/kubernetes/fake" "k8s.io/component-base/metrics/testutil" "k8s.io/klog/v2/ktesting" "k8s.io/kubernetes/pkg/scheduler/apis/config" @@ -2385,7 +2387,20 @@ func TestFilterPluginsWithNominatedPods(t *testing.T) { ) } - podNominator := internalqueue.NewTestPodNominator(nil) + informerFactory := informers.NewSharedInformerFactory(clientsetfake.NewClientset(), 0) + podInformer := informerFactory.Core().V1().Pods().Informer() + err := podInformer.GetStore().Add(tt.pod) + if err != nil { + t.Fatalf("Error adding pod to podInformer: %s", err) + } + if tt.nominatedPod != nil { + err = podInformer.GetStore().Add(tt.nominatedPod) + if err != nil { + t.Fatalf("Error adding nominated pod to podInformer: %s", err) + } + } + + podNominator := internalqueue.NewSchedulingQueue(nil, informerFactory) if tt.nominatedPod != nil { podNominator.AddNominatedPod( logger, diff --git a/pkg/scheduler/internal/queue/scheduling_queue.go b/pkg/scheduler/internal/queue/scheduling_queue.go index 60309da1e30..2a5b530d0d0 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue.go +++ b/pkg/scheduler/internal/queue/scheduling_queue.go @@ -349,7 +349,7 @@ func NewPriorityQueue( } pq.podBackoffQ = heap.NewWithRecorder(podInfoKeyFunc, pq.podsCompareBackoffCompleted, metrics.NewBackoffPodsRecorder()) pq.nsLister = informerFactory.Core().V1().Namespaces().Lister() - pq.nominator = newPodNominator(options.podLister, pq.nominatedPodsToInfo) + pq.nominator = newPodNominator(options.podLister) return pq } @@ -1203,18 +1203,6 @@ func (p *PriorityQueue) nominatedPodToInfo(np PodRef) *framework.PodInfo { return &framework.PodInfo{Pod: pod} } -func (p *PriorityQueue) nominatedPodsToInfo(nominatedPods []PodRef) []*framework.PodInfo { - p.lock.RLock() - defer p.lock.RUnlock() - p.activeQ.getLock().RLock() - defer p.activeQ.getLock().RUnlock() - pods := make([]*framework.PodInfo, len(nominatedPods)) - for i, np := range nominatedPods { - pods[i] = p.nominatedPodToInfo(np).DeepCopy() - } - return pods -} - // Close closes the priority queue. func (p *PriorityQueue) Close() { p.lock.Lock() @@ -1241,15 +1229,27 @@ func (npm *nominator) AddNominatedPod(logger klog.Logger, pi *framework.PodInfo, npm.nLock.Unlock() } +func (npm *nominator) nominatedPodsForNode(nodeName string) []PodRef { + npm.nLock.RLock() + defer npm.nLock.RUnlock() + return slices.Clone(npm.nominatedPods[nodeName]) +} + // NominatedPodsForNode returns a copy of pods that are nominated to run on the given node, // but they are waiting for other pods to be removed from the node. -// CAUTION: Make sure you don't call this function while taking any lock in any scenario. -func (npm *nominator) NominatedPodsForNode(nodeName string) []*framework.PodInfo { - npm.nLock.RLock() - nominatedPods := slices.Clone(npm.nominatedPods[nodeName]) - npm.nLock.RUnlock() - // Note that nominatedPodsToInfo takes SchedulingQueue.lock inside. - return npm.nominatedPodsToInfo(nominatedPods) +// CAUTION: Make sure you don't call this function while taking any queue's lock in any scenario. +func (p *PriorityQueue) NominatedPodsForNode(nodeName string) []*framework.PodInfo { + p.lock.RLock() + defer p.lock.RUnlock() + nominatedPods := p.nominator.nominatedPodsForNode(nodeName) + + p.activeQ.getLock().RLock() + defer p.activeQ.getLock().RUnlock() + pods := make([]*framework.PodInfo, len(nominatedPods)) + for i, np := range nominatedPods { + pods[i] = p.nominatedPodToInfo(np).DeepCopy() + } + return pods } func (p *PriorityQueue) podsCompareBackoffCompleted(pInfo1, pInfo2 *framework.QueuedPodInfo) bool { @@ -1495,12 +1495,11 @@ func (npm *nominator) UpdateNominatedPod(logger klog.Logger, oldPod *v1.Pod, new npm.addNominatedPodUnlocked(logger, newPodInfo, nominatingInfo) } -func newPodNominator(podLister listersv1.PodLister, nominatedPodsToInfo func([]PodRef) []*framework.PodInfo) *nominator { +func newPodNominator(podLister listersv1.PodLister) *nominator { return &nominator{ - podLister: podLister, - nominatedPods: make(map[string][]PodRef), - nominatedPodToNode: make(map[types.UID]string), - nominatedPodsToInfo: nominatedPodsToInfo, + podLister: podLister, + nominatedPods: make(map[string][]PodRef), + nominatedPodToNode: make(map[types.UID]string), } } diff --git a/pkg/scheduler/internal/queue/testing.go b/pkg/scheduler/internal/queue/testing.go index 0e6a138d89b..e620fc1cab9 100644 --- a/pkg/scheduler/internal/queue/testing.go +++ b/pkg/scheduler/internal/queue/testing.go @@ -22,7 +22,6 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" - listersv1 "k8s.io/client-go/listers/core/v1" "k8s.io/kubernetes/pkg/scheduler/framework" ) @@ -54,17 +53,3 @@ func NewTestQueueWithInformerFactory( informerFactory.WaitForCacheSync(ctx.Done()) return pq } - -// NewPodNominator creates a nominator as a backing of framework.PodNominator. -// A podLister is passed in so as to check if the pod exists -// before adding its nominatedNode info. -func NewTestPodNominator(podLister listersv1.PodLister) framework.PodNominator { - nominatedPodsToInfo := func(nominatedPods []PodRef) []*framework.PodInfo { - pods := make([]*framework.PodInfo, len(nominatedPods)) - for i, np := range nominatedPods { - pods[i] = &framework.PodInfo{Pod: np.ToPod()} - } - return pods - } - return newPodNominator(podLister, nominatedPodsToInfo) -} diff --git a/pkg/scheduler/schedule_one_test.go b/pkg/scheduler/schedule_one_test.go index 9d924b5c0a2..a762fba3d92 100644 --- a/pkg/scheduler/schedule_one_test.go +++ b/pkg/scheduler/schedule_one_test.go @@ -793,13 +793,14 @@ func TestSchedulerScheduleOne(t *testing.T) { t.Fatal(err) } + informerFactory := informers.NewSharedInformerFactory(client, 0) sched := &Scheduler{ Cache: cache, client: client, NextPod: func(logger klog.Logger) (*framework.QueuedPodInfo, error) { return &framework.QueuedPodInfo{PodInfo: mustNewPodInfo(t, item.sendPod)}, nil }, - SchedulingQueue: internalqueue.NewTestQueue(ctx, nil), + SchedulingQueue: internalqueue.NewSchedulingQueue(nil, informerFactory), Profiles: profile.Map{testSchedulerName: fwk}, } @@ -2472,7 +2473,7 @@ func TestSchedulerSchedulePod(t *testing.T) { test.registerPlugins, "", frameworkruntime.WithSnapshotSharedLister(snapshot), frameworkruntime.WithInformerFactory(informerFactory), - frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(informerFactory.Core().V1().Pods().Lister())), + frameworkruntime.WithPodNominator(internalqueue.NewSchedulingQueue(nil, informerFactory)), ) if err != nil { t.Fatal(err) @@ -2538,7 +2539,7 @@ func TestFindFitAllError(t *testing.T) { tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), }, "", - frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(nil)), + frameworkruntime.WithPodNominator(internalqueue.NewTestQueue(ctx, nil)), ) if err != nil { t.Fatal(err) @@ -2581,7 +2582,7 @@ func TestFindFitSomeError(t *testing.T) { tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), }, "", - frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(nil)), + frameworkruntime.WithPodNominator(internalqueue.NewTestQueue(ctx, nil)), ) if err != nil { t.Fatal(err) @@ -2652,10 +2653,18 @@ func TestFindFitPredicateCallCounts(t *testing.T) { logger, ctx := ktesting.NewTestContext(t) ctx, cancel := context.WithCancel(ctx) defer cancel() + + informerFactory := informers.NewSharedInformerFactory(clientsetfake.NewClientset(), 0) + podInformer := informerFactory.Core().V1().Pods().Informer() + err := podInformer.GetStore().Add(test.pod) + if err != nil { + t.Fatalf("Error adding pod to podInformer: %s", err) + } + fwk, err := tf.NewFramework( ctx, registerPlugins, "", - frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(nil)), + frameworkruntime.WithPodNominator(internalqueue.NewSchedulingQueue(nil, informerFactory)), ) if err != nil { t.Fatal(err) @@ -2669,6 +2678,10 @@ func TestFindFitPredicateCallCounts(t *testing.T) { if err != nil { t.Fatal(err) } + err = podInformer.GetStore().Add(podinfo.Pod) + if err != nil { + t.Fatalf("Error adding nominated pod to podInformer: %s", err) + } fwk.AddNominatedPod(logger, podinfo, &framework.NominatingInfo{NominatingMode: framework.ModeOverride, NominatedNodeName: "1"}) _, _, err = scheduler.findNodesThatFitPod(ctx, fwk, framework.NewCycleState(), test.pod) @@ -2796,7 +2809,7 @@ func TestZeroRequest(t *testing.T) { frameworkruntime.WithInformerFactory(informerFactory), frameworkruntime.WithSnapshotSharedLister(snapshot), frameworkruntime.WithClientSet(client), - frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(informerFactory.Core().V1().Pods().Lister())), + frameworkruntime.WithPodNominator(internalqueue.NewSchedulingQueue(nil, informerFactory)), ) if err != nil { t.Fatalf("error creating framework: %+v", err) @@ -3199,7 +3212,7 @@ func Test_prioritizeNodes(t *testing.T) { frameworkruntime.WithInformerFactory(informerFactory), frameworkruntime.WithSnapshotSharedLister(snapshot), frameworkruntime.WithClientSet(client), - frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(informerFactory.Core().V1().Pods().Lister())), + frameworkruntime.WithPodNominator(internalqueue.NewSchedulingQueue(nil, informerFactory)), ) if err != nil { t.Fatalf("error creating framework: %+v", err) @@ -3317,7 +3330,7 @@ func TestFairEvaluationForNodes(t *testing.T) { tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), }, "", - frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(nil)), + frameworkruntime.WithPodNominator(internalqueue.NewTestQueue(ctx, nil)), ) if err != nil { t.Fatal(err) @@ -3399,7 +3412,7 @@ func TestPreferNominatedNodeFilterCallCounts(t *testing.T) { ctx, registerPlugins, "", frameworkruntime.WithClientSet(client), - frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(informerFactory.Core().V1().Pods().Lister())), + frameworkruntime.WithPodNominator(internalqueue.NewSchedulingQueue(nil, informerFactory)), ) if err != nil { t.Fatal(err) @@ -3557,7 +3570,7 @@ func setupTestScheduler(ctx context.Context, t *testing.T, queuedPodStore *clien frameworkruntime.WithClientSet(client), frameworkruntime.WithEventRecorder(recorder), frameworkruntime.WithInformerFactory(informerFactory), - frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(informerFactory.Core().V1().Pods().Lister())), + frameworkruntime.WithPodNominator(schedulingQueue), frameworkruntime.WithWaitingPods(waitingPods), )