diff --git a/pkg/scheduler/core/generic_scheduler_test.go b/pkg/scheduler/core/generic_scheduler_test.go index dd6457230b6..d7232644ace 100644 --- a/pkg/scheduler/core/generic_scheduler_test.go +++ b/pkg/scheduler/core/generic_scheduler_test.go @@ -979,7 +979,7 @@ func TestFindFitPredicateCallCounts(t *testing.T) { if err := scheduler.cache.UpdateSnapshot(scheduler.nodeInfoSnapshot); err != nil { t.Fatal(err) } - scheduler.schedulingQueue.UpdateNominatedPodForNode(&v1.Pod{ObjectMeta: metav1.ObjectMeta{UID: "nominated"}, Spec: v1.PodSpec{Priority: &midPriority}}, "1") + scheduler.schedulingQueue.AddNominatedPod(&v1.Pod{ObjectMeta: metav1.ObjectMeta{UID: "nominated"}, Spec: v1.PodSpec{Priority: &midPriority}}, "1") _, _, err = scheduler.findNodesThatFitPod(context.Background(), prof, framework.NewCycleState(), test.pod) diff --git a/pkg/scheduler/factory.go b/pkg/scheduler/factory.go index bc5ad6754da..51f985e8ca7 100644 --- a/pkg/scheduler/factory.go +++ b/pkg/scheduler/factory.go @@ -99,18 +99,21 @@ type Configurator struct { frameworkCapturer FrameworkCapturer } -func (c *Configurator) buildFramework(p schedulerapi.KubeSchedulerProfile) (framework.Framework, error) { +func (c *Configurator) buildFramework(p schedulerapi.KubeSchedulerProfile, opts ...framework.Option) (framework.Framework, error) { if c.frameworkCapturer != nil { c.frameworkCapturer(p) } - return framework.NewFramework( - c.registry, - p.Plugins, - p.PluginConfig, + opts = append([]framework.Option{ framework.WithClientSet(c.client), framework.WithInformerFactory(c.informerFactory), framework.WithSnapshotSharedLister(c.nodeInfoSnapshot), framework.WithRunAllFilters(c.alwaysCheckAllPredicates), + }, opts...) + return framework.NewFramework( + c.registry, + p.Plugins, + p.PluginConfig, + opts..., ) } @@ -159,7 +162,10 @@ func (c *Configurator) create() (*Scheduler, error) { } } - profiles, err := profile.NewMap(c.profiles, c.buildFramework, c.recorderFactory) + // The nominator will be passed all the way to framework instantiation. + nominator := internalqueue.NewPodNominator() + profiles, err := profile.NewMap(c.profiles, c.buildFramework, c.recorderFactory, + framework.WithPodNominator(nominator)) if err != nil { return nil, fmt.Errorf("initializing profiles: %v", err) } @@ -172,6 +178,7 @@ func (c *Configurator) create() (*Scheduler, error) { lessFn, internalqueue.WithPodInitialBackoffDuration(time.Duration(c.podInitialBackoffSeconds)*time.Second), internalqueue.WithPodMaxBackoffDuration(time.Duration(c.podMaxBackoffSeconds)*time.Second), + internalqueue.WithPodNominator(nominator), ) // Setup cache debugger. diff --git a/pkg/scheduler/framework/v1alpha1/framework.go b/pkg/scheduler/framework/v1alpha1/framework.go index 65a82940b57..4827502dd36 100644 --- a/pkg/scheduler/framework/v1alpha1/framework.go +++ b/pkg/scheduler/framework/v1alpha1/framework.go @@ -83,6 +83,8 @@ type framework struct { metricsRecorder *metricsRecorder + preemptHandle PreemptHandle + // Indicates that RunFilterPlugins should accumulate all failed statuses and not return // after the first failure. runAllFilters bool @@ -121,6 +123,7 @@ type frameworkOptions struct { snapshotSharedLister SharedLister metricsRecorder *metricsRecorder volumeBinder scheduling.SchedulerVolumeBinder + podNominator PodNominator runAllFilters bool } @@ -170,6 +173,13 @@ func WithVolumeBinder(binder scheduling.SchedulerVolumeBinder) Option { } } +// WithPodNominator sets podNominator for the scheduling framework. +func WithPodNominator(nominator PodNominator) Option { + return func(o *frameworkOptions) { + o.podNominator = nominator + } +} + var defaultFrameworkOptions = frameworkOptions{ metricsRecorder: newMetricsRecorder(1000, time.Second), } @@ -192,6 +202,7 @@ func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfi informerFactory: options.informerFactory, volumeBinder: options.volumeBinder, metricsRecorder: options.metricsRecorder, + preemptHandle: options.podNominator, runAllFilters: options.runAllFilters, } if plugins == nil { diff --git a/pkg/scheduler/framework/v1alpha1/interface.go b/pkg/scheduler/framework/v1alpha1/interface.go index 2c23009fb60..73791d280c2 100644 --- a/pkg/scheduler/framework/v1alpha1/interface.go +++ b/pkg/scheduler/framework/v1alpha1/interface.go @@ -495,3 +495,19 @@ type FrameworkHandle interface { // VolumeBinder returns the volume binder used by scheduler. VolumeBinder() scheduling.SchedulerVolumeBinder } + +// PreemptHandle incorporates all needed logic to run preemption logic. +type PreemptHandle interface { + PodNominator +} + +// PodNominator abstracts operations to maintain nominated Pods. +type PodNominator interface { + // AddNominatedPod adds the given pod to the nominated pod map or + // updates it if it already exists. + AddNominatedPod(pod *v1.Pod, nodeName string) + // DeleteNominatedPodIfExists deletes nominatedPod from internal cache. It's a no-op if it doesn't exist. + DeleteNominatedPodIfExists(pod *v1.Pod) + // UpdateNominatedPod updates the with . + UpdateNominatedPod(oldPod, newPod *v1.Pod) +} diff --git a/pkg/scheduler/internal/queue/scheduling_queue.go b/pkg/scheduler/internal/queue/scheduling_queue.go index 3b3b2d28944..27ac52d15ce 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue.go +++ b/pkg/scheduler/internal/queue/scheduling_queue.go @@ -65,6 +65,7 @@ const ( // The interface follows a pattern similar to cache.FIFO and cache.Heap and // makes it easy to use those data structures as a SchedulingQueue. type SchedulingQueue interface { + framework.PodNominator Add(pod *v1.Pod) error // AddUnschedulableIfNotPresent adds an unschedulable pod back to scheduling queue. // The podSchedulingCycle represents the current scheduling cycle number which can be @@ -87,11 +88,6 @@ type SchedulingQueue interface { // Close closes the SchedulingQueue so that the goroutine which is // waiting to pop items can exit gracefully. Close() - // UpdateNominatedPodForNode adds the given pod to the nominated pod map or - // updates it if it already exists. - UpdateNominatedPodForNode(pod *v1.Pod, nodeName string) - // DeleteNominatedPodIfExists deletes nominatedPod from internal cache - DeleteNominatedPodIfExists(pod *v1.Pod) // NumUnschedulablePods returns the number of unschedulable pods exist in the SchedulingQueue. NumUnschedulablePods() int // Run starts the goroutines managing the queue. @@ -116,6 +112,9 @@ func NominatedNodeName(pod *v1.Pod) string { // is called unschedulableQ. The third queue holds pods that are moved from // unschedulable queues and will be moved to active queue when backoff are completed. type PriorityQueue struct { + // PodNominator abstracts the operations to maintain nominated Pods. + framework.PodNominator + stop chan struct{} clock util.Clock @@ -135,9 +134,6 @@ type PriorityQueue struct { podBackoffQ *heap.Heap // unschedulableQ holds pods that have been tried and determined unschedulable. unschedulableQ *UnschedulablePodsMap - // nominatedPods is a structures that stores pods which are nominated to run - // on nodes. - nominatedPods *nominatedPodMap // schedulingCycle represents sequence number of scheduling cycle and is incremented // when a pod is popped. schedulingCycle int64 @@ -156,6 +152,7 @@ type priorityQueueOptions struct { clock util.Clock podInitialBackoffDuration time.Duration podMaxBackoffDuration time.Duration + podNominator framework.PodNominator } // Option configures a PriorityQueue @@ -168,20 +165,27 @@ func WithClock(clock util.Clock) Option { } } -// WithPodInitialBackoffDuration sets pod initial backoff duration for PriorityQueue, +// WithPodInitialBackoffDuration sets pod initial backoff duration for PriorityQueue. func WithPodInitialBackoffDuration(duration time.Duration) Option { return func(o *priorityQueueOptions) { o.podInitialBackoffDuration = duration } } -// WithPodMaxBackoffDuration sets pod max backoff duration for PriorityQueue, +// WithPodMaxBackoffDuration sets pod max backoff duration for PriorityQueue. func WithPodMaxBackoffDuration(duration time.Duration) Option { return func(o *priorityQueueOptions) { o.podMaxBackoffDuration = duration } } +// WithPodNominator sets pod nominator for PriorityQueue. +func WithPodNominator(pn framework.PodNominator) Option { + return func(o *priorityQueueOptions) { + o.podNominator = pn + } +} + var defaultPriorityQueueOptions = priorityQueueOptions{ clock: util.RealClock{}, podInitialBackoffDuration: DefaultPodInitialBackoffDuration, @@ -214,14 +218,18 @@ func NewPriorityQueue( return lessFn(pInfo1, pInfo2) } + if options.podNominator == nil { + options.podNominator = NewPodNominator() + } + pq := &PriorityQueue{ + PodNominator: options.podNominator, clock: options.clock, stop: make(chan struct{}), podInitialBackoffDuration: options.podInitialBackoffDuration, podMaxBackoffDuration: options.podMaxBackoffDuration, activeQ: heap.NewWithRecorder(podInfoKeyFunc, comp, metrics.NewActivePodsRecorder()), unschedulableQ: newUnschedulablePodsMap(metrics.NewUnschedulablePodsRecorder()), - nominatedPods: newNominatedPodMap(), moveRequestCycle: -1, } pq.cond.L = &pq.lock @@ -255,7 +263,7 @@ func (p *PriorityQueue) Add(pod *v1.Pod) error { klog.Errorf("Error: pod %v is already in the podBackoff queue.", nsNameForPod(pod)) } metrics.SchedulerQueueIncomingPods.WithLabelValues("active", PodAdd).Inc() - p.nominatedPods.add(pod, "") + p.PodNominator.AddNominatedPod(pod, "") p.cond.Broadcast() return nil @@ -316,9 +324,8 @@ func (p *PriorityQueue) AddUnschedulableIfNotPresent(pInfo *framework.QueuedPodI metrics.SchedulerQueueIncomingPods.WithLabelValues("unschedulable", ScheduleAttemptFailure).Inc() } - p.nominatedPods.add(pod, "") + p.PodNominator.AddNominatedPod(pod, "") return nil - } // flushBackoffQCompleted Moves all pods from backoffQ which have completed backoff in to activeQ @@ -416,14 +423,14 @@ func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error { oldPodInfo := newQueuedPodInfoNoTimestamp(oldPod) // If the pod is already in the active queue, just update it there. if oldPodInfo, exists, _ := p.activeQ.Get(oldPodInfo); exists { - p.nominatedPods.update(oldPod, newPod) + p.PodNominator.UpdateNominatedPod(oldPod, newPod) err := p.activeQ.Update(updatePod(oldPodInfo, newPod)) return err } // If the pod is in the backoff queue, update it there. if oldPodInfo, exists, _ := p.podBackoffQ.Get(oldPodInfo); exists { - p.nominatedPods.update(oldPod, newPod) + p.PodNominator.UpdateNominatedPod(oldPod, newPod) p.podBackoffQ.Delete(oldPodInfo) err := p.activeQ.Add(updatePod(oldPodInfo, newPod)) if err == nil { @@ -435,7 +442,7 @@ func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error { // If the pod is in the unschedulable queue, updating it may make it schedulable. if usPodInfo := p.unschedulableQ.get(newPod); usPodInfo != nil { - p.nominatedPods.update(oldPod, newPod) + p.PodNominator.UpdateNominatedPod(oldPod, newPod) if isPodUpdated(oldPod, newPod) { p.unschedulableQ.delete(usPodInfo.Pod) err := p.activeQ.Add(updatePod(usPodInfo, newPod)) @@ -451,7 +458,7 @@ func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error { // If pod is not in any of the queues, we put it in the active queue. err := p.activeQ.Add(p.newQueuedPodInfo(newPod)) if err == nil { - p.nominatedPods.add(newPod, "") + p.PodNominator.AddNominatedPod(newPod, "") p.cond.Broadcast() } return err @@ -462,7 +469,7 @@ func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error { func (p *PriorityQueue) Delete(pod *v1.Pod) error { p.lock.Lock() defer p.lock.Unlock() - p.nominatedPods.delete(pod) + p.PodNominator.DeleteNominatedPodIfExists(pod) err := p.activeQ.Delete(newQueuedPodInfoNoTimestamp(pod)) if err != nil { // The item was probably not found in the activeQ. p.podBackoffQ.Delete(newQueuedPodInfoNoTimestamp(pod)) @@ -553,9 +560,8 @@ func (p *PriorityQueue) getUnschedulablePodsWithMatchingAffinityTerm(pod *v1.Pod // but they are waiting for other pods to be removed from the node before they // can be actually scheduled. func (p *PriorityQueue) NominatedPodsForNode(nodeName string) []*v1.Pod { - p.lock.RLock() - defer p.lock.RUnlock() - return p.nominatedPods.podsForNode(nodeName) + // TODO: make podsForNode() public? + return p.PodNominator.(*nominatedPodMap).podsForNode(nodeName) } // PendingPods returns all the pending pods in the queue. This function is @@ -585,21 +591,21 @@ func (p *PriorityQueue) Close() { p.cond.Broadcast() } -// DeleteNominatedPodIfExists deletes pod nominatedPods. -func (p *PriorityQueue) DeleteNominatedPodIfExists(pod *v1.Pod) { - p.lock.Lock() - p.nominatedPods.delete(pod) - p.lock.Unlock() +// DeleteNominatedPodIfExists deletes from nominatedPods. +func (npm *nominatedPodMap) DeleteNominatedPodIfExists(pod *v1.Pod) { + npm.Lock() + npm.delete(pod) + npm.Unlock() } -// UpdateNominatedPodForNode adds a pod to the nominated pods of the given node. +// AddNominatedPod adds a pod to the nominated pods of the given node. // This is called during the preemption process after a node is nominated to run // the pod. We update the structure before sending a request to update the pod // object to avoid races with the following scheduling cycles. -func (p *PriorityQueue) UpdateNominatedPodForNode(pod *v1.Pod, nodeName string) { - p.lock.Lock() - p.nominatedPods.add(pod, nodeName) - p.lock.Unlock() +func (npm *nominatedPodMap) AddNominatedPod(pod *v1.Pod, nodeName string) { + npm.Lock() + npm.add(pod, nodeName) + npm.Unlock() } func (p *PriorityQueue) podsCompareBackoffCompleted(podInfo1, podInfo2 interface{}) bool { @@ -721,6 +727,8 @@ type nominatedPodMap struct { // nominatedPodToNode is map keyed by a Pod UID to the node name where it is // nominated. nominatedPodToNode map[ktypes.UID]string + + sync.RWMutex } func (npm *nominatedPodMap) add(p *v1.Pod, nodeName string) { @@ -762,7 +770,10 @@ func (npm *nominatedPodMap) delete(p *v1.Pod) { delete(npm.nominatedPodToNode, p.UID) } -func (npm *nominatedPodMap) update(oldPod, newPod *v1.Pod) { +// UpdateNominatedPod updates the with . +func (npm *nominatedPodMap) UpdateNominatedPod(oldPod, newPod *v1.Pod) { + npm.Lock() + defer npm.Unlock() // In some cases, an Update event with no "NominatedNode" present is received right // after a node("NominatedNode") is reserved for this pod in memory. // In this case, we need to keep reserving the NominatedNode when updating the pod pointer. @@ -784,13 +795,16 @@ func (npm *nominatedPodMap) update(oldPod, newPod *v1.Pod) { } func (npm *nominatedPodMap) podsForNode(nodeName string) []*v1.Pod { + npm.RLock() + defer npm.RUnlock() if list, ok := npm.nominatedPods[nodeName]; ok { return list } return nil } -func newNominatedPodMap() *nominatedPodMap { +// NewPodNominator creates a nominatedPodMap as a backing of framework.PodNominator. +func NewPodNominator() framework.PodNominator { return &nominatedPodMap{ nominatedPods: make(map[string][]*v1.Pod), nominatedPodToNode: make(map[ktypes.UID]string), diff --git a/pkg/scheduler/internal/queue/scheduling_queue_test.go b/pkg/scheduler/internal/queue/scheduling_queue_test.go index ccb299c933b..55149dc596e 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue_test.go +++ b/pkg/scheduler/internal/queue/scheduling_queue_test.go @@ -134,8 +134,8 @@ func TestPriorityQueue_Add(t *testing.T) { "node1": {&medPriorityPod, &unschedulablePod}, }, } - if !reflect.DeepEqual(q.nominatedPods, expectedNominatedPods) { - t.Errorf("Unexpected nominated map after adding pods. Expected: %v, got: %v", expectedNominatedPods, q.nominatedPods) + if !reflect.DeepEqual(q.PodNominator, expectedNominatedPods) { + t.Errorf("Unexpected nominated map after adding pods. Expected: %v, got: %v", expectedNominatedPods, q.PodNominator) } if p, err := q.Pop(); err != nil || p.Pod != &highPriorityPod { t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPod.Name, p.Pod.Name) @@ -146,8 +146,8 @@ func TestPriorityQueue_Add(t *testing.T) { if p, err := q.Pop(); err != nil || p.Pod != &unschedulablePod { t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePod.Name, p.Pod.Name) } - if len(q.nominatedPods.nominatedPods["node1"]) != 2 { - t.Errorf("Expected medPriorityPod and unschedulablePod to be still present in nomindatePods: %v", q.nominatedPods.nominatedPods["node1"]) + if len(q.PodNominator.(*nominatedPodMap).nominatedPods["node1"]) != 2 { + t.Errorf("Expected medPriorityPod and unschedulablePod to be still present in nomindatePods: %v", q.PodNominator.(*nominatedPodMap).nominatedPods["node1"]) } } @@ -186,14 +186,14 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent(t *testing.T) { "node1": {&highPriNominatedPod, &unschedulablePod}, }, } - if !reflect.DeepEqual(q.nominatedPods, expectedNominatedPods) { - t.Errorf("Unexpected nominated map after adding pods. Expected: %v, got: %v", expectedNominatedPods, q.nominatedPods) + if !reflect.DeepEqual(q.PodNominator, expectedNominatedPods) { + t.Errorf("Unexpected nominated map after adding pods. Expected: %v, got: %v", expectedNominatedPods, q.PodNominator) } if p, err := q.Pop(); err != nil || p.Pod != &highPriNominatedPod { t.Errorf("Expected: %v after Pop, but got: %v", highPriNominatedPod.Name, p.Pod.Name) } - if len(q.nominatedPods.nominatedPods) != 1 { - t.Errorf("Expected nomindatePods to have one element: %v", q.nominatedPods) + if len(q.PodNominator.(*nominatedPodMap).nominatedPods) != 1 { + t.Errorf("Expected nomindatePods to have one element: %v", q.PodNominator) } if getUnschedulablePod(q, &unschedulablePod) != &unschedulablePod { t.Errorf("Pod %v was not found in the unschedulableQ.", unschedulablePod.Name) @@ -278,8 +278,8 @@ func TestPriorityQueue_Pop(t *testing.T) { if p, err := q.Pop(); err != nil || p.Pod != &medPriorityPod { t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPod.Name, p.Pod.Name) } - if len(q.nominatedPods.nominatedPods["node1"]) != 1 { - t.Errorf("Expected medPriorityPod to be present in nomindatePods: %v", q.nominatedPods.nominatedPods["node1"]) + if len(q.PodNominator.(*nominatedPodMap).nominatedPods["node1"]) != 1 { + t.Errorf("Expected medPriorityPod to be present in nomindatePods: %v", q.PodNominator.(*nominatedPodMap).nominatedPods["node1"]) } }() q.Add(&medPriorityPod) @@ -292,16 +292,16 @@ func TestPriorityQueue_Update(t *testing.T) { if _, exists, _ := q.activeQ.Get(newQueuedPodInfoNoTimestamp(&highPriorityPod)); !exists { t.Errorf("Expected %v to be added to activeQ.", highPriorityPod.Name) } - if len(q.nominatedPods.nominatedPods) != 0 { - t.Errorf("Expected nomindatePods to be empty: %v", q.nominatedPods) + if len(q.PodNominator.(*nominatedPodMap).nominatedPods) != 0 { + t.Errorf("Expected nomindatePods to be empty: %v", q.PodNominator) } // Update highPriorityPod and add a nominatedNodeName to it. q.Update(&highPriorityPod, &highPriNominatedPod) if q.activeQ.Len() != 1 { t.Error("Expected only one item in activeQ.") } - if len(q.nominatedPods.nominatedPods) != 1 { - t.Errorf("Expected one item in nomindatePods map: %v", q.nominatedPods) + if len(q.PodNominator.(*nominatedPodMap).nominatedPods) != 1 { + t.Errorf("Expected one item in nomindatePods map: %v", q.PodNominator) } // Updating an unschedulable pod which is not in any of the two queues, should // add the pod to activeQ. @@ -347,14 +347,14 @@ func TestPriorityQueue_Delete(t *testing.T) { if _, exists, _ := q.activeQ.Get(newQueuedPodInfoNoTimestamp(&highPriNominatedPod)); exists { t.Errorf("Didn't expect %v to be in activeQ.", highPriorityPod.Name) } - if len(q.nominatedPods.nominatedPods) != 1 { - t.Errorf("Expected nomindatePods to have only 'unschedulablePod': %v", q.nominatedPods.nominatedPods) + if len(q.PodNominator.(*nominatedPodMap).nominatedPods) != 1 { + t.Errorf("Expected nomindatePods to have only 'unschedulablePod': %v", q.PodNominator.(*nominatedPodMap).nominatedPods) } if err := q.Delete(&unschedulablePod); err != nil { t.Errorf("delete failed: %v", err) } - if len(q.nominatedPods.nominatedPods) != 0 { - t.Errorf("Expected nomindatePods to be empty: %v", q.nominatedPods) + if len(q.PodNominator.(*nominatedPodMap).nominatedPods) != 0 { + t.Errorf("Expected nomindatePods to be empty: %v", q.PodNominator) } } @@ -480,10 +480,10 @@ func TestPriorityQueue_UpdateNominatedPodForNode(t *testing.T) { t.Errorf("add failed: %v", err) } // Update unschedulablePod on a different node than specified in the pod. - q.UpdateNominatedPodForNode(&unschedulablePod, "node5") + q.AddNominatedPod(&unschedulablePod, "node5") // Update nominated node name of a pod on a node that is not specified in the pod object. - q.UpdateNominatedPodForNode(&highPriorityPod, "node2") + q.AddNominatedPod(&highPriorityPod, "node2") expectedNominatedPods := &nominatedPodMap{ nominatedPodToNode: map[types.UID]string{ medPriorityPod.UID: "node1", @@ -496,19 +496,19 @@ func TestPriorityQueue_UpdateNominatedPodForNode(t *testing.T) { "node5": {&unschedulablePod}, }, } - if !reflect.DeepEqual(q.nominatedPods, expectedNominatedPods) { - t.Errorf("Unexpected nominated map after adding pods. Expected: %v, got: %v", expectedNominatedPods, q.nominatedPods) + if !reflect.DeepEqual(q.PodNominator, expectedNominatedPods) { + t.Errorf("Unexpected nominated map after adding pods. Expected: %v, got: %v", expectedNominatedPods, q.PodNominator) } if p, err := q.Pop(); err != nil || p.Pod != &medPriorityPod { t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPod.Name, p.Pod.Name) } // List of nominated pods shouldn't change after popping them from the queue. - if !reflect.DeepEqual(q.nominatedPods, expectedNominatedPods) { - t.Errorf("Unexpected nominated map after popping pods. Expected: %v, got: %v", expectedNominatedPods, q.nominatedPods) + if !reflect.DeepEqual(q.PodNominator, expectedNominatedPods) { + t.Errorf("Unexpected nominated map after popping pods. Expected: %v, got: %v", expectedNominatedPods, q.PodNominator) } // Update one of the nominated pods that doesn't have nominatedNodeName in the // pod object. It should be updated correctly. - q.UpdateNominatedPodForNode(&highPriorityPod, "node4") + q.AddNominatedPod(&highPriorityPod, "node4") expectedNominatedPods = &nominatedPodMap{ nominatedPodToNode: map[types.UID]string{ medPriorityPod.UID: "node1", @@ -521,8 +521,8 @@ func TestPriorityQueue_UpdateNominatedPodForNode(t *testing.T) { "node5": {&unschedulablePod}, }, } - if !reflect.DeepEqual(q.nominatedPods, expectedNominatedPods) { - t.Errorf("Unexpected nominated map after updating pods. Expected: %v, got: %v", expectedNominatedPods, q.nominatedPods) + if !reflect.DeepEqual(q.PodNominator, expectedNominatedPods) { + t.Errorf("Unexpected nominated map after updating pods. Expected: %v, got: %v", expectedNominatedPods, q.PodNominator) } // Delete a nominated pod that doesn't have nominatedNodeName in the pod @@ -538,8 +538,8 @@ func TestPriorityQueue_UpdateNominatedPodForNode(t *testing.T) { "node5": {&unschedulablePod}, }, } - if !reflect.DeepEqual(q.nominatedPods, expectedNominatedPods) { - t.Errorf("Unexpected nominated map after deleting pods. Expected: %v, got: %v", expectedNominatedPods, q.nominatedPods) + if !reflect.DeepEqual(q.PodNominator, expectedNominatedPods) { + t.Errorf("Unexpected nominated map after deleting pods. Expected: %v, got: %v", expectedNominatedPods, q.PodNominator) } } diff --git a/pkg/scheduler/profile/profile.go b/pkg/scheduler/profile/profile.go index 9555b287302..3e6f3d14e23 100644 --- a/pkg/scheduler/profile/profile.go +++ b/pkg/scheduler/profile/profile.go @@ -33,7 +33,7 @@ import ( type RecorderFactory func(string) events.EventRecorder // FrameworkFactory builds a Framework for a given profile configuration. -type FrameworkFactory func(config.KubeSchedulerProfile) (framework.Framework, error) +type FrameworkFactory func(config.KubeSchedulerProfile, ...framework.Option) (framework.Framework, error) // Profile is a scheduling profile. type Profile struct { @@ -42,8 +42,9 @@ type Profile struct { } // NewProfile builds a Profile for the given configuration. -func NewProfile(cfg config.KubeSchedulerProfile, frameworkFact FrameworkFactory, recorderFact RecorderFactory) (*Profile, error) { - f, err := frameworkFact(cfg) +func NewProfile(cfg config.KubeSchedulerProfile, frameworkFact FrameworkFactory, recorderFact RecorderFactory, + opts ...framework.Option) (*Profile, error) { + f, err := frameworkFact(cfg, opts...) if err != nil { return nil, err } @@ -58,7 +59,8 @@ func NewProfile(cfg config.KubeSchedulerProfile, frameworkFact FrameworkFactory, type Map map[string]*Profile // NewMap builds the profiles given by the configuration, indexed by name. -func NewMap(cfgs []config.KubeSchedulerProfile, frameworkFact FrameworkFactory, recorderFact RecorderFactory) (Map, error) { +func NewMap(cfgs []config.KubeSchedulerProfile, frameworkFact FrameworkFactory, recorderFact RecorderFactory, + opts ...framework.Option) (Map, error) { m := make(Map) v := cfgValidator{m: m} @@ -66,7 +68,7 @@ func NewMap(cfgs []config.KubeSchedulerProfile, frameworkFact FrameworkFactory, if err := v.validate(cfg); err != nil { return nil, err } - p, err := NewProfile(cfg, frameworkFact, recorderFact) + p, err := NewProfile(cfg, frameworkFact, recorderFact, opts...) if err != nil { return nil, fmt.Errorf("creating profile for scheduler name %s: %v", cfg.SchedulerName, err) } diff --git a/pkg/scheduler/profile/profile_test.go b/pkg/scheduler/profile/profile_test.go index 1bcbd5b9e16..58ca53b4135 100644 --- a/pkg/scheduler/profile/profile_test.go +++ b/pkg/scheduler/profile/profile_test.go @@ -308,8 +308,8 @@ func newFakePlugin(_ runtime.Object, _ framework.FrameworkHandle) (framework.Plu return &fakePlugin{}, nil } -func fakeFrameworkFactory(cfg config.KubeSchedulerProfile) (framework.Framework, error) { - return framework.NewFramework(fakeRegistry, cfg.Plugins, cfg.PluginConfig) +func fakeFrameworkFactory(cfg config.KubeSchedulerProfile, opts ...framework.Option) (framework.Framework, error) { + return framework.NewFramework(fakeRegistry, cfg.Plugins, cfg.PluginConfig, opts...) } func nilRecorderFactory(_ string) events.EventRecorder { diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index f4cc66d09fa..ab487f1bb2d 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -402,7 +402,7 @@ func (sched *Scheduler) preempt(ctx context.Context, prof *profile.Profile, stat // Update the scheduling queue with the nominated pod information. Without // this, there would be a race condition between the next scheduling cycle // and the time the scheduler receives a Pod Update for the nominated pod. - sched.SchedulingQueue.UpdateNominatedPodForNode(preemptor, nodeName) + sched.SchedulingQueue.AddNominatedPod(preemptor, nodeName) // Make a call to update nominated node name of the pod on the API server. err = sched.podPreemptor.setNominatedNodeName(preemptor, nodeName)