diff --git a/pkg/scheduler/core/extender_test.go b/pkg/scheduler/core/extender_test.go index 913c04588a3..bdcf17393f5 100644 --- a/pkg/scheduler/core/extender_test.go +++ b/pkg/scheduler/core/extender_test.go @@ -271,7 +271,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) { test.registerPlugins, "", runtime.WithClientSet(client), runtime.WithInformerFactory(informerFactory), - runtime.WithPodNominator(internalqueue.NewPodNominator()), + runtime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())), ) if err != nil { t.Fatal(err) diff --git a/pkg/scheduler/core/generic_scheduler_test.go b/pkg/scheduler/core/generic_scheduler_test.go index 54ea8ee247d..4336cacdb33 100644 --- a/pkg/scheduler/core/generic_scheduler_test.go +++ b/pkg/scheduler/core/generic_scheduler_test.go @@ -995,7 +995,7 @@ func TestGenericScheduler(t *testing.T) { test.registerPlugins, "", frameworkruntime.WithSnapshotSharedLister(snapshot), frameworkruntime.WithInformerFactory(informerFactory), - frameworkruntime.WithPodNominator(internalqueue.NewPodNominator()), + frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())), ) if err != nil { t.Fatal(err) @@ -1056,7 +1056,7 @@ func TestFindFitAllError(t *testing.T) { st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), }, "", - frameworkruntime.WithPodNominator(internalqueue.NewPodNominator()), + frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(nil)), ) if err != nil { t.Fatal(err) @@ -1089,7 +1089,7 @@ func TestFindFitSomeError(t *testing.T) { st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), }, "", - frameworkruntime.WithPodNominator(internalqueue.NewPodNominator()), + frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(nil)), ) if err != nil { t.Fatal(err) @@ -1163,7 +1163,7 @@ func TestFindFitPredicateCallCounts(t *testing.T) { } fwk, err := st.NewFramework( registerPlugins, "", - frameworkruntime.WithPodNominator(internalqueue.NewPodNominator()), + frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(nil)), ) if err != nil { t.Fatal(err) @@ -1325,7 +1325,7 @@ func TestZeroRequest(t *testing.T) { frameworkruntime.WithInformerFactory(informerFactory), frameworkruntime.WithSnapshotSharedLister(snapshot), frameworkruntime.WithClientSet(client), - frameworkruntime.WithPodNominator(internalqueue.NewPodNominator()), + frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())), ) if err != nil { t.Fatalf("error creating framework: %+v", err) @@ -1427,7 +1427,7 @@ func TestFairEvaluationForNodes(t *testing.T) { st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), }, "", - frameworkruntime.WithPodNominator(internalqueue.NewPodNominator()), + frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(nil)), ) if err != nil { t.Fatal(err) @@ -1493,7 +1493,8 @@ func TestPreferNominatedNodeFilterCallCounts(t *testing.T) { defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PreferNominatedNode, test.feature)() // create three nodes in the cluster. nodes := makeNodeList([]string{"node1", "node2", "node3"}) - client := &clientsetfake.Clientset{} + client := clientsetfake.NewSimpleClientset(test.pod) + informerFactory := informers.NewSharedInformerFactory(client, 0) cache := internalcache.New(time.Duration(0), wait.NeverStop) for _, n := range nodes { cache.AddNode(n) @@ -1513,7 +1514,7 @@ func TestPreferNominatedNodeFilterCallCounts(t *testing.T) { fwk, err := st.NewFramework( registerPlugins, "", frameworkruntime.WithClientSet(client), - frameworkruntime.WithPodNominator(internalqueue.NewPodNominator()), + frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())), ) if err != nil { t.Fatal(err) diff --git a/pkg/scheduler/factory.go b/pkg/scheduler/factory.go index e0729389f13..8675b4f3047 100644 --- a/pkg/scheduler/factory.go +++ b/pkg/scheduler/factory.go @@ -138,7 +138,7 @@ func (c *Configurator) create() (*Scheduler, error) { } // The nominator will be passed all the way to framework instantiation. - nominator := internalqueue.NewSafePodNominator(c.informerFactory.Core().V1().Pods().Lister()) + nominator := internalqueue.NewPodNominator(c.informerFactory.Core().V1().Pods().Lister()) profiles, err := profile.NewMap(c.profiles, c.registry, c.recorderFactory, frameworkruntime.WithComponentConfigVersion(c.componentConfigVersion), frameworkruntime.WithClientSet(c.client), diff --git a/pkg/scheduler/framework/interface.go b/pkg/scheduler/framework/interface.go index 467ba394db0..c654c23777d 100644 --- a/pkg/scheduler/framework/interface.go +++ b/pkg/scheduler/framework/interface.go @@ -585,7 +585,7 @@ type PostFilterResult struct { // PodNominator abstracts operations to maintain nominated Pods. type PodNominator interface { - // AddNominatedPod adds the given pod to the nominated pod map or + // AddNominatedPod adds the given pod to the nominator or // updates it if it already exists. AddNominatedPod(pod *PodInfo, nodeName string) // DeleteNominatedPodIfExists deletes nominatedPod from internal cache. It's a no-op if it doesn't exist. diff --git a/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go b/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go index 18d8801b4b7..14c2919d55e 100644 --- a/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go +++ b/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go @@ -286,7 +286,7 @@ func TestPostFilter(t *testing.T) { frameworkruntime.WithClientSet(cs), frameworkruntime.WithEventRecorder(&events.FakeRecorder{}), frameworkruntime.WithInformerFactory(informerFactory), - frameworkruntime.WithPodNominator(internalqueue.NewPodNominator()), + frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())), frameworkruntime.WithExtenders(extenders), frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(tt.pods, tt.nodes)), ) @@ -960,7 +960,6 @@ func TestDryRunPreemption(t *testing.T) { labelKeys := []string{"hostname", "zone", "region"} for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - rand.Seed(4) nodes := make([]*v1.Node, len(tt.nodeNames)) fakeFilterRCMap := make(map[string]framework.Code, len(tt.nodeNames)) for i, nodeName := range tt.nodeNames { @@ -992,7 +991,13 @@ func TestDryRunPreemption(t *testing.T) { st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), ) registeredPlugins = append(registeredPlugins, tt.registerPlugins...) - objs := []runtime.Object{&v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: ""}}} + var objs []runtime.Object + for _, p := range append(tt.testPods, tt.initPods...) { + objs = append(objs, p) + } + for _, n := range nodes { + objs = append(objs, n) + } informerFactory := informers.NewSharedInformerFactory(clientsetfake.NewSimpleClientset(objs...), 0) parallelism := parallelize.DefaultParallelism if tt.disableParallelism { @@ -1003,7 +1008,7 @@ func TestDryRunPreemption(t *testing.T) { } fwk, err := st.NewFramework( registeredPlugins, "", - frameworkruntime.WithPodNominator(internalqueue.NewPodNominator()), + frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())), frameworkruntime.WithSnapshotSharedLister(snapshot), frameworkruntime.WithInformerFactory(informerFactory), frameworkruntime.WithParallelism(parallelism), @@ -1030,6 +1035,10 @@ func TestDryRunPreemption(t *testing.T) { } pl := &DefaultPreemption{args: *tt.args} + // Using 4 as a seed source to test getOffsetAndNumCandidates() deterministically. + // However, we need to do it after informerFactory.WaitforCacheSync() which might + // set a seed. + rand.Seed(4) var prevNumFilterCalled int32 for cycle, pod := range tt.testPods { state := framework.NewCycleState() @@ -1222,6 +1231,14 @@ func TestSelectBestCandidate(t *testing.T) { for i, nodeName := range tt.nodeNames { nodes[i] = st.MakeNode().Name(nodeName).Capacity(veryLargeRes).Obj() } + + var objs []runtime.Object + objs = append(objs, tt.pod) + for _, pod := range tt.pods { + objs = append(objs, pod) + } + cs := clientsetfake.NewSimpleClientset(objs...) + informerFactory := informers.NewSharedInformerFactory(cs, 0) snapshot := internalcache.NewSnapshot(tt.pods, nodes) fwk, err := st.NewFramework( []st.RegisterPluginFunc{ @@ -1230,7 +1247,7 @@ func TestSelectBestCandidate(t *testing.T) { st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), }, "", - frameworkruntime.WithPodNominator(internalqueue.NewPodNominator()), + frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())), frameworkruntime.WithSnapshotSharedLister(snapshot), ) if err != nil { @@ -1635,7 +1652,7 @@ func TestPreempt(t *testing.T) { frameworkruntime.WithClientSet(client), frameworkruntime.WithEventRecorder(&events.FakeRecorder{}), frameworkruntime.WithExtenders(extenders), - frameworkruntime.WithPodNominator(internalqueue.NewPodNominator()), + frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())), frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(test.pods, nodes)), frameworkruntime.WithInformerFactory(informerFactory), ) diff --git a/pkg/scheduler/framework/runtime/framework_test.go b/pkg/scheduler/framework/runtime/framework_test.go index 9083a6f1d7c..77b74b85d70 100644 --- a/pkg/scheduler/framework/runtime/framework_test.go +++ b/pkg/scheduler/framework/runtime/framework_test.go @@ -1479,7 +1479,7 @@ func TestFilterPluginsWithNominatedPods(t *testing.T) { ) } - podNominator := internalqueue.NewPodNominator() + podNominator := internalqueue.NewPodNominator(nil) if tt.nominatedPod != nil { podNominator.AddNominatedPod(framework.NewPodInfo(tt.nominatedPod), nodeName) } diff --git a/pkg/scheduler/internal/queue/scheduling_queue.go b/pkg/scheduler/internal/queue/scheduling_queue.go index 956de18b84c..6e08221df1f 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue.go +++ b/pkg/scheduler/internal/queue/scheduling_queue.go @@ -247,7 +247,7 @@ func NewPriorityQueue( } if options.podNominator == nil { - options.podNominator = NewPodNominator() + options.podNominator = NewPodNominator(informerFactory.Core().V1().Pods().Lister()) } pq := &PriorityQueue{ @@ -628,7 +628,7 @@ func (p *PriorityQueue) Close() { } // DeleteNominatedPodIfExists deletes from nominatedPods. -func (npm *nominatedPodMap) DeleteNominatedPodIfExists(pod *v1.Pod) { +func (npm *nominator) DeleteNominatedPodIfExists(pod *v1.Pod) { npm.Lock() npm.delete(pod) npm.Unlock() @@ -638,7 +638,7 @@ func (npm *nominatedPodMap) DeleteNominatedPodIfExists(pod *v1.Pod) { // This is called during the preemption process after a node is nominated to run // the pod. We update the structure before sending a request to update the pod // object to avoid races with the following scheduling cycles. -func (npm *nominatedPodMap) AddNominatedPod(pi *framework.PodInfo, nodeName string) { +func (npm *nominator) AddNominatedPod(pi *framework.PodInfo, nodeName string) { npm.Lock() npm.add(pi, nodeName) npm.Unlock() @@ -646,7 +646,7 @@ func (npm *nominatedPodMap) AddNominatedPod(pi *framework.PodInfo, nodeName stri // NominatedPodsForNode returns pods that are nominated to run on the given node, // but they are waiting for other pods to be removed from the node. -func (npm *nominatedPodMap) NominatedPodsForNode(nodeName string) []*framework.PodInfo { +func (npm *nominator) NominatedPodsForNode(nodeName string) []*framework.PodInfo { npm.RLock() defer npm.RUnlock() // TODO: we may need to return a copy of []*Pods to avoid modification @@ -762,11 +762,11 @@ func newUnschedulablePodsMap(metricRecorder metrics.MetricRecorder) *Unschedulab } } -// nominatedPodMap is a structure that stores pods nominated to run on nodes. +// nominator is a structure that stores pods nominated to run on nodes. // It exists because nominatedNodeName of pod objects stored in the structure // may be different than what scheduler has here. We should be able to find pods // by their UID and update/delete them. -type nominatedPodMap struct { +type nominator struct { // podLister is used to verify if the given pod is alive. podLister listersv1.PodLister // nominatedPods is a map keyed by a node name and the value is a list of @@ -780,7 +780,7 @@ type nominatedPodMap struct { sync.RWMutex } -func (npm *nominatedPodMap) add(pi *framework.PodInfo, nodeName string) { +func (npm *nominator) add(pi *framework.PodInfo, nodeName string) { // always delete the pod if it already exist, to ensure we never store more than // one instance of the pod. npm.delete(pi.Pod) @@ -796,7 +796,7 @@ func (npm *nominatedPodMap) add(pi *framework.PodInfo, nodeName string) { if npm.podLister != nil { // If the pod is not alive, don't contain it. if _, err := npm.podLister.Pods(pi.Pod.Namespace).Get(pi.Pod.Name); err != nil { - klog.V(4).InfoS("Pod doesn't exist in podLister, aborting adding it to the nominated map", "pod", klog.KObj(pi.Pod)) + klog.V(4).InfoS("Pod doesn't exist in podLister, aborting adding it to the nominator", "pod", klog.KObj(pi.Pod)) return } } @@ -804,14 +804,14 @@ func (npm *nominatedPodMap) add(pi *framework.PodInfo, nodeName string) { npm.nominatedPodToNode[pi.Pod.UID] = nnn for _, npi := range npm.nominatedPods[nnn] { if npi.Pod.UID == pi.Pod.UID { - klog.V(4).InfoS("Pod already exists in the nominated map", "pod", klog.KObj(npi.Pod)) + klog.V(4).InfoS("Pod already exists in the nominator", "pod", klog.KObj(npi.Pod)) return } } npm.nominatedPods[nnn] = append(npm.nominatedPods[nnn], pi) } -func (npm *nominatedPodMap) delete(p *v1.Pod) { +func (npm *nominator) delete(p *v1.Pod) { nnn, ok := npm.nominatedPodToNode[p.UID] if !ok { return @@ -829,7 +829,7 @@ func (npm *nominatedPodMap) delete(p *v1.Pod) { } // UpdateNominatedPod updates the with . -func (npm *nominatedPodMap) UpdateNominatedPod(oldPod *v1.Pod, newPodInfo *framework.PodInfo) { +func (npm *nominator) UpdateNominatedPod(oldPod *v1.Pod, newPodInfo *framework.PodInfo) { npm.Lock() defer npm.Unlock() // In some cases, an Update event with no "NominatedNode" present is received right @@ -852,17 +852,11 @@ func (npm *nominatedPodMap) UpdateNominatedPod(oldPod *v1.Pod, newPodInfo *frame npm.add(newPodInfo, nodeName) } -// NewPodNominator creates a nominatedPodMap as a backing of framework.PodNominator. -// DEPRECATED: use NewSafePodNominator() instead. -func NewPodNominator() framework.PodNominator { - return NewSafePodNominator(nil) -} - -// NewSafePodNominator creates a nominatedPodMap as a backing of framework.PodNominator. -// Unlike NewPodNominator, it passes in a podLister so as to check if the pod is alive +// NewPodNominator creates a nominator as a backing of framework.PodNominator. +// A podLister is passed in so as to check if the pod exists // before adding its nominatedNode info. -func NewSafePodNominator(podLister listersv1.PodLister) framework.PodNominator { - return &nominatedPodMap{ +func NewPodNominator(podLister listersv1.PodLister) framework.PodNominator { + return &nominator{ podLister: podLister, nominatedPods: make(map[string][]*framework.PodInfo), nominatedPodToNode: make(map[types.UID]string), diff --git a/pkg/scheduler/internal/queue/scheduling_queue_test.go b/pkg/scheduler/internal/queue/scheduling_queue_test.go index 73d54c03d7f..21a787bd224 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue_test.go +++ b/pkg/scheduler/internal/queue/scheduling_queue_test.go @@ -27,6 +27,7 @@ import ( "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -129,7 +130,8 @@ func getUnschedulablePod(p *PriorityQueue, pod *v1.Pod) *v1.Pod { } func TestPriorityQueue_Add(t *testing.T) { - q := NewTestQueue(context.Background(), newDefaultQueueSort()) + objs := []runtime.Object{medPriorityPodInfo.Pod, unschedulablePodInfo.Pod, highPriorityPodInfo.Pod} + q := NewTestQueueWithObjects(context.Background(), newDefaultQueueSort(), objs) if err := q.Add(medPriorityPodInfo.Pod); err != nil { t.Errorf("add failed: %v", err) } @@ -139,7 +141,7 @@ func TestPriorityQueue_Add(t *testing.T) { if err := q.Add(highPriorityPodInfo.Pod); err != nil { t.Errorf("add failed: %v", err) } - expectedNominatedPods := &nominatedPodMap{ + expectedNominatedPods := &nominator{ nominatedPodToNode: map[types.UID]string{ medPriorityPodInfo.Pod.UID: "node1", unschedulablePodInfo.Pod.UID: "node1", @@ -148,7 +150,7 @@ func TestPriorityQueue_Add(t *testing.T) { "node1": {medPriorityPodInfo, unschedulablePodInfo}, }, } - if diff := cmp.Diff(q.PodNominator, expectedNominatedPods, cmp.AllowUnexported(nominatedPodMap{}), cmpopts.IgnoreFields(nominatedPodMap{}, "RWMutex")); diff != "" { + if diff := cmp.Diff(q.PodNominator, expectedNominatedPods, cmp.AllowUnexported(nominator{}), cmpopts.IgnoreFields(nominator{}, "podLister", "RWMutex")); diff != "" { t.Errorf("Unexpected diff after adding pods (-want, +got):\n%s", diff) } if p, err := q.Pop(); err != nil || p.Pod != highPriorityPodInfo.Pod { @@ -160,8 +162,8 @@ func TestPriorityQueue_Add(t *testing.T) { if p, err := q.Pop(); err != nil || p.Pod != unschedulablePodInfo.Pod { t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePodInfo.Pod.Name, p.Pod.Name) } - if len(q.PodNominator.(*nominatedPodMap).nominatedPods["node1"]) != 2 { - t.Errorf("Expected medPriorityPodInfo and unschedulablePodInfo to be still present in nomindatePods: %v", q.PodNominator.(*nominatedPodMap).nominatedPods["node1"]) + if len(q.PodNominator.(*nominator).nominatedPods["node1"]) != 2 { + t.Errorf("Expected medPriorityPodInfo and unschedulablePodInfo to be still present in nomindatePods: %v", q.PodNominator.(*nominator).nominatedPods["node1"]) } } @@ -171,7 +173,8 @@ func newDefaultQueueSort() framework.LessFunc { } func TestPriorityQueue_AddWithReversePriorityLessFunc(t *testing.T) { - q := NewTestQueue(context.Background(), newDefaultQueueSort()) + objs := []runtime.Object{medPriorityPodInfo.Pod, highPriorityPodInfo.Pod} + q := NewTestQueueWithObjects(context.Background(), newDefaultQueueSort(), objs) if err := q.Add(medPriorityPodInfo.Pod); err != nil { t.Errorf("add failed: %v", err) } @@ -187,11 +190,12 @@ func TestPriorityQueue_AddWithReversePriorityLessFunc(t *testing.T) { } func TestPriorityQueue_AddUnschedulableIfNotPresent(t *testing.T) { - q := NewTestQueue(context.Background(), newDefaultQueueSort()) + objs := []runtime.Object{highPriNominatedPodInfo.Pod, unschedulablePodInfo.Pod} + q := NewTestQueueWithObjects(context.Background(), newDefaultQueueSort(), objs) q.Add(highPriNominatedPodInfo.Pod) q.AddUnschedulableIfNotPresent(newQueuedPodInfoForLookup(highPriNominatedPodInfo.Pod), q.SchedulingCycle()) // Must not add anything. q.AddUnschedulableIfNotPresent(newQueuedPodInfoForLookup(unschedulablePodInfo.Pod), q.SchedulingCycle()) - expectedNominatedPods := &nominatedPodMap{ + expectedNominatedPods := &nominator{ nominatedPodToNode: map[types.UID]string{ unschedulablePodInfo.Pod.UID: "node1", highPriNominatedPodInfo.Pod.UID: "node1", @@ -200,13 +204,13 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent(t *testing.T) { "node1": {highPriNominatedPodInfo, unschedulablePodInfo}, }, } - if diff := cmp.Diff(q.PodNominator, expectedNominatedPods, cmp.AllowUnexported(nominatedPodMap{}), cmpopts.IgnoreFields(nominatedPodMap{}, "RWMutex")); diff != "" { + if diff := cmp.Diff(q.PodNominator, expectedNominatedPods, cmp.AllowUnexported(nominator{}), cmpopts.IgnoreFields(nominator{}, "podLister", "RWMutex")); diff != "" { t.Errorf("Unexpected diff after adding pods (-want, +got):\n%s", diff) } if p, err := q.Pop(); err != nil || p.Pod != highPriNominatedPodInfo.Pod { t.Errorf("Expected: %v after Pop, but got: %v", highPriNominatedPodInfo.Pod.Name, p.Pod.Name) } - if len(q.PodNominator.(*nominatedPodMap).nominatedPods) != 1 { + if len(q.PodNominator.(*nominator).nominatedPods) != 1 { t.Errorf("Expected nomindatePods to have one element: %v", q.PodNominator) } if getUnschedulablePod(q, unschedulablePodInfo.Pod) != unschedulablePodInfo.Pod { @@ -284,7 +288,8 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent_Backoff(t *testing.T) { } func TestPriorityQueue_Pop(t *testing.T) { - q := NewTestQueue(context.Background(), newDefaultQueueSort()) + objs := []runtime.Object{medPriorityPodInfo.Pod} + q := NewTestQueueWithObjects(context.Background(), newDefaultQueueSort(), objs) wg := sync.WaitGroup{} wg.Add(1) go func() { @@ -292,8 +297,8 @@ func TestPriorityQueue_Pop(t *testing.T) { if p, err := q.Pop(); err != nil || p.Pod != medPriorityPodInfo.Pod { t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPodInfo.Pod.Name, p.Pod.Name) } - if len(q.PodNominator.(*nominatedPodMap).nominatedPods["node1"]) != 1 { - t.Errorf("Expected medPriorityPodInfo to be present in nomindatePods: %v", q.PodNominator.(*nominatedPodMap).nominatedPods["node1"]) + if len(q.PodNominator.(*nominator).nominatedPods["node1"]) != 1 { + t.Errorf("Expected medPriorityPodInfo to be present in nomindatePods: %v", q.PodNominator.(*nominator).nominatedPods["node1"]) } }() q.Add(medPriorityPodInfo.Pod) @@ -301,13 +306,14 @@ func TestPriorityQueue_Pop(t *testing.T) { } func TestPriorityQueue_Update(t *testing.T) { + objs := []runtime.Object{highPriorityPodInfo.Pod, unschedulablePodInfo.Pod, medPriorityPodInfo.Pod} c := clock.NewFakeClock(time.Now()) - q := NewTestQueue(context.Background(), newDefaultQueueSort(), WithClock(c)) + q := NewTestQueueWithObjects(context.Background(), newDefaultQueueSort(), objs, WithClock(c)) q.Update(nil, highPriorityPodInfo.Pod) if _, exists, _ := q.activeQ.Get(newQueuedPodInfoForLookup(highPriorityPodInfo.Pod)); !exists { t.Errorf("Expected %v to be added to activeQ.", highPriorityPodInfo.Pod.Name) } - if len(q.PodNominator.(*nominatedPodMap).nominatedPods) != 0 { + if len(q.PodNominator.(*nominator).nominatedPods) != 0 { t.Errorf("Expected nomindatePods to be empty: %v", q.PodNominator) } // Update highPriorityPodInfo and add a nominatedNodeName to it. @@ -315,7 +321,7 @@ func TestPriorityQueue_Update(t *testing.T) { if q.activeQ.Len() != 1 { t.Error("Expected only one item in activeQ.") } - if len(q.PodNominator.(*nominatedPodMap).nominatedPods) != 1 { + if len(q.PodNominator.(*nominator).nominatedPods) != 1 { t.Errorf("Expected one item in nomindatePods map: %v", q.PodNominator) } // Updating an unschedulable pod which is not in any of the two queues, should @@ -382,7 +388,8 @@ func TestPriorityQueue_Update(t *testing.T) { } func TestPriorityQueue_Delete(t *testing.T) { - q := NewTestQueue(context.Background(), newDefaultQueueSort()) + objs := []runtime.Object{highPriorityPodInfo.Pod, unschedulablePodInfo.Pod} + q := NewTestQueueWithObjects(context.Background(), newDefaultQueueSort(), objs) q.Update(highPriorityPodInfo.Pod, highPriNominatedPodInfo.Pod) q.Add(unschedulablePodInfo.Pod) if err := q.Delete(highPriNominatedPodInfo.Pod); err != nil { @@ -394,19 +401,18 @@ func TestPriorityQueue_Delete(t *testing.T) { if _, exists, _ := q.activeQ.Get(newQueuedPodInfoForLookup(highPriNominatedPodInfo.Pod)); exists { t.Errorf("Didn't expect %v to be in activeQ.", highPriorityPodInfo.Pod.Name) } - if len(q.PodNominator.(*nominatedPodMap).nominatedPods) != 1 { - t.Errorf("Expected nomindatePods to have only 'unschedulablePodInfo': %v", q.PodNominator.(*nominatedPodMap).nominatedPods) + if len(q.PodNominator.(*nominator).nominatedPods) != 1 { + t.Errorf("Expected nomindatePods to have only 'unschedulablePodInfo': %v", q.PodNominator.(*nominator).nominatedPods) } if err := q.Delete(unschedulablePodInfo.Pod); err != nil { t.Errorf("delete failed: %v", err) } - if len(q.PodNominator.(*nominatedPodMap).nominatedPods) != 0 { + if len(q.PodNominator.(*nominator).nominatedPods) != 0 { t.Errorf("Expected nomindatePods to be empty: %v", q.PodNominator) } } func BenchmarkMoveAllToActiveOrBackoffQueue(b *testing.B) { - tests := []struct { name string moveEvent framework.ClusterEvent @@ -630,7 +636,8 @@ func TestPriorityQueue_AssignedPodAdded(t *testing.T) { } func TestPriorityQueue_NominatedPodsForNode(t *testing.T) { - q := NewTestQueue(context.Background(), newDefaultQueueSort()) + objs := []runtime.Object{medPriorityPodInfo.Pod, unschedulablePodInfo.Pod, highPriorityPodInfo.Pod} + q := NewTestQueueWithObjects(context.Background(), newDefaultQueueSort(), objs) q.Add(medPriorityPodInfo.Pod) q.Add(unschedulablePodInfo.Pod) q.Add(highPriorityPodInfo.Pod) @@ -678,7 +685,7 @@ func TestPriorityQueue_NominatedPodDeleted(t *testing.T) { podLister := informerFactory.Core().V1().Pods().Lister() // Build a PriorityQueue. - q := NewPriorityQueue(newDefaultQueueSort(), informerFactory, WithPodNominator(NewSafePodNominator(podLister))) + q := NewPriorityQueue(newDefaultQueueSort(), informerFactory, WithPodNominator(NewPodNominator(podLister))) ctx := context.Background() informerFactory.Start(ctx.Done()) informerFactory.WaitForCacheSync(ctx.Done()) @@ -723,7 +730,8 @@ func TestPriorityQueue_PendingPods(t *testing.T) { } func TestPriorityQueue_UpdateNominatedPodForNode(t *testing.T) { - q := NewTestQueue(context.Background(), newDefaultQueueSort()) + objs := []runtime.Object{medPriorityPodInfo.Pod, unschedulablePodInfo.Pod, highPriorityPodInfo.Pod} + q := NewTestQueueWithObjects(context.Background(), newDefaultQueueSort(), objs) if err := q.Add(medPriorityPodInfo.Pod); err != nil { t.Errorf("add failed: %v", err) } @@ -732,7 +740,7 @@ func TestPriorityQueue_UpdateNominatedPodForNode(t *testing.T) { // Update nominated node name of a pod on a node that is not specified in the pod object. q.AddNominatedPod(framework.NewPodInfo(highPriorityPodInfo.Pod), "node2") - expectedNominatedPods := &nominatedPodMap{ + expectedNominatedPods := &nominator{ nominatedPodToNode: map[types.UID]string{ medPriorityPodInfo.Pod.UID: "node1", highPriorityPodInfo.Pod.UID: "node2", @@ -744,20 +752,20 @@ func TestPriorityQueue_UpdateNominatedPodForNode(t *testing.T) { "node5": {unschedulablePodInfo}, }, } - if diff := cmp.Diff(q.PodNominator, expectedNominatedPods, cmp.AllowUnexported(nominatedPodMap{}), cmpopts.IgnoreFields(nominatedPodMap{}, "RWMutex")); diff != "" { + if diff := cmp.Diff(q.PodNominator, expectedNominatedPods, cmp.AllowUnexported(nominator{}), cmpopts.IgnoreFields(nominator{}, "podLister", "RWMutex")); diff != "" { t.Errorf("Unexpected diff after adding pods (-want, +got):\n%s", diff) } if p, err := q.Pop(); err != nil || p.Pod != medPriorityPodInfo.Pod { t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPodInfo.Pod.Name, p.Pod.Name) } // List of nominated pods shouldn't change after popping them from the queue. - if diff := cmp.Diff(q.PodNominator, expectedNominatedPods, cmp.AllowUnexported(nominatedPodMap{}), cmpopts.IgnoreFields(nominatedPodMap{}, "RWMutex")); diff != "" { + if diff := cmp.Diff(q.PodNominator, expectedNominatedPods, cmp.AllowUnexported(nominator{}), cmpopts.IgnoreFields(nominator{}, "podLister", "RWMutex")); diff != "" { t.Errorf("Unexpected diff after popping pods (-want, +got):\n%s", diff) } // Update one of the nominated pods that doesn't have nominatedNodeName in the // pod object. It should be updated correctly. q.AddNominatedPod(highPriorityPodInfo, "node4") - expectedNominatedPods = &nominatedPodMap{ + expectedNominatedPods = &nominator{ nominatedPodToNode: map[types.UID]string{ medPriorityPodInfo.Pod.UID: "node1", highPriorityPodInfo.Pod.UID: "node4", @@ -769,14 +777,14 @@ func TestPriorityQueue_UpdateNominatedPodForNode(t *testing.T) { "node5": {unschedulablePodInfo}, }, } - if diff := cmp.Diff(q.PodNominator, expectedNominatedPods, cmp.AllowUnexported(nominatedPodMap{}), cmpopts.IgnoreFields(nominatedPodMap{}, "RWMutex")); diff != "" { + if diff := cmp.Diff(q.PodNominator, expectedNominatedPods, cmp.AllowUnexported(nominator{}), cmpopts.IgnoreFields(nominator{}, "podLister", "RWMutex")); diff != "" { t.Errorf("Unexpected diff after updating pods (-want, +got):\n%s", diff) } // Delete a nominated pod that doesn't have nominatedNodeName in the pod // object. It should be deleted. q.DeleteNominatedPodIfExists(highPriorityPodInfo.Pod) - expectedNominatedPods = &nominatedPodMap{ + expectedNominatedPods = &nominator{ nominatedPodToNode: map[types.UID]string{ medPriorityPodInfo.Pod.UID: "node1", unschedulablePodInfo.Pod.UID: "node5", @@ -786,7 +794,7 @@ func TestPriorityQueue_UpdateNominatedPodForNode(t *testing.T) { "node5": {unschedulablePodInfo}, }, } - if diff := cmp.Diff(q.PodNominator, expectedNominatedPods, cmp.AllowUnexported(nominatedPodMap{}), cmpopts.IgnoreFields(nominatedPodMap{}, "RWMutex")); diff != "" { + if diff := cmp.Diff(q.PodNominator, expectedNominatedPods, cmp.AllowUnexported(nominator{}), cmpopts.IgnoreFields(nominator{}, "podLister", "RWMutex")); diff != "" { t.Errorf("Unexpected diff after deleting pods (-want, +got):\n%s", diff) } } diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index f961f812ab1..f4f5709526e 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -827,7 +827,7 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C frameworkruntime.WithClientSet(client), frameworkruntime.WithEventRecorder(recorder), frameworkruntime.WithInformerFactory(informerFactory), - frameworkruntime.WithPodNominator(internalqueue.NewPodNominator()), + frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())), ) algo := core.NewGenericScheduler(