Merge pull request #102645 from Huang-Wei/follow-up-fix-phantom-nominated-pod

cleanup usage of NewPodNominator
This commit is contained in:
Kubernetes Prow Robot 2021-06-10 19:44:12 -07:00 committed by GitHub
commit 013f1ae120
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 91 additions and 71 deletions

View File

@ -271,7 +271,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
test.registerPlugins, "",
runtime.WithClientSet(client),
runtime.WithInformerFactory(informerFactory),
runtime.WithPodNominator(internalqueue.NewPodNominator()),
runtime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())),
)
if err != nil {
t.Fatal(err)

View File

@ -995,7 +995,7 @@ func TestGenericScheduler(t *testing.T) {
test.registerPlugins, "",
frameworkruntime.WithSnapshotSharedLister(snapshot),
frameworkruntime.WithInformerFactory(informerFactory),
frameworkruntime.WithPodNominator(internalqueue.NewPodNominator()),
frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())),
)
if err != nil {
t.Fatal(err)
@ -1056,7 +1056,7 @@ func TestFindFitAllError(t *testing.T) {
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
},
"",
frameworkruntime.WithPodNominator(internalqueue.NewPodNominator()),
frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(nil)),
)
if err != nil {
t.Fatal(err)
@ -1089,7 +1089,7 @@ func TestFindFitSomeError(t *testing.T) {
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
},
"",
frameworkruntime.WithPodNominator(internalqueue.NewPodNominator()),
frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(nil)),
)
if err != nil {
t.Fatal(err)
@ -1163,7 +1163,7 @@ func TestFindFitPredicateCallCounts(t *testing.T) {
}
fwk, err := st.NewFramework(
registerPlugins, "",
frameworkruntime.WithPodNominator(internalqueue.NewPodNominator()),
frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(nil)),
)
if err != nil {
t.Fatal(err)
@ -1325,7 +1325,7 @@ func TestZeroRequest(t *testing.T) {
frameworkruntime.WithInformerFactory(informerFactory),
frameworkruntime.WithSnapshotSharedLister(snapshot),
frameworkruntime.WithClientSet(client),
frameworkruntime.WithPodNominator(internalqueue.NewPodNominator()),
frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())),
)
if err != nil {
t.Fatalf("error creating framework: %+v", err)
@ -1427,7 +1427,7 @@ func TestFairEvaluationForNodes(t *testing.T) {
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
},
"",
frameworkruntime.WithPodNominator(internalqueue.NewPodNominator()),
frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(nil)),
)
if err != nil {
t.Fatal(err)
@ -1493,7 +1493,8 @@ func TestPreferNominatedNodeFilterCallCounts(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PreferNominatedNode, test.feature)()
// create three nodes in the cluster.
nodes := makeNodeList([]string{"node1", "node2", "node3"})
client := &clientsetfake.Clientset{}
client := clientsetfake.NewSimpleClientset(test.pod)
informerFactory := informers.NewSharedInformerFactory(client, 0)
cache := internalcache.New(time.Duration(0), wait.NeverStop)
for _, n := range nodes {
cache.AddNode(n)
@ -1513,7 +1514,7 @@ func TestPreferNominatedNodeFilterCallCounts(t *testing.T) {
fwk, err := st.NewFramework(
registerPlugins, "",
frameworkruntime.WithClientSet(client),
frameworkruntime.WithPodNominator(internalqueue.NewPodNominator()),
frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())),
)
if err != nil {
t.Fatal(err)

View File

@ -138,7 +138,7 @@ func (c *Configurator) create() (*Scheduler, error) {
}
// The nominator will be passed all the way to framework instantiation.
nominator := internalqueue.NewSafePodNominator(c.informerFactory.Core().V1().Pods().Lister())
nominator := internalqueue.NewPodNominator(c.informerFactory.Core().V1().Pods().Lister())
profiles, err := profile.NewMap(c.profiles, c.registry, c.recorderFactory,
frameworkruntime.WithComponentConfigVersion(c.componentConfigVersion),
frameworkruntime.WithClientSet(c.client),

View File

@ -585,7 +585,7 @@ type PostFilterResult struct {
// PodNominator abstracts operations to maintain nominated Pods.
type PodNominator interface {
// AddNominatedPod adds the given pod to the nominated pod map or
// AddNominatedPod adds the given pod to the nominator or
// updates it if it already exists.
AddNominatedPod(pod *PodInfo, nodeName string)
// DeleteNominatedPodIfExists deletes nominatedPod from internal cache. It's a no-op if it doesn't exist.

View File

@ -286,7 +286,7 @@ func TestPostFilter(t *testing.T) {
frameworkruntime.WithClientSet(cs),
frameworkruntime.WithEventRecorder(&events.FakeRecorder{}),
frameworkruntime.WithInformerFactory(informerFactory),
frameworkruntime.WithPodNominator(internalqueue.NewPodNominator()),
frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())),
frameworkruntime.WithExtenders(extenders),
frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(tt.pods, tt.nodes)),
)
@ -960,7 +960,6 @@ func TestDryRunPreemption(t *testing.T) {
labelKeys := []string{"hostname", "zone", "region"}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
rand.Seed(4)
nodes := make([]*v1.Node, len(tt.nodeNames))
fakeFilterRCMap := make(map[string]framework.Code, len(tt.nodeNames))
for i, nodeName := range tt.nodeNames {
@ -992,7 +991,13 @@ func TestDryRunPreemption(t *testing.T) {
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
)
registeredPlugins = append(registeredPlugins, tt.registerPlugins...)
objs := []runtime.Object{&v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: ""}}}
var objs []runtime.Object
for _, p := range append(tt.testPods, tt.initPods...) {
objs = append(objs, p)
}
for _, n := range nodes {
objs = append(objs, n)
}
informerFactory := informers.NewSharedInformerFactory(clientsetfake.NewSimpleClientset(objs...), 0)
parallelism := parallelize.DefaultParallelism
if tt.disableParallelism {
@ -1003,7 +1008,7 @@ func TestDryRunPreemption(t *testing.T) {
}
fwk, err := st.NewFramework(
registeredPlugins, "",
frameworkruntime.WithPodNominator(internalqueue.NewPodNominator()),
frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())),
frameworkruntime.WithSnapshotSharedLister(snapshot),
frameworkruntime.WithInformerFactory(informerFactory),
frameworkruntime.WithParallelism(parallelism),
@ -1030,6 +1035,10 @@ func TestDryRunPreemption(t *testing.T) {
}
pl := &DefaultPreemption{args: *tt.args}
// Using 4 as a seed source to test getOffsetAndNumCandidates() deterministically.
// However, we need to do it after informerFactory.WaitforCacheSync() which might
// set a seed.
rand.Seed(4)
var prevNumFilterCalled int32
for cycle, pod := range tt.testPods {
state := framework.NewCycleState()
@ -1222,6 +1231,14 @@ func TestSelectBestCandidate(t *testing.T) {
for i, nodeName := range tt.nodeNames {
nodes[i] = st.MakeNode().Name(nodeName).Capacity(veryLargeRes).Obj()
}
var objs []runtime.Object
objs = append(objs, tt.pod)
for _, pod := range tt.pods {
objs = append(objs, pod)
}
cs := clientsetfake.NewSimpleClientset(objs...)
informerFactory := informers.NewSharedInformerFactory(cs, 0)
snapshot := internalcache.NewSnapshot(tt.pods, nodes)
fwk, err := st.NewFramework(
[]st.RegisterPluginFunc{
@ -1230,7 +1247,7 @@ func TestSelectBestCandidate(t *testing.T) {
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
},
"",
frameworkruntime.WithPodNominator(internalqueue.NewPodNominator()),
frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())),
frameworkruntime.WithSnapshotSharedLister(snapshot),
)
if err != nil {
@ -1635,7 +1652,7 @@ func TestPreempt(t *testing.T) {
frameworkruntime.WithClientSet(client),
frameworkruntime.WithEventRecorder(&events.FakeRecorder{}),
frameworkruntime.WithExtenders(extenders),
frameworkruntime.WithPodNominator(internalqueue.NewPodNominator()),
frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())),
frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(test.pods, nodes)),
frameworkruntime.WithInformerFactory(informerFactory),
)

View File

@ -1479,7 +1479,7 @@ func TestFilterPluginsWithNominatedPods(t *testing.T) {
)
}
podNominator := internalqueue.NewPodNominator()
podNominator := internalqueue.NewPodNominator(nil)
if tt.nominatedPod != nil {
podNominator.AddNominatedPod(framework.NewPodInfo(tt.nominatedPod), nodeName)
}

View File

@ -247,7 +247,7 @@ func NewPriorityQueue(
}
if options.podNominator == nil {
options.podNominator = NewPodNominator()
options.podNominator = NewPodNominator(informerFactory.Core().V1().Pods().Lister())
}
pq := &PriorityQueue{
@ -628,7 +628,7 @@ func (p *PriorityQueue) Close() {
}
// DeleteNominatedPodIfExists deletes <pod> from nominatedPods.
func (npm *nominatedPodMap) DeleteNominatedPodIfExists(pod *v1.Pod) {
func (npm *nominator) DeleteNominatedPodIfExists(pod *v1.Pod) {
npm.Lock()
npm.delete(pod)
npm.Unlock()
@ -638,7 +638,7 @@ func (npm *nominatedPodMap) DeleteNominatedPodIfExists(pod *v1.Pod) {
// This is called during the preemption process after a node is nominated to run
// the pod. We update the structure before sending a request to update the pod
// object to avoid races with the following scheduling cycles.
func (npm *nominatedPodMap) AddNominatedPod(pi *framework.PodInfo, nodeName string) {
func (npm *nominator) AddNominatedPod(pi *framework.PodInfo, nodeName string) {
npm.Lock()
npm.add(pi, nodeName)
npm.Unlock()
@ -646,7 +646,7 @@ func (npm *nominatedPodMap) AddNominatedPod(pi *framework.PodInfo, nodeName stri
// NominatedPodsForNode returns pods that are nominated to run on the given node,
// but they are waiting for other pods to be removed from the node.
func (npm *nominatedPodMap) NominatedPodsForNode(nodeName string) []*framework.PodInfo {
func (npm *nominator) NominatedPodsForNode(nodeName string) []*framework.PodInfo {
npm.RLock()
defer npm.RUnlock()
// TODO: we may need to return a copy of []*Pods to avoid modification
@ -762,11 +762,11 @@ func newUnschedulablePodsMap(metricRecorder metrics.MetricRecorder) *Unschedulab
}
}
// nominatedPodMap is a structure that stores pods nominated to run on nodes.
// nominator is a structure that stores pods nominated to run on nodes.
// It exists because nominatedNodeName of pod objects stored in the structure
// may be different than what scheduler has here. We should be able to find pods
// by their UID and update/delete them.
type nominatedPodMap struct {
type nominator struct {
// podLister is used to verify if the given pod is alive.
podLister listersv1.PodLister
// nominatedPods is a map keyed by a node name and the value is a list of
@ -780,7 +780,7 @@ type nominatedPodMap struct {
sync.RWMutex
}
func (npm *nominatedPodMap) add(pi *framework.PodInfo, nodeName string) {
func (npm *nominator) add(pi *framework.PodInfo, nodeName string) {
// always delete the pod if it already exist, to ensure we never store more than
// one instance of the pod.
npm.delete(pi.Pod)
@ -796,7 +796,7 @@ func (npm *nominatedPodMap) add(pi *framework.PodInfo, nodeName string) {
if npm.podLister != nil {
// If the pod is not alive, don't contain it.
if _, err := npm.podLister.Pods(pi.Pod.Namespace).Get(pi.Pod.Name); err != nil {
klog.V(4).InfoS("Pod doesn't exist in podLister, aborting adding it to the nominated map", "pod", klog.KObj(pi.Pod))
klog.V(4).InfoS("Pod doesn't exist in podLister, aborting adding it to the nominator", "pod", klog.KObj(pi.Pod))
return
}
}
@ -804,14 +804,14 @@ func (npm *nominatedPodMap) add(pi *framework.PodInfo, nodeName string) {
npm.nominatedPodToNode[pi.Pod.UID] = nnn
for _, npi := range npm.nominatedPods[nnn] {
if npi.Pod.UID == pi.Pod.UID {
klog.V(4).InfoS("Pod already exists in the nominated map", "pod", klog.KObj(npi.Pod))
klog.V(4).InfoS("Pod already exists in the nominator", "pod", klog.KObj(npi.Pod))
return
}
}
npm.nominatedPods[nnn] = append(npm.nominatedPods[nnn], pi)
}
func (npm *nominatedPodMap) delete(p *v1.Pod) {
func (npm *nominator) delete(p *v1.Pod) {
nnn, ok := npm.nominatedPodToNode[p.UID]
if !ok {
return
@ -829,7 +829,7 @@ func (npm *nominatedPodMap) delete(p *v1.Pod) {
}
// UpdateNominatedPod updates the <oldPod> with <newPod>.
func (npm *nominatedPodMap) UpdateNominatedPod(oldPod *v1.Pod, newPodInfo *framework.PodInfo) {
func (npm *nominator) UpdateNominatedPod(oldPod *v1.Pod, newPodInfo *framework.PodInfo) {
npm.Lock()
defer npm.Unlock()
// In some cases, an Update event with no "NominatedNode" present is received right
@ -852,17 +852,11 @@ func (npm *nominatedPodMap) UpdateNominatedPod(oldPod *v1.Pod, newPodInfo *frame
npm.add(newPodInfo, nodeName)
}
// NewPodNominator creates a nominatedPodMap as a backing of framework.PodNominator.
// DEPRECATED: use NewSafePodNominator() instead.
func NewPodNominator() framework.PodNominator {
return NewSafePodNominator(nil)
}
// NewSafePodNominator creates a nominatedPodMap as a backing of framework.PodNominator.
// Unlike NewPodNominator, it passes in a podLister so as to check if the pod is alive
// NewPodNominator creates a nominator as a backing of framework.PodNominator.
// A podLister is passed in so as to check if the pod exists
// before adding its nominatedNode info.
func NewSafePodNominator(podLister listersv1.PodLister) framework.PodNominator {
return &nominatedPodMap{
func NewPodNominator(podLister listersv1.PodLister) framework.PodNominator {
return &nominator{
podLister: podLister,
nominatedPods: make(map[string][]*framework.PodInfo),
nominatedPodToNode: make(map[types.UID]string),

View File

@ -27,6 +27,7 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -129,7 +130,8 @@ func getUnschedulablePod(p *PriorityQueue, pod *v1.Pod) *v1.Pod {
}
func TestPriorityQueue_Add(t *testing.T) {
q := NewTestQueue(context.Background(), newDefaultQueueSort())
objs := []runtime.Object{medPriorityPodInfo.Pod, unschedulablePodInfo.Pod, highPriorityPodInfo.Pod}
q := NewTestQueueWithObjects(context.Background(), newDefaultQueueSort(), objs)
if err := q.Add(medPriorityPodInfo.Pod); err != nil {
t.Errorf("add failed: %v", err)
}
@ -139,7 +141,7 @@ func TestPriorityQueue_Add(t *testing.T) {
if err := q.Add(highPriorityPodInfo.Pod); err != nil {
t.Errorf("add failed: %v", err)
}
expectedNominatedPods := &nominatedPodMap{
expectedNominatedPods := &nominator{
nominatedPodToNode: map[types.UID]string{
medPriorityPodInfo.Pod.UID: "node1",
unschedulablePodInfo.Pod.UID: "node1",
@ -148,7 +150,7 @@ func TestPriorityQueue_Add(t *testing.T) {
"node1": {medPriorityPodInfo, unschedulablePodInfo},
},
}
if diff := cmp.Diff(q.PodNominator, expectedNominatedPods, cmp.AllowUnexported(nominatedPodMap{}), cmpopts.IgnoreFields(nominatedPodMap{}, "RWMutex")); diff != "" {
if diff := cmp.Diff(q.PodNominator, expectedNominatedPods, cmp.AllowUnexported(nominator{}), cmpopts.IgnoreFields(nominator{}, "podLister", "RWMutex")); diff != "" {
t.Errorf("Unexpected diff after adding pods (-want, +got):\n%s", diff)
}
if p, err := q.Pop(); err != nil || p.Pod != highPriorityPodInfo.Pod {
@ -160,8 +162,8 @@ func TestPriorityQueue_Add(t *testing.T) {
if p, err := q.Pop(); err != nil || p.Pod != unschedulablePodInfo.Pod {
t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePodInfo.Pod.Name, p.Pod.Name)
}
if len(q.PodNominator.(*nominatedPodMap).nominatedPods["node1"]) != 2 {
t.Errorf("Expected medPriorityPodInfo and unschedulablePodInfo to be still present in nomindatePods: %v", q.PodNominator.(*nominatedPodMap).nominatedPods["node1"])
if len(q.PodNominator.(*nominator).nominatedPods["node1"]) != 2 {
t.Errorf("Expected medPriorityPodInfo and unschedulablePodInfo to be still present in nomindatePods: %v", q.PodNominator.(*nominator).nominatedPods["node1"])
}
}
@ -171,7 +173,8 @@ func newDefaultQueueSort() framework.LessFunc {
}
func TestPriorityQueue_AddWithReversePriorityLessFunc(t *testing.T) {
q := NewTestQueue(context.Background(), newDefaultQueueSort())
objs := []runtime.Object{medPriorityPodInfo.Pod, highPriorityPodInfo.Pod}
q := NewTestQueueWithObjects(context.Background(), newDefaultQueueSort(), objs)
if err := q.Add(medPriorityPodInfo.Pod); err != nil {
t.Errorf("add failed: %v", err)
}
@ -187,11 +190,12 @@ func TestPriorityQueue_AddWithReversePriorityLessFunc(t *testing.T) {
}
func TestPriorityQueue_AddUnschedulableIfNotPresent(t *testing.T) {
q := NewTestQueue(context.Background(), newDefaultQueueSort())
objs := []runtime.Object{highPriNominatedPodInfo.Pod, unschedulablePodInfo.Pod}
q := NewTestQueueWithObjects(context.Background(), newDefaultQueueSort(), objs)
q.Add(highPriNominatedPodInfo.Pod)
q.AddUnschedulableIfNotPresent(newQueuedPodInfoForLookup(highPriNominatedPodInfo.Pod), q.SchedulingCycle()) // Must not add anything.
q.AddUnschedulableIfNotPresent(newQueuedPodInfoForLookup(unschedulablePodInfo.Pod), q.SchedulingCycle())
expectedNominatedPods := &nominatedPodMap{
expectedNominatedPods := &nominator{
nominatedPodToNode: map[types.UID]string{
unschedulablePodInfo.Pod.UID: "node1",
highPriNominatedPodInfo.Pod.UID: "node1",
@ -200,13 +204,13 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent(t *testing.T) {
"node1": {highPriNominatedPodInfo, unschedulablePodInfo},
},
}
if diff := cmp.Diff(q.PodNominator, expectedNominatedPods, cmp.AllowUnexported(nominatedPodMap{}), cmpopts.IgnoreFields(nominatedPodMap{}, "RWMutex")); diff != "" {
if diff := cmp.Diff(q.PodNominator, expectedNominatedPods, cmp.AllowUnexported(nominator{}), cmpopts.IgnoreFields(nominator{}, "podLister", "RWMutex")); diff != "" {
t.Errorf("Unexpected diff after adding pods (-want, +got):\n%s", diff)
}
if p, err := q.Pop(); err != nil || p.Pod != highPriNominatedPodInfo.Pod {
t.Errorf("Expected: %v after Pop, but got: %v", highPriNominatedPodInfo.Pod.Name, p.Pod.Name)
}
if len(q.PodNominator.(*nominatedPodMap).nominatedPods) != 1 {
if len(q.PodNominator.(*nominator).nominatedPods) != 1 {
t.Errorf("Expected nomindatePods to have one element: %v", q.PodNominator)
}
if getUnschedulablePod(q, unschedulablePodInfo.Pod) != unschedulablePodInfo.Pod {
@ -284,7 +288,8 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent_Backoff(t *testing.T) {
}
func TestPriorityQueue_Pop(t *testing.T) {
q := NewTestQueue(context.Background(), newDefaultQueueSort())
objs := []runtime.Object{medPriorityPodInfo.Pod}
q := NewTestQueueWithObjects(context.Background(), newDefaultQueueSort(), objs)
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
@ -292,8 +297,8 @@ func TestPriorityQueue_Pop(t *testing.T) {
if p, err := q.Pop(); err != nil || p.Pod != medPriorityPodInfo.Pod {
t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPodInfo.Pod.Name, p.Pod.Name)
}
if len(q.PodNominator.(*nominatedPodMap).nominatedPods["node1"]) != 1 {
t.Errorf("Expected medPriorityPodInfo to be present in nomindatePods: %v", q.PodNominator.(*nominatedPodMap).nominatedPods["node1"])
if len(q.PodNominator.(*nominator).nominatedPods["node1"]) != 1 {
t.Errorf("Expected medPriorityPodInfo to be present in nomindatePods: %v", q.PodNominator.(*nominator).nominatedPods["node1"])
}
}()
q.Add(medPriorityPodInfo.Pod)
@ -301,13 +306,14 @@ func TestPriorityQueue_Pop(t *testing.T) {
}
func TestPriorityQueue_Update(t *testing.T) {
objs := []runtime.Object{highPriorityPodInfo.Pod, unschedulablePodInfo.Pod, medPriorityPodInfo.Pod}
c := clock.NewFakeClock(time.Now())
q := NewTestQueue(context.Background(), newDefaultQueueSort(), WithClock(c))
q := NewTestQueueWithObjects(context.Background(), newDefaultQueueSort(), objs, WithClock(c))
q.Update(nil, highPriorityPodInfo.Pod)
if _, exists, _ := q.activeQ.Get(newQueuedPodInfoForLookup(highPriorityPodInfo.Pod)); !exists {
t.Errorf("Expected %v to be added to activeQ.", highPriorityPodInfo.Pod.Name)
}
if len(q.PodNominator.(*nominatedPodMap).nominatedPods) != 0 {
if len(q.PodNominator.(*nominator).nominatedPods) != 0 {
t.Errorf("Expected nomindatePods to be empty: %v", q.PodNominator)
}
// Update highPriorityPodInfo and add a nominatedNodeName to it.
@ -315,7 +321,7 @@ func TestPriorityQueue_Update(t *testing.T) {
if q.activeQ.Len() != 1 {
t.Error("Expected only one item in activeQ.")
}
if len(q.PodNominator.(*nominatedPodMap).nominatedPods) != 1 {
if len(q.PodNominator.(*nominator).nominatedPods) != 1 {
t.Errorf("Expected one item in nomindatePods map: %v", q.PodNominator)
}
// Updating an unschedulable pod which is not in any of the two queues, should
@ -382,7 +388,8 @@ func TestPriorityQueue_Update(t *testing.T) {
}
func TestPriorityQueue_Delete(t *testing.T) {
q := NewTestQueue(context.Background(), newDefaultQueueSort())
objs := []runtime.Object{highPriorityPodInfo.Pod, unschedulablePodInfo.Pod}
q := NewTestQueueWithObjects(context.Background(), newDefaultQueueSort(), objs)
q.Update(highPriorityPodInfo.Pod, highPriNominatedPodInfo.Pod)
q.Add(unschedulablePodInfo.Pod)
if err := q.Delete(highPriNominatedPodInfo.Pod); err != nil {
@ -394,19 +401,18 @@ func TestPriorityQueue_Delete(t *testing.T) {
if _, exists, _ := q.activeQ.Get(newQueuedPodInfoForLookup(highPriNominatedPodInfo.Pod)); exists {
t.Errorf("Didn't expect %v to be in activeQ.", highPriorityPodInfo.Pod.Name)
}
if len(q.PodNominator.(*nominatedPodMap).nominatedPods) != 1 {
t.Errorf("Expected nomindatePods to have only 'unschedulablePodInfo': %v", q.PodNominator.(*nominatedPodMap).nominatedPods)
if len(q.PodNominator.(*nominator).nominatedPods) != 1 {
t.Errorf("Expected nomindatePods to have only 'unschedulablePodInfo': %v", q.PodNominator.(*nominator).nominatedPods)
}
if err := q.Delete(unschedulablePodInfo.Pod); err != nil {
t.Errorf("delete failed: %v", err)
}
if len(q.PodNominator.(*nominatedPodMap).nominatedPods) != 0 {
if len(q.PodNominator.(*nominator).nominatedPods) != 0 {
t.Errorf("Expected nomindatePods to be empty: %v", q.PodNominator)
}
}
func BenchmarkMoveAllToActiveOrBackoffQueue(b *testing.B) {
tests := []struct {
name string
moveEvent framework.ClusterEvent
@ -630,7 +636,8 @@ func TestPriorityQueue_AssignedPodAdded(t *testing.T) {
}
func TestPriorityQueue_NominatedPodsForNode(t *testing.T) {
q := NewTestQueue(context.Background(), newDefaultQueueSort())
objs := []runtime.Object{medPriorityPodInfo.Pod, unschedulablePodInfo.Pod, highPriorityPodInfo.Pod}
q := NewTestQueueWithObjects(context.Background(), newDefaultQueueSort(), objs)
q.Add(medPriorityPodInfo.Pod)
q.Add(unschedulablePodInfo.Pod)
q.Add(highPriorityPodInfo.Pod)
@ -678,7 +685,7 @@ func TestPriorityQueue_NominatedPodDeleted(t *testing.T) {
podLister := informerFactory.Core().V1().Pods().Lister()
// Build a PriorityQueue.
q := NewPriorityQueue(newDefaultQueueSort(), informerFactory, WithPodNominator(NewSafePodNominator(podLister)))
q := NewPriorityQueue(newDefaultQueueSort(), informerFactory, WithPodNominator(NewPodNominator(podLister)))
ctx := context.Background()
informerFactory.Start(ctx.Done())
informerFactory.WaitForCacheSync(ctx.Done())
@ -723,7 +730,8 @@ func TestPriorityQueue_PendingPods(t *testing.T) {
}
func TestPriorityQueue_UpdateNominatedPodForNode(t *testing.T) {
q := NewTestQueue(context.Background(), newDefaultQueueSort())
objs := []runtime.Object{medPriorityPodInfo.Pod, unschedulablePodInfo.Pod, highPriorityPodInfo.Pod}
q := NewTestQueueWithObjects(context.Background(), newDefaultQueueSort(), objs)
if err := q.Add(medPriorityPodInfo.Pod); err != nil {
t.Errorf("add failed: %v", err)
}
@ -732,7 +740,7 @@ func TestPriorityQueue_UpdateNominatedPodForNode(t *testing.T) {
// Update nominated node name of a pod on a node that is not specified in the pod object.
q.AddNominatedPod(framework.NewPodInfo(highPriorityPodInfo.Pod), "node2")
expectedNominatedPods := &nominatedPodMap{
expectedNominatedPods := &nominator{
nominatedPodToNode: map[types.UID]string{
medPriorityPodInfo.Pod.UID: "node1",
highPriorityPodInfo.Pod.UID: "node2",
@ -744,20 +752,20 @@ func TestPriorityQueue_UpdateNominatedPodForNode(t *testing.T) {
"node5": {unschedulablePodInfo},
},
}
if diff := cmp.Diff(q.PodNominator, expectedNominatedPods, cmp.AllowUnexported(nominatedPodMap{}), cmpopts.IgnoreFields(nominatedPodMap{}, "RWMutex")); diff != "" {
if diff := cmp.Diff(q.PodNominator, expectedNominatedPods, cmp.AllowUnexported(nominator{}), cmpopts.IgnoreFields(nominator{}, "podLister", "RWMutex")); diff != "" {
t.Errorf("Unexpected diff after adding pods (-want, +got):\n%s", diff)
}
if p, err := q.Pop(); err != nil || p.Pod != medPriorityPodInfo.Pod {
t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPodInfo.Pod.Name, p.Pod.Name)
}
// List of nominated pods shouldn't change after popping them from the queue.
if diff := cmp.Diff(q.PodNominator, expectedNominatedPods, cmp.AllowUnexported(nominatedPodMap{}), cmpopts.IgnoreFields(nominatedPodMap{}, "RWMutex")); diff != "" {
if diff := cmp.Diff(q.PodNominator, expectedNominatedPods, cmp.AllowUnexported(nominator{}), cmpopts.IgnoreFields(nominator{}, "podLister", "RWMutex")); diff != "" {
t.Errorf("Unexpected diff after popping pods (-want, +got):\n%s", diff)
}
// Update one of the nominated pods that doesn't have nominatedNodeName in the
// pod object. It should be updated correctly.
q.AddNominatedPod(highPriorityPodInfo, "node4")
expectedNominatedPods = &nominatedPodMap{
expectedNominatedPods = &nominator{
nominatedPodToNode: map[types.UID]string{
medPriorityPodInfo.Pod.UID: "node1",
highPriorityPodInfo.Pod.UID: "node4",
@ -769,14 +777,14 @@ func TestPriorityQueue_UpdateNominatedPodForNode(t *testing.T) {
"node5": {unschedulablePodInfo},
},
}
if diff := cmp.Diff(q.PodNominator, expectedNominatedPods, cmp.AllowUnexported(nominatedPodMap{}), cmpopts.IgnoreFields(nominatedPodMap{}, "RWMutex")); diff != "" {
if diff := cmp.Diff(q.PodNominator, expectedNominatedPods, cmp.AllowUnexported(nominator{}), cmpopts.IgnoreFields(nominator{}, "podLister", "RWMutex")); diff != "" {
t.Errorf("Unexpected diff after updating pods (-want, +got):\n%s", diff)
}
// Delete a nominated pod that doesn't have nominatedNodeName in the pod
// object. It should be deleted.
q.DeleteNominatedPodIfExists(highPriorityPodInfo.Pod)
expectedNominatedPods = &nominatedPodMap{
expectedNominatedPods = &nominator{
nominatedPodToNode: map[types.UID]string{
medPriorityPodInfo.Pod.UID: "node1",
unschedulablePodInfo.Pod.UID: "node5",
@ -786,7 +794,7 @@ func TestPriorityQueue_UpdateNominatedPodForNode(t *testing.T) {
"node5": {unschedulablePodInfo},
},
}
if diff := cmp.Diff(q.PodNominator, expectedNominatedPods, cmp.AllowUnexported(nominatedPodMap{}), cmpopts.IgnoreFields(nominatedPodMap{}, "RWMutex")); diff != "" {
if diff := cmp.Diff(q.PodNominator, expectedNominatedPods, cmp.AllowUnexported(nominator{}), cmpopts.IgnoreFields(nominator{}, "podLister", "RWMutex")); diff != "" {
t.Errorf("Unexpected diff after deleting pods (-want, +got):\n%s", diff)
}
}

View File

@ -827,7 +827,7 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C
frameworkruntime.WithClientSet(client),
frameworkruntime.WithEventRecorder(recorder),
frameworkruntime.WithInformerFactory(informerFactory),
frameworkruntime.WithPodNominator(internalqueue.NewPodNominator()),
frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())),
)
algo := core.NewGenericScheduler(