Merge pull request #125820 from macsko/add_separate_lock_for_pod_nominator_scheduling_queue

Add a separate lock for pod nominator in scheduling queue
Authored by Kubernetes Prow Robot on 2024-07-17 12:06:10 -07:00; committed by GitHub.
commit d879103c28
8 changed files with 162 additions and 85 deletions

===== File 1 of 8 =====

@@ -337,7 +337,7 @@ func TestSchedulerWithExtenders(t *testing.T) {
 test.registerPlugins, "",
 runtime.WithClientSet(client),
 runtime.WithInformerFactory(informerFactory),
-runtime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())),
+runtime.WithPodNominator(internalqueue.NewTestPodNominator(informerFactory.Core().V1().Pods().Lister())),
 runtime.WithLogger(logger),
 )
 if err != nil {

===== File 2 of 8 =====

@@ -365,7 +365,7 @@ func TestPostFilter(t *testing.T) {
 frameworkruntime.WithClientSet(cs),
 frameworkruntime.WithEventRecorder(&events.FakeRecorder{}),
 frameworkruntime.WithInformerFactory(informerFactory),
-frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())),
+frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(informerFactory.Core().V1().Pods().Lister())),
 frameworkruntime.WithExtenders(extenders),
 frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(tt.pods, tt.nodes)),
 frameworkruntime.WithLogger(logger),
@@ -1102,7 +1102,7 @@ func TestDryRunPreemption(t *testing.T) {
 fwk, err := tf.NewFramework(
 ctx,
 registeredPlugins, "",
-frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())),
+frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(informerFactory.Core().V1().Pods().Lister())),
 frameworkruntime.WithSnapshotSharedLister(snapshot),
 frameworkruntime.WithInformerFactory(informerFactory),
 frameworkruntime.WithParallelism(parallelism),
@@ -1361,7 +1361,7 @@ func TestSelectBestCandidate(t *testing.T) {
 tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
 },
 "",
-frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())),
+frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(informerFactory.Core().V1().Pods().Lister())),
 frameworkruntime.WithSnapshotSharedLister(snapshot),
 frameworkruntime.WithLogger(logger),
 )
@@ -1746,7 +1746,7 @@ func TestPreempt(t *testing.T) {
 frameworkruntime.WithClientSet(client),
 frameworkruntime.WithEventRecorder(&events.FakeRecorder{}),
 frameworkruntime.WithExtenders(extenders),
-frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())),
+frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(informerFactory.Core().V1().Pods().Lister())),
 frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(test.pods, nodes)),
 frameworkruntime.WithInformerFactory(informerFactory),
 frameworkruntime.WithWaitingPods(waitingPods),

===== File 3 of 8 =====

@@ -341,7 +341,7 @@ func TestDryRunPreemption(t *testing.T) {
 fwk, err := tf.NewFramework(
 ctx,
 registeredPlugins, "",
-frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())),
+frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(informerFactory.Core().V1().Pods().Lister())),
 frameworkruntime.WithInformerFactory(informerFactory),
 frameworkruntime.WithParallelism(parallelism),
 frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(tt.testPods, tt.nodes)),
@@ -446,7 +446,7 @@ func TestSelectCandidate(t *testing.T) {
 ctx,
 registeredPlugins,
 "",
-frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())),
+frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(informerFactory.Core().V1().Pods().Lister())),
 frameworkruntime.WithSnapshotSharedLister(snapshot),
 frameworkruntime.WithLogger(logger),
 )

===== File 4 of 8 =====

@@ -2385,7 +2385,7 @@ func TestFilterPluginsWithNominatedPods(t *testing.T) {
 )
 }
-podNominator := internalqueue.NewPodNominator(nil)
+podNominator := internalqueue.NewTestPodNominator(nil)
 if tt.nominatedPod != nil {
 podNominator.AddNominatedPod(
 logger,

===== File 5 of 8 =====

@@ -32,10 +32,12 @@ import (
 "fmt"
 "math/rand"
 "reflect"
+"slices"
 "sync"
 "time"

 v1 "k8s.io/api/core/v1"
+metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 "k8s.io/apimachinery/pkg/types"
 "k8s.io/apimachinery/pkg/util/sets"
 "k8s.io/apimachinery/pkg/util/wait"
@@ -156,6 +158,11 @@ type PriorityQueue struct {
 stop chan struct{}
 clock clock.Clock
+
+// lock takes precedence and should be taken first,
+// before any other locks in the queue (activeQLock or nominator.nLock).
+// Correct locking order is: lock > activeQLock > nominator.nLock.
+lock sync.RWMutex
 // pod initial backoff duration.
 podInitialBackoffDuration time.Duration
 // pod maximum backoff duration.
@@ -169,9 +176,10 @@ type PriorityQueue struct {
 // activeQLock synchronizes all operations related to activeQ.
 // It protects activeQ, inFlightPods, inFlightEvents, schedulingCycle and closed fields.
-// Caution: DO NOT take nominator.lock after taking activeQLock,
-// you should take nominator.lock first if you need two locks,
-// otherwise the queue could end up deadlock.
+// Caution: DO NOT take "lock" after taking "activeQLock".
+// You should always take "lock" first, otherwise the queue could end up in deadlock.
+// "activeQLock" should not be taken after taking "nLock".
+// Correct locking order is: lock > activeQLock > nominator.nLock.
 activeQLock sync.RWMutex
 // inFlightPods holds the UID of all pods which have been popped out for which Done
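The two hunks above pin down a strict lock hierarchy for the queue: lock first, then activeQLock, then nominator.nLock. A minimal, standalone sketch of that ordering; the queue type and both methods here are illustrative stand-ins, not the scheduler's actual API:

```go
package main

import "sync"

// Illustrative stand-ins for PriorityQueue.lock, PriorityQueue.activeQLock
// and nominator.nLock; only the acquisition order matters here.
type queue struct {
	lock        sync.RWMutex // outermost: always take first
	activeQLock sync.RWMutex // middle: only while not holding nLock
	nLock       sync.RWMutex // innermost: take last
}

// correctOrder follows the documented hierarchy, so any number of
// goroutines doing the same can never deadlock with each other.
func (q *queue) correctOrder() {
	q.lock.Lock()
	defer q.lock.Unlock()
	q.activeQLock.Lock()
	defer q.activeQLock.Unlock()
	q.nLock.Lock()
	defer q.nLock.Unlock()
	// ... touch activeQ and nominator state ...
}

// wrongOrder takes nLock before activeQLock. Run concurrently with
// correctOrder, each goroutine can end up waiting for the lock the
// other one holds: a classic lock-order-inversion deadlock.
func (q *queue) wrongOrder() {
	q.nLock.Lock()
	defer q.nLock.Unlock()
	q.activeQLock.Lock() // DON'T: violates lock > activeQLock > nLock
	defer q.activeQLock.Unlock()
}

func main() {
	q := &queue{}
	q.correctOrder()
}
```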
@@ -381,7 +389,6 @@ func NewPriorityQueue(
 }
 pq := &PriorityQueue{
-nominator: newPodNominator(options.podLister),
 clock: options.clock,
 stop: make(chan struct{}),
 podInitialBackoffDuration: options.podInitialBackoffDuration,
@@ -401,6 +408,7 @@ func NewPriorityQueue(
 pq.cond.L = &pq.activeQLock
 pq.podBackoffQ = heap.NewWithRecorder(podInfoKeyFunc, pq.podsCompareBackoffCompleted, metrics.NewBackoffPodsRecorder())
 pq.nsLister = informerFactory.Core().V1().Namespaces().Lister()
+pq.nominator = newPodNominator(options.podLister, pq.nominatedPodsToInfo)
 return pq
 }
@@ -606,7 +614,7 @@ func (p *PriorityQueue) moveToActiveQ(logger klog.Logger, pInfo *framework.Queue
 logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", event, "queue", activeQ)
 metrics.SchedulerQueueIncomingPods.WithLabelValues("active", event).Inc()
 if event == PodAdd || event == PodUpdate {
-p.addNominatedPodUnlocked(logger, pInfo.PodInfo, nil)
+p.AddNominatedPod(logger, pInfo.PodInfo, nil)
 }
 return true, nil
@@ -807,7 +815,7 @@ func (p *PriorityQueue) addUnschedulableWithoutQueueingHint(logger klog.Logger,
 metrics.SchedulerQueueIncomingPods.WithLabelValues("unschedulable", ScheduleAttemptFailure).Inc()
 }
-p.addNominatedPodUnlocked(logger, pInfo.PodInfo, nil)
+p.AddNominatedPod(logger, pInfo.PodInfo, nil)
 return nil
 }
@@ -860,7 +868,7 @@ func (p *PriorityQueue) AddUnschedulableIfNotPresent(logger klog.Logger, pInfo *
 p.cond.Broadcast()
 }
-p.addNominatedPodUnlocked(logger, pInfo.PodInfo, nil)
+p.AddNominatedPod(logger, pInfo.PodInfo, nil)
 return nil
 }
@@ -1022,7 +1030,7 @@ func (p *PriorityQueue) updateInActiveQueue(logger klog.Logger, oldPod, newPod *
 defer p.activeQLock.Unlock()
 if oldPodInfo, exists, _ := p.activeQ.Get(oldPodInfo); exists {
 pInfo := updatePod(oldPodInfo, newPod)
-p.updateNominatedPodUnlocked(logger, oldPod, pInfo.PodInfo)
+p.UpdateNominatedPod(logger, oldPod, pInfo.PodInfo)
 return true, p.activeQ.Update(pInfo)
 }
 return false, nil
@@ -1068,7 +1076,7 @@ func (p *PriorityQueue) Update(logger klog.Logger, oldPod, newPod *v1.Pod) error
 // If the pod is in the backoff queue, update it there.
 if oldPodInfo, exists, _ := p.podBackoffQ.Get(oldPodInfo); exists {
 pInfo := updatePod(oldPodInfo, newPod)
-p.updateNominatedPodUnlocked(logger, oldPod, pInfo.PodInfo)
+p.UpdateNominatedPod(logger, oldPod, pInfo.PodInfo)
 return p.podBackoffQ.Update(pInfo)
 }
 }
@@ -1076,7 +1084,7 @@ func (p *PriorityQueue) Update(logger klog.Logger, oldPod, newPod *v1.Pod) error
 // If the pod is in the unschedulable queue, updating it may make it schedulable.
 if usPodInfo := p.unschedulablePods.get(newPod); usPodInfo != nil {
 pInfo := updatePod(usPodInfo, newPod)
-p.updateNominatedPodUnlocked(logger, oldPod, pInfo.PodInfo)
+p.UpdateNominatedPod(logger, oldPod, pInfo.PodInfo)
 gated := usPodInfo.Gated
 if p.isSchedulingQueueHintEnabled {
 // When unscheduled Pods are updated, we check with QueueingHint
@@ -1130,7 +1138,7 @@ func (p *PriorityQueue) Update(logger klog.Logger, oldPod, newPod *v1.Pod) error
 func (p *PriorityQueue) Delete(pod *v1.Pod) error {
 p.lock.Lock()
 defer p.lock.Unlock()
-p.deleteNominatedPodIfExistsUnlocked(pod)
+p.DeleteNominatedPodIfExists(pod)
 pInfo := newQueuedPodInfoForLookup(pod)
 p.activeQLock.Lock()
 defer p.activeQLock.Unlock()
@@ -1378,6 +1386,43 @@ func (p *PriorityQueue) PendingPods() ([]*v1.Pod, string) {
 return result, fmt.Sprintf(pendingPodsSummary, activeQLen, p.podBackoffQ.Len(), len(p.unschedulablePods.podInfoMap))
 }

+// Note: this function assumes the caller locks p.lock.RLock.
+func (p *PriorityQueue) nominatedPodToInfo(np PodRef) *framework.PodInfo {
+pod := np.ToPod()
+pInfoLookup := newQueuedPodInfoForLookup(pod)
+
+obj, exists, _ := p.activeQ.Get(pInfoLookup)
+if exists {
+queuedPodInfo := obj.(*framework.QueuedPodInfo)
+return queuedPodInfo.PodInfo
+}
+
+queuedPodInfo := p.unschedulablePods.get(pod)
+if queuedPodInfo != nil {
+return queuedPodInfo.PodInfo
+}
+
+obj, exists, _ = p.podBackoffQ.Get(pInfoLookup)
+if exists {
+queuedPodInfo := obj.(*framework.QueuedPodInfo)
+return queuedPodInfo.PodInfo
+}
+
+return &framework.PodInfo{Pod: pod}
+}
+
+func (p *PriorityQueue) nominatedPodsToInfo(nominatedPods []PodRef) []*framework.PodInfo {
+p.lock.RLock()
+defer p.lock.RUnlock()
+p.activeQLock.RLock()
+defer p.activeQLock.RUnlock()
+pods := make([]*framework.PodInfo, len(nominatedPods))
+for i, np := range nominatedPods {
+pods[i] = p.nominatedPodToInfo(np).DeepCopy()
+}
+return pods
+}
+
 // Close closes the priority queue.
 func (p *PriorityQueue) Close() {
 p.lock.Lock()
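nominatedPodToInfo rebuilds the full *framework.PodInfo for a stored PodRef by consulting the queue's caches in a fixed order (activeQ, then unschedulablePods, then podBackoffQ), falling back to a metadata-only PodInfo when the pod is in none of them. The same first-match-wins shape in isolation; this generic resolve helper is hypothetical, not part of the scheduler:

```go
package main

import "fmt"

// resolve returns the first hit from an ordered list of lookups and
// falls back to a stub constructor, mirroring the shape of
// nominatedPodToInfo's activeQ -> unschedulablePods -> podBackoffQ chain.
func resolve[K comparable, V any](key K, lookups []func(K) (V, bool), fallback func(K) V) V {
	for _, lookup := range lookups {
		if v, ok := lookup(key); ok {
			return v
		}
	}
	return fallback(key)
}

func main() {
	active := map[string]string{"pod-a": "PodInfo from activeQ"}
	backoff := map[string]string{"pod-b": "PodInfo from podBackoffQ"}
	inMap := func(m map[string]string) func(string) (string, bool) {
		return func(k string) (string, bool) { v, ok := m[k]; return v, ok }
	}
	lookups := []func(string) (string, bool){inMap(active), inMap(backoff)}
	stub := func(k string) string { return "metadata-only stub for " + k }

	fmt.Println(resolve("pod-a", lookups, stub)) // PodInfo from activeQ
	fmt.Println(resolve("pod-c", lookups, stub)) // metadata-only stub for pod-c
}
```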
@@ -1392,13 +1437,9 @@
 // DeleteNominatedPodIfExists deletes <pod> from nominatedPods.
 func (npm *nominator) DeleteNominatedPodIfExists(pod *v1.Pod) {
-npm.lock.Lock()
-npm.deleteNominatedPodIfExistsUnlocked(pod)
-npm.lock.Unlock()
-}
-
-func (npm *nominator) deleteNominatedPodIfExistsUnlocked(pod *v1.Pod) {
+npm.nLock.Lock()
 npm.delete(pod)
+npm.nLock.Unlock()
 }

 // AddNominatedPod adds a pod to the nominated pods of the given node.
@@ -1406,22 +1447,20 @@ func (npm *nominator) deleteNominatedPodIfExistsUnlocked(pod *v1.Pod) {
 // the pod. We update the structure before sending a request to update the pod
 // object to avoid races with the following scheduling cycles.
 func (npm *nominator) AddNominatedPod(logger klog.Logger, pi *framework.PodInfo, nominatingInfo *framework.NominatingInfo) {
-npm.lock.Lock()
+npm.nLock.Lock()
 npm.addNominatedPodUnlocked(logger, pi, nominatingInfo)
-npm.lock.Unlock()
+npm.nLock.Unlock()
 }

 // NominatedPodsForNode returns a copy of pods that are nominated to run on the given node,
 // but they are waiting for other pods to be removed from the node.
+// CAUTION: Make sure you don't call this function while taking any lock in any scenario.
 func (npm *nominator) NominatedPodsForNode(nodeName string) []*framework.PodInfo {
-npm.lock.RLock()
-defer npm.lock.RUnlock()
-// Make a copy of the nominated Pods so the caller can mutate safely.
-pods := make([]*framework.PodInfo, len(npm.nominatedPods[nodeName]))
-for i := 0; i < len(pods); i++ {
-pods[i] = npm.nominatedPods[nodeName][i].DeepCopy()
-}
-return pods
+npm.nLock.RLock()
+nominatedPods := slices.Clone(npm.nominatedPods[nodeName])
+npm.nLock.RUnlock()
+// Note that nominatedPodsToInfo takes SchedulingQueue.lock inside.
+return npm.nominatedPodsToInfo(nominatedPods)
 }

 func (p *PriorityQueue) podsCompareBackoffCompleted(podInfo1, podInfo2 interface{}) bool {
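The rewritten NominatedPodsForNode is the payoff of the split: it clones the PodRef slice under nLock, releases it, and only then calls nominatedPodsToInfo, which takes SchedulingQueue.lock and activeQLock internally. Holding nLock across that call would invert the documented order. A standalone sketch of the snapshot-then-resolve pattern, with illustrative names:

```go
package main

import (
	"fmt"
	"slices"
	"sync"
)

// registry is a stand-in for the nominator; refs plays the role of
// nominatedPods (node name -> PodRefs).
type registry struct {
	nLock sync.RWMutex
	refs  map[string][]string
}

// snapshotThenResolve clones the refs under the inner lock and releases
// it before resolving, mirroring NominatedPodsForNode: the resolver
// (nominatedPodsToInfo there) takes the outer queue locks itself.
func (r *registry) snapshotThenResolve(node string, resolve func([]string) []string) []string {
	r.nLock.RLock()
	snapshot := slices.Clone(r.refs[node])
	r.nLock.RUnlock() // released BEFORE resolve grabs the outer locks
	return resolve(snapshot)
}

func main() {
	r := &registry{refs: map[string][]string{"node1": {"pod-a", "pod-b"}}}
	out := r.snapshotThenResolve("node1", func(refs []string) []string {
		resolved := make([]string, len(refs))
		for i, ref := range refs {
			resolved[i] = "info(" + ref + ")"
		}
		return resolved
	})
	fmt.Println(out) // [info(pod-a) info(pod-b)]
}
```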
@@ -1542,22 +1581,55 @@ func newUnschedulablePods(unschedulableRecorder, gatedRecorder metrics.MetricRec
 }
 }

+type PodRef struct {
+Name string
+Namespace string
+UID types.UID
+}
+
+func PodToRef(pod *v1.Pod) PodRef {
+return PodRef{
+Name: pod.Name,
+Namespace: pod.Namespace,
+UID: pod.UID,
+}
+}
+
+func (np PodRef) ToPod() *v1.Pod {
+return &v1.Pod{
+ObjectMeta: metav1.ObjectMeta{
+Name: np.Name,
+Namespace: np.Namespace,
+UID: np.UID,
+},
+}
+}
+
 // nominator is a structure that stores pods nominated to run on nodes.
 // It exists because nominatedNodeName of pod objects stored in the structure
 // may be different than what scheduler has here. We should be able to find pods
 // by their UID and update/delete them.
 type nominator struct {
+// nLock synchronizes all operations related to nominator.
+// Caution: DO NOT take ("SchedulingQueue.lock" or "SchedulingQueue.activeQLock") after taking "nLock".
+// You should always take "SchedulingQueue.lock" and "SchedulingQueue.activeQLock" first,
+// otherwise the nominator could end up in deadlock.
+// Correct locking order is: SchedulingQueue.lock > SchedulingQueue.activeQLock > nLock.
+nLock sync.RWMutex
+
 // podLister is used to verify if the given pod is alive.
 podLister listersv1.PodLister
 // nominatedPods is a map keyed by a node name and the value is a list of
 // pods which are nominated to run on the node. These are pods which can be in
 // the activeQ or unschedulablePods.
-nominatedPods map[string][]*framework.PodInfo
+nominatedPods map[string][]PodRef
 // nominatedPodToNode is map keyed by a Pod UID to the node name where it is
 // nominated.
 nominatedPodToNode map[types.UID]string
-lock sync.RWMutex
+
+// nominatedPodsToInfo returns PodInfos cached in the queues for nominated PodRefs.
+// Note: it takes SchedulingQueue.lock inside.
+// Make sure you don't call this function while taking any lock in any scenario.
+nominatedPodsToInfo func([]PodRef) []*framework.PodInfo
 }

 func (npm *nominator) addNominatedPodUnlocked(logger klog.Logger, pi *framework.PodInfo, nominatingInfo *framework.NominatingInfo) {
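PodRef carries only identifying metadata, so ToPod reconstructs a *v1.Pod with just Name, Namespace and UID; spec and status have to come from the PodInfos cached in the queues, which is exactly what nominatedPodsToInfo is for. A quick round-trip, with PodRef and its helpers replicated from the hunk above (assumes the usual k8s.io/api and k8s.io/apimachinery modules):

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
)

// PodRef and its helpers, copied from the diff above.
type PodRef struct {
	Name      string
	Namespace string
	UID       types.UID
}

func PodToRef(pod *v1.Pod) PodRef {
	return PodRef{Name: pod.Name, Namespace: pod.Namespace, UID: pod.UID}
}

func (np PodRef) ToPod() *v1.Pod {
	return &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: np.Name, Namespace: np.Namespace, UID: np.UID}}
}

func main() {
	orig := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{Name: "p", Namespace: "ns", UID: types.UID("123")},
		Spec:       v1.PodSpec{NodeName: "node1"},
	}
	back := PodToRef(orig).ToPod()
	fmt.Println(back.Name, back.Namespace, back.UID) // p ns 123
	fmt.Println(back.Spec.NodeName == "")            // true: PodRef does not carry the spec
}
```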
@@ -1589,13 +1661,13 @@ func (npm *nominator) addNominatedPodUnlocked(logger klog.Logger, pi *framework.
 }
 npm.nominatedPodToNode[pi.Pod.UID] = nodeName
-for _, npi := range npm.nominatedPods[nodeName] {
-if npi.Pod.UID == pi.Pod.UID {
-logger.V(4).Info("Pod already exists in the nominator", "pod", klog.KObj(npi.Pod))
+for _, np := range npm.nominatedPods[nodeName] {
+if np.UID == pi.Pod.UID {
+logger.V(4).Info("Pod already exists in the nominator", "pod", np.UID)
 return
 }
 }
-npm.nominatedPods[nodeName] = append(npm.nominatedPods[nodeName], pi)
+npm.nominatedPods[nodeName] = append(npm.nominatedPods[nodeName], PodToRef(pi.Pod))
 }

 func (npm *nominator) delete(p *v1.Pod) {
@@ -1604,7 +1676,7 @@ func (npm *nominator) delete(p *v1.Pod) {
 return
 }
 for i, np := range npm.nominatedPods[nnn] {
-if np.Pod.UID == p.UID {
+if np.UID == p.UID {
 npm.nominatedPods[nnn] = append(npm.nominatedPods[nnn][:i], npm.nominatedPods[nnn][i+1:]...)
 if len(npm.nominatedPods[nnn]) == 0 {
 delete(npm.nominatedPods, nnn)
@@ -1617,12 +1689,8 @@
 // UpdateNominatedPod updates the <oldPod> with <newPod>.
 func (npm *nominator) UpdateNominatedPod(logger klog.Logger, oldPod *v1.Pod, newPodInfo *framework.PodInfo) {
-npm.lock.Lock()
-defer npm.lock.Unlock()
-npm.updateNominatedPodUnlocked(logger, oldPod, newPodInfo)
-}
-
-func (npm *nominator) updateNominatedPodUnlocked(logger klog.Logger, oldPod *v1.Pod, newPodInfo *framework.PodInfo) {
+npm.nLock.Lock()
+defer npm.nLock.Unlock()
 // In some cases, an Update event with no "NominatedNode" present is received right
 // after a node("NominatedNode") is reserved for this pod in memory.
 // In this case, we need to keep reserving the NominatedNode when updating the pod pointer.
@@ -1646,18 +1714,12 @@ func (npm *nominator) updateNominatedPodUnlocked(logger klog.Logger, oldPod *v1.
 npm.addNominatedPodUnlocked(logger, newPodInfo, nominatingInfo)
 }

-// NewPodNominator creates a nominator as a backing of framework.PodNominator.
-// A podLister is passed in so as to check if the pod exists
-// before adding its nominatedNode info.
-func NewPodNominator(podLister listersv1.PodLister) framework.PodNominator {
-return newPodNominator(podLister)
-}
-
-func newPodNominator(podLister listersv1.PodLister) *nominator {
+func newPodNominator(podLister listersv1.PodLister, nominatedPodsToInfo func([]PodRef) []*framework.PodInfo) *nominator {
 return &nominator{
-podLister: podLister,
-nominatedPods: make(map[string][]*framework.PodInfo),
-nominatedPodToNode: make(map[types.UID]string),
+podLister: podLister,
+nominatedPods: make(map[string][]PodRef),
+nominatedPodToNode: make(map[types.UID]string),
+nominatedPodsToInfo: nominatedPodsToInfo,
 }
 }

===== File 6 of 8 =====

@@ -85,7 +85,7 @@
 nominatorCmpOpts = []cmp.Option{
 cmp.AllowUnexported(nominator{}),
-cmpopts.IgnoreFields(nominator{}, "podLister", "lock"),
+cmpopts.IgnoreFields(nominator{}, "podLister", "nLock", "nominatedPodsToInfo"),
 }

 queueHintReturnQueue = func(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
@@ -136,8 +136,8 @@ func TestPriorityQueue_Add(t *testing.T) {
 medPriorityPodInfo.Pod.UID: "node1",
 unschedulablePodInfo.Pod.UID: "node1",
 },
-nominatedPods: map[string][]*framework.PodInfo{
-"node1": {medPriorityPodInfo, unschedulablePodInfo},
+nominatedPods: map[string][]PodRef{
+"node1": {PodToRef(medPriorityPodInfo.Pod), PodToRef(unschedulablePodInfo.Pod)},
 },
 }
 if diff := cmp.Diff(q.nominator, expectedNominatedPods, nominatorCmpOpts...); diff != "" {
@@ -870,8 +870,8 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent(t *testing.T) {
 unschedulablePodInfo.Pod.UID: "node1",
 highPriNominatedPodInfo.Pod.UID: "node1",
 },
-nominatedPods: map[string][]*framework.PodInfo{
-"node1": {highPriNominatedPodInfo, unschedulablePodInfo},
+nominatedPods: map[string][]PodRef{
+"node1": {PodToRef(highPriNominatedPodInfo.Pod), PodToRef(unschedulablePodInfo.Pod)},
 },
 }
 if diff := cmp.Diff(q.nominator, expectedNominatedPods, nominatorCmpOpts...); diff != "" {
@@ -2178,10 +2178,10 @@ func TestPriorityQueue_UpdateNominatedPodForNode(t *testing.T) {
 highPriorityPodInfo.Pod.UID: "node2",
 unschedulablePodInfo.Pod.UID: "node5",
 },
-nominatedPods: map[string][]*framework.PodInfo{
-"node1": {medPriorityPodInfo},
-"node2": {highPriorityPodInfo},
-"node5": {unschedulablePodInfo},
+nominatedPods: map[string][]PodRef{
+"node1": {PodToRef(medPriorityPodInfo.Pod)},
+"node2": {PodToRef(highPriorityPodInfo.Pod)},
+"node5": {PodToRef(unschedulablePodInfo.Pod)},
 },
 }
 if diff := cmp.Diff(q.nominator, expectedNominatedPods, nominatorCmpOpts...); diff != "" {
@@ -2203,10 +2203,10 @@ func TestPriorityQueue_UpdateNominatedPodForNode(t *testing.T) {
 highPriorityPodInfo.Pod.UID: "node4",
 unschedulablePodInfo.Pod.UID: "node5",
 },
-nominatedPods: map[string][]*framework.PodInfo{
-"node1": {medPriorityPodInfo},
-"node4": {highPriorityPodInfo},
-"node5": {unschedulablePodInfo},
+nominatedPods: map[string][]PodRef{
+"node1": {PodToRef(medPriorityPodInfo.Pod)},
+"node4": {PodToRef(highPriorityPodInfo.Pod)},
+"node5": {PodToRef(unschedulablePodInfo.Pod)},
 },
 }
 if diff := cmp.Diff(q.nominator, expectedNominatedPods, nominatorCmpOpts...); diff != "" {
@@ -2236,9 +2236,9 @@ func TestPriorityQueue_UpdateNominatedPodForNode(t *testing.T) {
 medPriorityPodInfo.Pod.UID: "node1",
 unschedulablePodInfo.Pod.UID: "node5",
 },
-nominatedPods: map[string][]*framework.PodInfo{
-"node1": {medPriorityPodInfo},
-"node5": {unschedulablePodInfo},
+nominatedPods: map[string][]PodRef{
+"node1": {PodToRef(medPriorityPodInfo.Pod)},
+"node5": {PodToRef(unschedulablePodInfo.Pod)},
 },
 }
 if diff := cmp.Diff(q.nominator, expectedNominatedPods, nominatorCmpOpts...); diff != "" {

===== File 7 of 8 =====

@@ -22,6 +22,7 @@ import (
 "k8s.io/apimachinery/pkg/runtime"
 "k8s.io/client-go/informers"
 "k8s.io/client-go/kubernetes/fake"
+listersv1 "k8s.io/client-go/listers/core/v1"
 "k8s.io/kubernetes/pkg/scheduler/framework"
 )
@@ -53,3 +54,17 @@ func NewTestQueueWithInformerFactory(
 informerFactory.WaitForCacheSync(ctx.Done())
 return pq
 }
+
+// NewTestPodNominator creates a nominator as a backing of framework.PodNominator.
+// A podLister is passed in so as to check if the pod exists
+// before adding its nominatedNode info.
+func NewTestPodNominator(podLister listersv1.PodLister) framework.PodNominator {
+nominatedPodsToInfo := func(nominatedPods []PodRef) []*framework.PodInfo {
+pods := make([]*framework.PodInfo, len(nominatedPods))
+for i, np := range nominatedPods {
+pods[i] = &framework.PodInfo{Pod: np.ToPod()}
+}
+return pods
+}
+return newPodNominator(podLister, nominatedPodsToInfo)
+}
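Every test call site in this PR swaps NewPodNominator for NewTestPodNominator, whose resolver builds metadata-only PodInfos instead of reading the queue. A typical setup, pieced together from the call sites above; ctx, registeredPlugins and t are assumed to come from the surrounding test:

```go
client := fake.NewSimpleClientset()
informerFactory := informers.NewSharedInformerFactory(client, 0)
fwk, err := tf.NewFramework(
	ctx,
	registeredPlugins, "",
	frameworkruntime.WithInformerFactory(informerFactory),
	frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(informerFactory.Core().V1().Pods().Lister())),
)
if err != nil {
	t.Fatal(err)
}
```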

===== File 8 of 8 =====

@@ -2483,7 +2483,7 @@ func TestSchedulerSchedulePod(t *testing.T) {
 test.registerPlugins, "",
 frameworkruntime.WithSnapshotSharedLister(snapshot),
 frameworkruntime.WithInformerFactory(informerFactory),
-frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())),
+frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(informerFactory.Core().V1().Pods().Lister())),
 )
 if err != nil {
 t.Fatal(err)
@@ -2546,7 +2546,7 @@ func TestFindFitAllError(t *testing.T) {
 tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
 },
 "",
-frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(nil)),
+frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(nil)),
 )
 if err != nil {
 t.Fatal(err)
@@ -2586,7 +2586,7 @@ func TestFindFitSomeError(t *testing.T) {
 tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
 },
 "",
-frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(nil)),
+frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(nil)),
 )
 if err != nil {
 t.Fatal(err)
@@ -2663,7 +2663,7 @@ func TestFindFitPredicateCallCounts(t *testing.T) {
 fwk, err := tf.NewFramework(
 ctx,
 registerPlugins, "",
-frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(nil)),
+frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(nil)),
 )
 if err != nil {
 t.Fatal(err)
@@ -2804,7 +2804,7 @@ func TestZeroRequest(t *testing.T) {
 frameworkruntime.WithInformerFactory(informerFactory),
 frameworkruntime.WithSnapshotSharedLister(snapshot),
 frameworkruntime.WithClientSet(client),
-frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())),
+frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(informerFactory.Core().V1().Pods().Lister())),
 )
 if err != nil {
 t.Fatalf("error creating framework: %+v", err)
@@ -3207,7 +3207,7 @@ func Test_prioritizeNodes(t *testing.T) {
 frameworkruntime.WithInformerFactory(informerFactory),
 frameworkruntime.WithSnapshotSharedLister(snapshot),
 frameworkruntime.WithClientSet(client),
-frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())),
+frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(informerFactory.Core().V1().Pods().Lister())),
 )
 if err != nil {
 t.Fatalf("error creating framework: %+v", err)
@@ -3325,7 +3325,7 @@ func TestFairEvaluationForNodes(t *testing.T) {
 tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
 },
 "",
-frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(nil)),
+frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(nil)),
 )
 if err != nil {
 t.Fatal(err)
@@ -3407,7 +3407,7 @@ func TestPreferNominatedNodeFilterCallCounts(t *testing.T) {
 ctx,
 registerPlugins, "",
 frameworkruntime.WithClientSet(client),
-frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())),
+frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(informerFactory.Core().V1().Pods().Lister())),
 )
 if err != nil {
 t.Fatal(err)
@@ -3565,7 +3565,7 @@ func setupTestScheduler(ctx context.Context, t *testing.T, queuedPodStore *clien
 frameworkruntime.WithClientSet(client),
 frameworkruntime.WithEventRecorder(recorder),
 frameworkruntime.WithInformerFactory(informerFactory),
-frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())),
+frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(informerFactory.Core().V1().Pods().Lister())),
 frameworkruntime.WithWaitingPods(waitingPods),
 )