diff --git a/pkg/scheduler/framework/interface.go b/pkg/scheduler/framework/interface.go index 8ce836620c4..374efa16ee8 100644 --- a/pkg/scheduler/framework/interface.go +++ b/pkg/scheduler/framework/interface.go @@ -584,6 +584,9 @@ type Framework interface { // PercentageOfNodesToScore returns percentageOfNodesToScore associated to a profile. PercentageOfNodesToScore() *int32 + + // SetPodNominator sets the PodNominator + SetPodNominator(nominator PodNominator) } // Handle provides data and some tools that plugins can use. It is diff --git a/pkg/scheduler/framework/runtime/framework.go b/pkg/scheduler/framework/runtime/framework.go index 0303a72db5f..e0bfed6cdb4 100644 --- a/pkg/scheduler/framework/runtime/framework.go +++ b/pkg/scheduler/framework/runtime/framework.go @@ -368,6 +368,10 @@ func NewFramework(r Registry, profile *config.KubeSchedulerProfile, stopCh <-cha return f, nil } +func (f *frameworkImpl) SetPodNominator(n framework.PodNominator) { + f.PodNominator = n +} + // getScoreWeights makes sure that, between MultiPoint-Score plugin weights and individual Score // plugin weights there is not an overflow of MaxTotalScore. func getScoreWeights(f *frameworkImpl, pluginsMap map[string]framework.Plugin, plugins []config.Plugin) error { diff --git a/pkg/scheduler/internal/queue/scheduling_queue.go b/pkg/scheduler/internal/queue/scheduling_queue.go index 1f639b5c421..bff7584c4db 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue.go +++ b/pkg/scheduler/internal/queue/scheduling_queue.go @@ -34,7 +34,6 @@ import ( "time" v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" @@ -143,8 +142,7 @@ func NominatedNodeName(pod *v1.Pod) string { // - unschedulablePods holds pods that were already attempted for scheduling and // are currently determined to be unschedulable. type PriorityQueue struct { - // PodNominator abstracts the operations to maintain nominated Pods. - framework.PodNominator + *nominator stop chan struct{} clock clock.Clock @@ -156,7 +154,6 @@ type PriorityQueue struct { // the maximum time a pod can stay in the unschedulablePods. podMaxInUnschedulablePodsDuration time.Duration - lock sync.RWMutex cond sync.Cond // activeQ is heap structure that scheduler actively looks at to find pods to @@ -192,7 +189,7 @@ type priorityQueueOptions struct { podInitialBackoffDuration time.Duration podMaxBackoffDuration time.Duration podMaxInUnschedulablePodsDuration time.Duration - podNominator framework.PodNominator + podLister listersv1.PodLister clusterEventMap map[framework.ClusterEvent]sets.String preEnqueuePluginMap map[string][]framework.PreEnqueuePlugin } @@ -221,10 +218,10 @@ func WithPodMaxBackoffDuration(duration time.Duration) Option { } } -// WithPodNominator sets pod nominator for PriorityQueue. -func WithPodNominator(pn framework.PodNominator) Option { +// WithPodLister sets pod lister for PriorityQueue. +func WithPodLister(pl listersv1.PodLister) Option { return func(o *priorityQueueOptions) { - o.podNominator = pn + o.podLister = pl } } @@ -276,6 +273,9 @@ func NewPriorityQueue( opts ...Option, ) *PriorityQueue { options := defaultPriorityQueueOptions + if options.podLister == nil { + options.podLister = informerFactory.Core().V1().Pods().Lister() + } for _, opt := range opts { opt(&options) } @@ -286,12 +286,8 @@ func NewPriorityQueue( return lessFn(pInfo1, pInfo2) } - if options.podNominator == nil { - options.podNominator = NewPodNominator(informerFactory.Core().V1().Pods().Lister()) - } - pq := &PriorityQueue{ - PodNominator: options.podNominator, + nominator: newPodNominator(options.podLister), clock: options.clock, stop: make(chan struct{}), podInitialBackoffDuration: options.podInitialBackoffDuration, @@ -384,7 +380,7 @@ func (p *PriorityQueue) Add(pod *v1.Pod) error { } klog.V(5).InfoS("Pod moved to an internal scheduling queue", "pod", klog.KObj(pod), "event", PodAdd, "queue", activeQName) metrics.SchedulerQueueIncomingPods.WithLabelValues("active", PodAdd).Inc() - p.PodNominator.AddNominatedPod(pInfo.PodInfo, nil) + p.addNominatedPodUnlocked(pInfo.PodInfo, nil) p.cond.Broadcast() return nil @@ -438,7 +434,7 @@ func (p *PriorityQueue) activate(pod *v1.Pod) bool { p.unschedulablePods.delete(pInfo.Pod, gated) p.podBackoffQ.Delete(pInfo) metrics.SchedulerQueueIncomingPods.WithLabelValues("active", ForceActivate).Inc() - p.PodNominator.AddNominatedPod(pInfo.PodInfo, nil) + p.addNominatedPodUnlocked(pInfo.PodInfo, nil) return true } @@ -499,7 +495,7 @@ func (p *PriorityQueue) AddUnschedulableIfNotPresent(pInfo *framework.QueuedPodI } - p.PodNominator.AddNominatedPod(pInfo.PodInfo, nil) + p.addNominatedPodUnlocked(pInfo.PodInfo, nil) return nil } @@ -610,14 +606,14 @@ func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error { // If the pod is already in the active queue, just update it there. if oldPodInfo, exists, _ := p.activeQ.Get(oldPodInfo); exists { pInfo := updatePod(oldPodInfo, newPod) - p.PodNominator.UpdateNominatedPod(oldPod, pInfo.PodInfo) + p.updateNominatedPodUnlocked(oldPod, pInfo.PodInfo) return p.activeQ.Update(pInfo) } // If the pod is in the backoff queue, update it there. if oldPodInfo, exists, _ := p.podBackoffQ.Get(oldPodInfo); exists { pInfo := updatePod(oldPodInfo, newPod) - p.PodNominator.UpdateNominatedPod(oldPod, pInfo.PodInfo) + p.updateNominatedPodUnlocked(oldPod, pInfo.PodInfo) return p.podBackoffQ.Update(pInfo) } } @@ -625,7 +621,7 @@ func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error { // If the pod is in the unschedulable queue, updating it may make it schedulable. if usPodInfo := p.unschedulablePods.get(newPod); usPodInfo != nil { pInfo := updatePod(usPodInfo, newPod) - p.PodNominator.UpdateNominatedPod(oldPod, pInfo.PodInfo) + p.updateNominatedPodUnlocked(oldPod, pInfo.PodInfo) if isPodUpdated(oldPod, newPod) { gated := usPodInfo.Gated if p.isPodBackingoff(usPodInfo) { @@ -654,7 +650,7 @@ func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error { if added, err := p.addToActiveQ(pInfo); !added { return err } - p.PodNominator.AddNominatedPod(pInfo.PodInfo, nil) + p.addNominatedPodUnlocked(pInfo.PodInfo, nil) klog.V(5).InfoS("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", PodUpdate, "queue", activeQName) p.cond.Broadcast() return nil @@ -665,7 +661,7 @@ func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error { func (p *PriorityQueue) Delete(pod *v1.Pod) error { p.lock.Lock() defer p.lock.Unlock() - p.PodNominator.DeleteNominatedPodIfExists(pod) + p.deleteNominatedPodIfExistsUnlocked(pod) pInfo := newQueuedPodInfoForLookup(pod) if err := p.activeQ.Delete(pInfo); err != nil { // The item was probably not found in the activeQ. @@ -776,8 +772,7 @@ func (p *PriorityQueue) movePodsToActiveOrBackoffQueue(podInfoList []*framework. // any affinity term that matches "pod". // NOTE: this function assumes lock has been acquired in caller. func (p *PriorityQueue) getUnschedulablePodsWithMatchingAffinityTerm(pod *v1.Pod) []*framework.QueuedPodInfo { - var nsLabels labels.Set - nsLabels = interpodaffinity.GetNamespaceLabelsSnapshot(pod.Namespace, p.nsLister) + nsLabels := interpodaffinity.GetNamespaceLabelsSnapshot(pod.Namespace, p.nsLister) var podsToMove []*framework.QueuedPodInfo for _, pInfo := range p.unschedulablePods.podInfoMap { @@ -824,9 +819,13 @@ func (p *PriorityQueue) Close() { // DeleteNominatedPodIfExists deletes from nominatedPods. func (npm *nominator) DeleteNominatedPodIfExists(pod *v1.Pod) { - npm.Lock() + npm.lock.Lock() + npm.deleteNominatedPodIfExistsUnlocked(pod) + npm.lock.Unlock() +} + +func (npm *nominator) deleteNominatedPodIfExistsUnlocked(pod *v1.Pod) { npm.delete(pod) - npm.Unlock() } // AddNominatedPod adds a pod to the nominated pods of the given node. @@ -834,16 +833,16 @@ func (npm *nominator) DeleteNominatedPodIfExists(pod *v1.Pod) { // the pod. We update the structure before sending a request to update the pod // object to avoid races with the following scheduling cycles. func (npm *nominator) AddNominatedPod(pi *framework.PodInfo, nominatingInfo *framework.NominatingInfo) { - npm.Lock() - npm.add(pi, nominatingInfo) - npm.Unlock() + npm.lock.Lock() + npm.addNominatedPodUnlocked(pi, nominatingInfo) + npm.lock.Unlock() } // NominatedPodsForNode returns a copy of pods that are nominated to run on the given node, // but they are waiting for other pods to be removed from the node. func (npm *nominator) NominatedPodsForNode(nodeName string) []*framework.PodInfo { - npm.RLock() - defer npm.RUnlock() + npm.lock.RLock() + defer npm.lock.RUnlock() // Make a copy of the nominated Pods so the caller can mutate safely. pods := make([]*framework.PodInfo, len(npm.nominatedPods[nodeName])) for i := 0; i < len(pods); i++ { @@ -985,10 +984,10 @@ type nominator struct { // nominated. nominatedPodToNode map[types.UID]string - sync.RWMutex + lock sync.RWMutex } -func (npm *nominator) add(pi *framework.PodInfo, nominatingInfo *framework.NominatingInfo) { +func (npm *nominator) addNominatedPodUnlocked(pi *framework.PodInfo, nominatingInfo *framework.NominatingInfo) { // Always delete the pod if it already exists, to ensure we never store more than // one instance of the pod. npm.delete(pi.Pod) @@ -1045,8 +1044,12 @@ func (npm *nominator) delete(p *v1.Pod) { // UpdateNominatedPod updates the with . func (npm *nominator) UpdateNominatedPod(oldPod *v1.Pod, newPodInfo *framework.PodInfo) { - npm.Lock() - defer npm.Unlock() + npm.lock.Lock() + defer npm.lock.Unlock() + npm.updateNominatedPodUnlocked(oldPod, newPodInfo) +} + +func (npm *nominator) updateNominatedPodUnlocked(oldPod *v1.Pod, newPodInfo *framework.PodInfo) { // In some cases, an Update event with no "NominatedNode" present is received right // after a node("NominatedNode") is reserved for this pod in memory. // In this case, we need to keep reserving the NominatedNode when updating the pod pointer. @@ -1067,13 +1070,17 @@ func (npm *nominator) UpdateNominatedPod(oldPod *v1.Pod, newPodInfo *framework.P // We update irrespective of the nominatedNodeName changed or not, to ensure // that pod pointer is updated. npm.delete(oldPod) - npm.add(newPodInfo, nominatingInfo) + npm.addNominatedPodUnlocked(newPodInfo, nominatingInfo) } // NewPodNominator creates a nominator as a backing of framework.PodNominator. // A podLister is passed in so as to check if the pod exists // before adding its nominatedNode info. func NewPodNominator(podLister listersv1.PodLister) framework.PodNominator { + return newPodNominator(podLister) +} + +func newPodNominator(podLister listersv1.PodLister) *nominator { return &nominator{ podLister: podLister, nominatedPods: make(map[string][]*framework.PodInfo), diff --git a/pkg/scheduler/internal/queue/scheduling_queue_test.go b/pkg/scheduler/internal/queue/scheduling_queue_test.go index 5327ac45064..aa02acf7301 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue_test.go +++ b/pkg/scheduler/internal/queue/scheduling_queue_test.go @@ -76,6 +76,11 @@ var ( scheduledPodInfo = mustNewPodInfo( st.MakePod().Name("sp").Namespace("ns1").UID("spns1").Node("foo").Obj(), ) + + nominatorCmpOpts = []cmp.Option{ + cmp.AllowUnexported(nominator{}), + cmpopts.IgnoreFields(nominator{}, "podLister", "lock"), + } ) func getUnschedulablePod(p *PriorityQueue, pod *v1.Pod) *v1.Pod { @@ -109,7 +114,7 @@ func TestPriorityQueue_Add(t *testing.T) { "node1": {medPriorityPodInfo, unschedulablePodInfo}, }, } - if diff := cmp.Diff(q.PodNominator, expectedNominatedPods, cmp.AllowUnexported(nominator{}), cmpopts.IgnoreFields(nominator{}, "podLister", "RWMutex")); diff != "" { + if diff := cmp.Diff(q.nominator, expectedNominatedPods, nominatorCmpOpts...); diff != "" { t.Errorf("Unexpected diff after adding pods (-want, +got):\n%s", diff) } if p, err := q.Pop(); err != nil || p.Pod != highPriorityPodInfo.Pod { @@ -121,8 +126,8 @@ func TestPriorityQueue_Add(t *testing.T) { if p, err := q.Pop(); err != nil || p.Pod != unschedulablePodInfo.Pod { t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePodInfo.Pod.Name, p.Pod.Name) } - if len(q.PodNominator.(*nominator).nominatedPods["node1"]) != 2 { - t.Errorf("Expected medPriorityPodInfo and unschedulablePodInfo to be still present in nomindatePods: %v", q.PodNominator.(*nominator).nominatedPods["node1"]) + if len(q.nominator.nominatedPods["node1"]) != 2 { + t.Errorf("Expected medPriorityPodInfo and unschedulablePodInfo to be still present in nomindatePods: %v", q.nominator.nominatedPods["node1"]) } } @@ -167,14 +172,14 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent(t *testing.T) { "node1": {highPriNominatedPodInfo, unschedulablePodInfo}, }, } - if diff := cmp.Diff(q.PodNominator, expectedNominatedPods, cmp.AllowUnexported(nominator{}), cmpopts.IgnoreFields(nominator{}, "podLister", "RWMutex")); diff != "" { + if diff := cmp.Diff(q.nominator, expectedNominatedPods, nominatorCmpOpts...); diff != "" { t.Errorf("Unexpected diff after adding pods (-want, +got):\n%s", diff) } if p, err := q.Pop(); err != nil || p.Pod != highPriNominatedPodInfo.Pod { t.Errorf("Expected: %v after Pop, but got: %v", highPriNominatedPodInfo.Pod.Name, p.Pod.Name) } - if len(q.PodNominator.(*nominator).nominatedPods) != 1 { - t.Errorf("Expected nomindatePods to have one element: %v", q.PodNominator) + if len(q.nominator.nominatedPods) != 1 { + t.Errorf("Expected nomindatePods to have one element: %v", q.nominator) } if getUnschedulablePod(q, unschedulablePodInfo.Pod) != unschedulablePodInfo.Pod { t.Errorf("Pod %v was not found in the unschedulablePods.", unschedulablePodInfo.Pod.Name) @@ -255,8 +260,8 @@ func TestPriorityQueue_Pop(t *testing.T) { if p, err := q.Pop(); err != nil || p.Pod != medPriorityPodInfo.Pod { t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPodInfo.Pod.Name, p.Pod.Name) } - if len(q.PodNominator.(*nominator).nominatedPods["node1"]) != 1 { - t.Errorf("Expected medPriorityPodInfo to be present in nomindatePods: %v", q.PodNominator.(*nominator).nominatedPods["node1"]) + if len(q.nominator.nominatedPods["node1"]) != 1 { + t.Errorf("Expected medPriorityPodInfo to be present in nomindatePods: %v", q.nominator.nominatedPods["node1"]) } }() q.Add(medPriorityPodInfo.Pod) @@ -273,16 +278,16 @@ func TestPriorityQueue_Update(t *testing.T) { if _, exists, _ := q.activeQ.Get(newQueuedPodInfoForLookup(highPriorityPodInfo.Pod)); !exists { t.Errorf("Expected %v to be added to activeQ.", highPriorityPodInfo.Pod.Name) } - if len(q.PodNominator.(*nominator).nominatedPods) != 0 { - t.Errorf("Expected nomindatePods to be empty: %v", q.PodNominator) + if len(q.nominator.nominatedPods) != 0 { + t.Errorf("Expected nomindatePods to be empty: %v", q.nominator) } // Update highPriorityPodInfo and add a nominatedNodeName to it. q.Update(highPriorityPodInfo.Pod, highPriNominatedPodInfo.Pod) if q.activeQ.Len() != 1 { t.Error("Expected only one item in activeQ.") } - if len(q.PodNominator.(*nominator).nominatedPods) != 1 { - t.Errorf("Expected one item in nomindatePods map: %v", q.PodNominator) + if len(q.nominator.nominatedPods) != 1 { + t.Errorf("Expected one item in nomindatePods map: %v", q.nominator) } // Updating an unschedulable pod which is not in any of the two queues, should // add the pod to activeQ. @@ -363,14 +368,14 @@ func TestPriorityQueue_Delete(t *testing.T) { if _, exists, _ := q.activeQ.Get(newQueuedPodInfoForLookup(highPriNominatedPodInfo.Pod)); exists { t.Errorf("Didn't expect %v to be in activeQ.", highPriorityPodInfo.Pod.Name) } - if len(q.PodNominator.(*nominator).nominatedPods) != 1 { - t.Errorf("Expected nomindatePods to have only 'unschedulablePodInfo': %v", q.PodNominator.(*nominator).nominatedPods) + if len(q.nominator.nominatedPods) != 1 { + t.Errorf("Expected nomindatePods to have only 'unschedulablePodInfo': %v", q.nominator.nominatedPods) } if err := q.Delete(unschedulablePodInfo.Pod); err != nil { t.Errorf("delete failed: %v", err) } - if len(q.PodNominator.(*nominator).nominatedPods) != 0 { - t.Errorf("Expected nomindatePods to be empty: %v", q.PodNominator) + if len(q.nominator.nominatedPods) != 0 { + t.Errorf("Expected nomindatePods to be empty: %v", q.nominator) } } @@ -842,7 +847,7 @@ func TestPriorityQueue_NominatedPodDeleted(t *testing.T) { podLister := informerFactory.Core().V1().Pods().Lister() // Build a PriorityQueue. - q := NewPriorityQueue(newDefaultQueueSort(), informerFactory, WithPodNominator(NewPodNominator(podLister))) + q := NewPriorityQueue(newDefaultQueueSort(), informerFactory, WithPodLister(podLister)) ctx, cancel := context.WithCancel(context.Background()) defer cancel() informerFactory.Start(ctx.Done()) @@ -924,14 +929,14 @@ func TestPriorityQueue_UpdateNominatedPodForNode(t *testing.T) { "node5": {unschedulablePodInfo}, }, } - if diff := cmp.Diff(q.PodNominator, expectedNominatedPods, cmp.AllowUnexported(nominator{}), cmpopts.IgnoreFields(nominator{}, "podLister", "RWMutex")); diff != "" { + if diff := cmp.Diff(q.nominator, expectedNominatedPods, nominatorCmpOpts...); diff != "" { t.Errorf("Unexpected diff after adding pods (-want, +got):\n%s", diff) } if p, err := q.Pop(); err != nil || p.Pod != medPriorityPodInfo.Pod { t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPodInfo.Pod.Name, p.Pod.Name) } // List of nominated pods shouldn't change after popping them from the queue. - if diff := cmp.Diff(q.PodNominator, expectedNominatedPods, cmp.AllowUnexported(nominator{}), cmpopts.IgnoreFields(nominator{}, "podLister", "RWMutex")); diff != "" { + if diff := cmp.Diff(q.nominator, expectedNominatedPods, nominatorCmpOpts...); diff != "" { t.Errorf("Unexpected diff after popping pods (-want, +got):\n%s", diff) } // Update one of the nominated pods that doesn't have nominatedNodeName in the @@ -949,14 +954,14 @@ func TestPriorityQueue_UpdateNominatedPodForNode(t *testing.T) { "node5": {unschedulablePodInfo}, }, } - if diff := cmp.Diff(q.PodNominator, expectedNominatedPods, cmp.AllowUnexported(nominator{}), cmpopts.IgnoreFields(nominator{}, "podLister", "RWMutex")); diff != "" { + if diff := cmp.Diff(q.nominator, expectedNominatedPods, nominatorCmpOpts...); diff != "" { t.Errorf("Unexpected diff after updating pods (-want, +got):\n%s", diff) } // Attempt to nominate a pod that was deleted from the informer cache. // Nothing should change. q.AddNominatedPod(nonExistentPodInfo, &framework.NominatingInfo{NominatingMode: framework.ModeOverride, NominatedNodeName: "node1"}) - if diff := cmp.Diff(q.PodNominator, expectedNominatedPods, cmp.AllowUnexported(nominator{}), cmpopts.IgnoreFields(nominator{}, "podLister", "RWMutex")); diff != "" { + if diff := cmp.Diff(q.nominator, expectedNominatedPods, nominatorCmpOpts...); diff != "" { t.Errorf("Unexpected diff after nominating a deleted pod (-want, +got):\n%s", diff) } // Attempt to nominate a pod that was already scheduled in the informer cache. @@ -964,7 +969,7 @@ func TestPriorityQueue_UpdateNominatedPodForNode(t *testing.T) { scheduledPodCopy := scheduledPodInfo.Pod.DeepCopy() scheduledPodInfo.Pod.Spec.NodeName = "" q.AddNominatedPod(mustNewTestPodInfo(t, scheduledPodCopy), &framework.NominatingInfo{NominatingMode: framework.ModeOverride, NominatedNodeName: "node1"}) - if diff := cmp.Diff(q.PodNominator, expectedNominatedPods, cmp.AllowUnexported(nominator{}), cmpopts.IgnoreFields(nominator{}, "podLister", "RWMutex")); diff != "" { + if diff := cmp.Diff(q.nominator, expectedNominatedPods, nominatorCmpOpts...); diff != "" { t.Errorf("Unexpected diff after nominating a scheduled pod (-want, +got):\n%s", diff) } @@ -981,7 +986,7 @@ func TestPriorityQueue_UpdateNominatedPodForNode(t *testing.T) { "node5": {unschedulablePodInfo}, }, } - if diff := cmp.Diff(q.PodNominator, expectedNominatedPods, cmp.AllowUnexported(nominator{}), cmpopts.IgnoreFields(nominator{}, "podLister", "RWMutex")); diff != "" { + if diff := cmp.Diff(q.nominator, expectedNominatedPods, nominatorCmpOpts...); diff != "" { t.Errorf("Unexpected diff after deleting pods (-want, +got):\n%s", diff) } } @@ -1197,7 +1202,7 @@ func TestPodFailedSchedulingMultipleTimesDoesNotBlockNewerPod(t *testing.T) { // Add an unschedulable pod to a priority queue. // This makes a situation that the pod was tried to schedule // and had been determined unschedulable so far - unschedulablePod := st.MakePod().Name(fmt.Sprintf("test-pod-unscheduled")).Namespace("ns1").UID("tp001").Priority(highPriority).NominatedNodeName("node1").Obj() + unschedulablePod := st.MakePod().Name("test-pod-unscheduled").Namespace("ns1").UID("tp001").Priority(highPriority).NominatedNodeName("node1").Obj() // Update pod condition to unschedulable. podutil.UpdatePodCondition(&unschedulablePod.Status, &v1.PodCondition{ diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 03bd947a325..ad88ad982ad 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -281,8 +281,6 @@ func New(client clientset.Interface, podLister := informerFactory.Core().V1().Pods().Lister() nodeLister := informerFactory.Core().V1().Nodes().Lister() - // The nominator will be passed all the way to framework instantiation. - nominator := internalqueue.NewPodNominator(podLister) snapshot := internalcache.NewEmptySnapshot() clusterEventMap := make(map[framework.ClusterEvent]sets.String) @@ -292,7 +290,6 @@ func New(client clientset.Interface, frameworkruntime.WithKubeConfig(options.kubeConfig), frameworkruntime.WithInformerFactory(informerFactory), frameworkruntime.WithSnapshotSharedLister(snapshot), - frameworkruntime.WithPodNominator(nominator), frameworkruntime.WithCaptureProfile(frameworkruntime.CaptureProfile(options.frameworkCapturer)), frameworkruntime.WithClusterEventMap(clusterEventMap), frameworkruntime.WithParallelism(int(options.parallelism)), @@ -315,12 +312,16 @@ func New(client clientset.Interface, informerFactory, internalqueue.WithPodInitialBackoffDuration(time.Duration(options.podInitialBackoffSeconds)*time.Second), internalqueue.WithPodMaxBackoffDuration(time.Duration(options.podMaxBackoffSeconds)*time.Second), - internalqueue.WithPodNominator(nominator), + internalqueue.WithPodLister(podLister), internalqueue.WithClusterEventMap(clusterEventMap), internalqueue.WithPodMaxInUnschedulablePodsDuration(options.podMaxInUnschedulablePodsDuration), internalqueue.WithPreEnqueuePluginMap(preEnqueuePluginMap), ) + for _, fwk := range profiles { + fwk.SetPodNominator(podQueue) + } + schedulerCache := internalcache.New(durationToExpireAssumedPod, stopEverything) // Setup cache debugger.