cleanup usage of NewPodNominator

- replace NewPodNominator() with NewSafePodNominator()
- rename nominatedPodMap to nominator
This commit is contained in:
Wei Huang 2021-06-05 11:30:17 -07:00
parent 53873fbf23
commit 36eaa11d50
No known key found for this signature in database
GPG Key ID: BE5E9752F8B6E005
9 changed files with 91 additions and 71 deletions

View File

@ -271,7 +271,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
test.registerPlugins, "", test.registerPlugins, "",
runtime.WithClientSet(client), runtime.WithClientSet(client),
runtime.WithInformerFactory(informerFactory), runtime.WithInformerFactory(informerFactory),
runtime.WithPodNominator(internalqueue.NewPodNominator()), runtime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())),
) )
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)

View File

@ -995,7 +995,7 @@ func TestGenericScheduler(t *testing.T) {
test.registerPlugins, "", test.registerPlugins, "",
frameworkruntime.WithSnapshotSharedLister(snapshot), frameworkruntime.WithSnapshotSharedLister(snapshot),
frameworkruntime.WithInformerFactory(informerFactory), frameworkruntime.WithInformerFactory(informerFactory),
frameworkruntime.WithPodNominator(internalqueue.NewPodNominator()), frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())),
) )
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
@ -1056,7 +1056,7 @@ func TestFindFitAllError(t *testing.T) {
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
}, },
"", "",
frameworkruntime.WithPodNominator(internalqueue.NewPodNominator()), frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(nil)),
) )
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
@ -1089,7 +1089,7 @@ func TestFindFitSomeError(t *testing.T) {
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
}, },
"", "",
frameworkruntime.WithPodNominator(internalqueue.NewPodNominator()), frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(nil)),
) )
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
@ -1163,7 +1163,7 @@ func TestFindFitPredicateCallCounts(t *testing.T) {
} }
fwk, err := st.NewFramework( fwk, err := st.NewFramework(
registerPlugins, "", registerPlugins, "",
frameworkruntime.WithPodNominator(internalqueue.NewPodNominator()), frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(nil)),
) )
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
@ -1325,7 +1325,7 @@ func TestZeroRequest(t *testing.T) {
frameworkruntime.WithInformerFactory(informerFactory), frameworkruntime.WithInformerFactory(informerFactory),
frameworkruntime.WithSnapshotSharedLister(snapshot), frameworkruntime.WithSnapshotSharedLister(snapshot),
frameworkruntime.WithClientSet(client), frameworkruntime.WithClientSet(client),
frameworkruntime.WithPodNominator(internalqueue.NewPodNominator()), frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())),
) )
if err != nil { if err != nil {
t.Fatalf("error creating framework: %+v", err) t.Fatalf("error creating framework: %+v", err)
@ -1427,7 +1427,7 @@ func TestFairEvaluationForNodes(t *testing.T) {
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
}, },
"", "",
frameworkruntime.WithPodNominator(internalqueue.NewPodNominator()), frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(nil)),
) )
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
@ -1493,7 +1493,8 @@ func TestPreferNominatedNodeFilterCallCounts(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PreferNominatedNode, test.feature)() defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PreferNominatedNode, test.feature)()
// create three nodes in the cluster. // create three nodes in the cluster.
nodes := makeNodeList([]string{"node1", "node2", "node3"}) nodes := makeNodeList([]string{"node1", "node2", "node3"})
client := &clientsetfake.Clientset{} client := clientsetfake.NewSimpleClientset(test.pod)
informerFactory := informers.NewSharedInformerFactory(client, 0)
cache := internalcache.New(time.Duration(0), wait.NeverStop) cache := internalcache.New(time.Duration(0), wait.NeverStop)
for _, n := range nodes { for _, n := range nodes {
cache.AddNode(n) cache.AddNode(n)
@ -1513,7 +1514,7 @@ func TestPreferNominatedNodeFilterCallCounts(t *testing.T) {
fwk, err := st.NewFramework( fwk, err := st.NewFramework(
registerPlugins, "", registerPlugins, "",
frameworkruntime.WithClientSet(client), frameworkruntime.WithClientSet(client),
frameworkruntime.WithPodNominator(internalqueue.NewPodNominator()), frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())),
) )
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)

View File

@ -138,7 +138,7 @@ func (c *Configurator) create() (*Scheduler, error) {
} }
// The nominator will be passed all the way to framework instantiation. // The nominator will be passed all the way to framework instantiation.
nominator := internalqueue.NewSafePodNominator(c.informerFactory.Core().V1().Pods().Lister()) nominator := internalqueue.NewPodNominator(c.informerFactory.Core().V1().Pods().Lister())
profiles, err := profile.NewMap(c.profiles, c.registry, c.recorderFactory, profiles, err := profile.NewMap(c.profiles, c.registry, c.recorderFactory,
frameworkruntime.WithComponentConfigVersion(c.componentConfigVersion), frameworkruntime.WithComponentConfigVersion(c.componentConfigVersion),
frameworkruntime.WithClientSet(c.client), frameworkruntime.WithClientSet(c.client),

View File

@ -585,7 +585,7 @@ type PostFilterResult struct {
// PodNominator abstracts operations to maintain nominated Pods. // PodNominator abstracts operations to maintain nominated Pods.
type PodNominator interface { type PodNominator interface {
// AddNominatedPod adds the given pod to the nominated pod map or // AddNominatedPod adds the given pod to the nominator or
// updates it if it already exists. // updates it if it already exists.
AddNominatedPod(pod *PodInfo, nodeName string) AddNominatedPod(pod *PodInfo, nodeName string)
// DeleteNominatedPodIfExists deletes nominatedPod from internal cache. It's a no-op if it doesn't exist. // DeleteNominatedPodIfExists deletes nominatedPod from internal cache. It's a no-op if it doesn't exist.

View File

@ -286,7 +286,7 @@ func TestPostFilter(t *testing.T) {
frameworkruntime.WithClientSet(cs), frameworkruntime.WithClientSet(cs),
frameworkruntime.WithEventRecorder(&events.FakeRecorder{}), frameworkruntime.WithEventRecorder(&events.FakeRecorder{}),
frameworkruntime.WithInformerFactory(informerFactory), frameworkruntime.WithInformerFactory(informerFactory),
frameworkruntime.WithPodNominator(internalqueue.NewPodNominator()), frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())),
frameworkruntime.WithExtenders(extenders), frameworkruntime.WithExtenders(extenders),
frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(tt.pods, tt.nodes)), frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(tt.pods, tt.nodes)),
) )
@ -960,7 +960,6 @@ func TestDryRunPreemption(t *testing.T) {
labelKeys := []string{"hostname", "zone", "region"} labelKeys := []string{"hostname", "zone", "region"}
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
rand.Seed(4)
nodes := make([]*v1.Node, len(tt.nodeNames)) nodes := make([]*v1.Node, len(tt.nodeNames))
fakeFilterRCMap := make(map[string]framework.Code, len(tt.nodeNames)) fakeFilterRCMap := make(map[string]framework.Code, len(tt.nodeNames))
for i, nodeName := range tt.nodeNames { for i, nodeName := range tt.nodeNames {
@ -992,7 +991,13 @@ func TestDryRunPreemption(t *testing.T) {
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
) )
registeredPlugins = append(registeredPlugins, tt.registerPlugins...) registeredPlugins = append(registeredPlugins, tt.registerPlugins...)
objs := []runtime.Object{&v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: ""}}} var objs []runtime.Object
for _, p := range append(tt.testPods, tt.initPods...) {
objs = append(objs, p)
}
for _, n := range nodes {
objs = append(objs, n)
}
informerFactory := informers.NewSharedInformerFactory(clientsetfake.NewSimpleClientset(objs...), 0) informerFactory := informers.NewSharedInformerFactory(clientsetfake.NewSimpleClientset(objs...), 0)
parallelism := parallelize.DefaultParallelism parallelism := parallelize.DefaultParallelism
if tt.disableParallelism { if tt.disableParallelism {
@ -1003,7 +1008,7 @@ func TestDryRunPreemption(t *testing.T) {
} }
fwk, err := st.NewFramework( fwk, err := st.NewFramework(
registeredPlugins, "", registeredPlugins, "",
frameworkruntime.WithPodNominator(internalqueue.NewPodNominator()), frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())),
frameworkruntime.WithSnapshotSharedLister(snapshot), frameworkruntime.WithSnapshotSharedLister(snapshot),
frameworkruntime.WithInformerFactory(informerFactory), frameworkruntime.WithInformerFactory(informerFactory),
frameworkruntime.WithParallelism(parallelism), frameworkruntime.WithParallelism(parallelism),
@ -1030,6 +1035,10 @@ func TestDryRunPreemption(t *testing.T) {
} }
pl := &DefaultPreemption{args: *tt.args} pl := &DefaultPreemption{args: *tt.args}
// Using 4 as a seed source to test getOffsetAndNumCandidates() deterministically.
// However, we need to do it after informerFactory.WaitforCacheSync() which might
// set a seed.
rand.Seed(4)
var prevNumFilterCalled int32 var prevNumFilterCalled int32
for cycle, pod := range tt.testPods { for cycle, pod := range tt.testPods {
state := framework.NewCycleState() state := framework.NewCycleState()
@ -1222,6 +1231,14 @@ func TestSelectBestCandidate(t *testing.T) {
for i, nodeName := range tt.nodeNames { for i, nodeName := range tt.nodeNames {
nodes[i] = st.MakeNode().Name(nodeName).Capacity(veryLargeRes).Obj() nodes[i] = st.MakeNode().Name(nodeName).Capacity(veryLargeRes).Obj()
} }
var objs []runtime.Object
objs = append(objs, tt.pod)
for _, pod := range tt.pods {
objs = append(objs, pod)
}
cs := clientsetfake.NewSimpleClientset(objs...)
informerFactory := informers.NewSharedInformerFactory(cs, 0)
snapshot := internalcache.NewSnapshot(tt.pods, nodes) snapshot := internalcache.NewSnapshot(tt.pods, nodes)
fwk, err := st.NewFramework( fwk, err := st.NewFramework(
[]st.RegisterPluginFunc{ []st.RegisterPluginFunc{
@ -1230,7 +1247,7 @@ func TestSelectBestCandidate(t *testing.T) {
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
}, },
"", "",
frameworkruntime.WithPodNominator(internalqueue.NewPodNominator()), frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())),
frameworkruntime.WithSnapshotSharedLister(snapshot), frameworkruntime.WithSnapshotSharedLister(snapshot),
) )
if err != nil { if err != nil {
@ -1635,7 +1652,7 @@ func TestPreempt(t *testing.T) {
frameworkruntime.WithClientSet(client), frameworkruntime.WithClientSet(client),
frameworkruntime.WithEventRecorder(&events.FakeRecorder{}), frameworkruntime.WithEventRecorder(&events.FakeRecorder{}),
frameworkruntime.WithExtenders(extenders), frameworkruntime.WithExtenders(extenders),
frameworkruntime.WithPodNominator(internalqueue.NewPodNominator()), frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())),
frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(test.pods, nodes)), frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(test.pods, nodes)),
frameworkruntime.WithInformerFactory(informerFactory), frameworkruntime.WithInformerFactory(informerFactory),
) )

View File

@ -1479,7 +1479,7 @@ func TestFilterPluginsWithNominatedPods(t *testing.T) {
) )
} }
podNominator := internalqueue.NewPodNominator() podNominator := internalqueue.NewPodNominator(nil)
if tt.nominatedPod != nil { if tt.nominatedPod != nil {
podNominator.AddNominatedPod(framework.NewPodInfo(tt.nominatedPod), nodeName) podNominator.AddNominatedPod(framework.NewPodInfo(tt.nominatedPod), nodeName)
} }

View File

@ -247,7 +247,7 @@ func NewPriorityQueue(
} }
if options.podNominator == nil { if options.podNominator == nil {
options.podNominator = NewPodNominator() options.podNominator = NewPodNominator(informerFactory.Core().V1().Pods().Lister())
} }
pq := &PriorityQueue{ pq := &PriorityQueue{
@ -628,7 +628,7 @@ func (p *PriorityQueue) Close() {
} }
// DeleteNominatedPodIfExists deletes <pod> from nominatedPods. // DeleteNominatedPodIfExists deletes <pod> from nominatedPods.
func (npm *nominatedPodMap) DeleteNominatedPodIfExists(pod *v1.Pod) { func (npm *nominator) DeleteNominatedPodIfExists(pod *v1.Pod) {
npm.Lock() npm.Lock()
npm.delete(pod) npm.delete(pod)
npm.Unlock() npm.Unlock()
@ -638,7 +638,7 @@ func (npm *nominatedPodMap) DeleteNominatedPodIfExists(pod *v1.Pod) {
// This is called during the preemption process after a node is nominated to run // This is called during the preemption process after a node is nominated to run
// the pod. We update the structure before sending a request to update the pod // the pod. We update the structure before sending a request to update the pod
// object to avoid races with the following scheduling cycles. // object to avoid races with the following scheduling cycles.
func (npm *nominatedPodMap) AddNominatedPod(pi *framework.PodInfo, nodeName string) { func (npm *nominator) AddNominatedPod(pi *framework.PodInfo, nodeName string) {
npm.Lock() npm.Lock()
npm.add(pi, nodeName) npm.add(pi, nodeName)
npm.Unlock() npm.Unlock()
@ -646,7 +646,7 @@ func (npm *nominatedPodMap) AddNominatedPod(pi *framework.PodInfo, nodeName stri
// NominatedPodsForNode returns pods that are nominated to run on the given node, // NominatedPodsForNode returns pods that are nominated to run on the given node,
// but they are waiting for other pods to be removed from the node. // but they are waiting for other pods to be removed from the node.
func (npm *nominatedPodMap) NominatedPodsForNode(nodeName string) []*framework.PodInfo { func (npm *nominator) NominatedPodsForNode(nodeName string) []*framework.PodInfo {
npm.RLock() npm.RLock()
defer npm.RUnlock() defer npm.RUnlock()
// TODO: we may need to return a copy of []*Pods to avoid modification // TODO: we may need to return a copy of []*Pods to avoid modification
@ -762,11 +762,11 @@ func newUnschedulablePodsMap(metricRecorder metrics.MetricRecorder) *Unschedulab
} }
} }
// nominatedPodMap is a structure that stores pods nominated to run on nodes. // nominator is a structure that stores pods nominated to run on nodes.
// It exists because nominatedNodeName of pod objects stored in the structure // It exists because nominatedNodeName of pod objects stored in the structure
// may be different than what scheduler has here. We should be able to find pods // may be different than what scheduler has here. We should be able to find pods
// by their UID and update/delete them. // by their UID and update/delete them.
type nominatedPodMap struct { type nominator struct {
// podLister is used to verify if the given pod is alive. // podLister is used to verify if the given pod is alive.
podLister listersv1.PodLister podLister listersv1.PodLister
// nominatedPods is a map keyed by a node name and the value is a list of // nominatedPods is a map keyed by a node name and the value is a list of
@ -780,7 +780,7 @@ type nominatedPodMap struct {
sync.RWMutex sync.RWMutex
} }
func (npm *nominatedPodMap) add(pi *framework.PodInfo, nodeName string) { func (npm *nominator) add(pi *framework.PodInfo, nodeName string) {
// always delete the pod if it already exist, to ensure we never store more than // always delete the pod if it already exist, to ensure we never store more than
// one instance of the pod. // one instance of the pod.
npm.delete(pi.Pod) npm.delete(pi.Pod)
@ -796,7 +796,7 @@ func (npm *nominatedPodMap) add(pi *framework.PodInfo, nodeName string) {
if npm.podLister != nil { if npm.podLister != nil {
// If the pod is not alive, don't contain it. // If the pod is not alive, don't contain it.
if _, err := npm.podLister.Pods(pi.Pod.Namespace).Get(pi.Pod.Name); err != nil { if _, err := npm.podLister.Pods(pi.Pod.Namespace).Get(pi.Pod.Name); err != nil {
klog.V(4).InfoS("Pod doesn't exist in podLister, aborting adding it to the nominated map", "pod", klog.KObj(pi.Pod)) klog.V(4).InfoS("Pod doesn't exist in podLister, aborting adding it to the nominator", "pod", klog.KObj(pi.Pod))
return return
} }
} }
@ -804,14 +804,14 @@ func (npm *nominatedPodMap) add(pi *framework.PodInfo, nodeName string) {
npm.nominatedPodToNode[pi.Pod.UID] = nnn npm.nominatedPodToNode[pi.Pod.UID] = nnn
for _, npi := range npm.nominatedPods[nnn] { for _, npi := range npm.nominatedPods[nnn] {
if npi.Pod.UID == pi.Pod.UID { if npi.Pod.UID == pi.Pod.UID {
klog.V(4).InfoS("Pod already exists in the nominated map", "pod", klog.KObj(npi.Pod)) klog.V(4).InfoS("Pod already exists in the nominator", "pod", klog.KObj(npi.Pod))
return return
} }
} }
npm.nominatedPods[nnn] = append(npm.nominatedPods[nnn], pi) npm.nominatedPods[nnn] = append(npm.nominatedPods[nnn], pi)
} }
func (npm *nominatedPodMap) delete(p *v1.Pod) { func (npm *nominator) delete(p *v1.Pod) {
nnn, ok := npm.nominatedPodToNode[p.UID] nnn, ok := npm.nominatedPodToNode[p.UID]
if !ok { if !ok {
return return
@ -829,7 +829,7 @@ func (npm *nominatedPodMap) delete(p *v1.Pod) {
} }
// UpdateNominatedPod updates the <oldPod> with <newPod>. // UpdateNominatedPod updates the <oldPod> with <newPod>.
func (npm *nominatedPodMap) UpdateNominatedPod(oldPod *v1.Pod, newPodInfo *framework.PodInfo) { func (npm *nominator) UpdateNominatedPod(oldPod *v1.Pod, newPodInfo *framework.PodInfo) {
npm.Lock() npm.Lock()
defer npm.Unlock() defer npm.Unlock()
// In some cases, an Update event with no "NominatedNode" present is received right // In some cases, an Update event with no "NominatedNode" present is received right
@ -852,17 +852,11 @@ func (npm *nominatedPodMap) UpdateNominatedPod(oldPod *v1.Pod, newPodInfo *frame
npm.add(newPodInfo, nodeName) npm.add(newPodInfo, nodeName)
} }
// NewPodNominator creates a nominatedPodMap as a backing of framework.PodNominator. // NewPodNominator creates a nominator as a backing of framework.PodNominator.
// DEPRECATED: use NewSafePodNominator() instead. // A podLister is passed in so as to check if the pod exists
func NewPodNominator() framework.PodNominator {
return NewSafePodNominator(nil)
}
// NewSafePodNominator creates a nominatedPodMap as a backing of framework.PodNominator.
// Unlike NewPodNominator, it passes in a podLister so as to check if the pod is alive
// before adding its nominatedNode info. // before adding its nominatedNode info.
func NewSafePodNominator(podLister listersv1.PodLister) framework.PodNominator { func NewPodNominator(podLister listersv1.PodLister) framework.PodNominator {
return &nominatedPodMap{ return &nominator{
podLister: podLister, podLister: podLister,
nominatedPods: make(map[string][]*framework.PodInfo), nominatedPods: make(map[string][]*framework.PodInfo),
nominatedPodToNode: make(map[types.UID]string), nominatedPodToNode: make(map[types.UID]string),

View File

@ -27,6 +27,7 @@ import (
"github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts" "github.com/google/go-cmp/cmp/cmpopts"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/api/core/v1" "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -129,7 +130,8 @@ func getUnschedulablePod(p *PriorityQueue, pod *v1.Pod) *v1.Pod {
} }
func TestPriorityQueue_Add(t *testing.T) { func TestPriorityQueue_Add(t *testing.T) {
q := NewTestQueue(context.Background(), newDefaultQueueSort()) objs := []runtime.Object{medPriorityPodInfo.Pod, unschedulablePodInfo.Pod, highPriorityPodInfo.Pod}
q := NewTestQueueWithObjects(context.Background(), newDefaultQueueSort(), objs)
if err := q.Add(medPriorityPodInfo.Pod); err != nil { if err := q.Add(medPriorityPodInfo.Pod); err != nil {
t.Errorf("add failed: %v", err) t.Errorf("add failed: %v", err)
} }
@ -139,7 +141,7 @@ func TestPriorityQueue_Add(t *testing.T) {
if err := q.Add(highPriorityPodInfo.Pod); err != nil { if err := q.Add(highPriorityPodInfo.Pod); err != nil {
t.Errorf("add failed: %v", err) t.Errorf("add failed: %v", err)
} }
expectedNominatedPods := &nominatedPodMap{ expectedNominatedPods := &nominator{
nominatedPodToNode: map[types.UID]string{ nominatedPodToNode: map[types.UID]string{
medPriorityPodInfo.Pod.UID: "node1", medPriorityPodInfo.Pod.UID: "node1",
unschedulablePodInfo.Pod.UID: "node1", unschedulablePodInfo.Pod.UID: "node1",
@ -148,7 +150,7 @@ func TestPriorityQueue_Add(t *testing.T) {
"node1": {medPriorityPodInfo, unschedulablePodInfo}, "node1": {medPriorityPodInfo, unschedulablePodInfo},
}, },
} }
if diff := cmp.Diff(q.PodNominator, expectedNominatedPods, cmp.AllowUnexported(nominatedPodMap{}), cmpopts.IgnoreFields(nominatedPodMap{}, "RWMutex")); diff != "" { if diff := cmp.Diff(q.PodNominator, expectedNominatedPods, cmp.AllowUnexported(nominator{}), cmpopts.IgnoreFields(nominator{}, "podLister", "RWMutex")); diff != "" {
t.Errorf("Unexpected diff after adding pods (-want, +got):\n%s", diff) t.Errorf("Unexpected diff after adding pods (-want, +got):\n%s", diff)
} }
if p, err := q.Pop(); err != nil || p.Pod != highPriorityPodInfo.Pod { if p, err := q.Pop(); err != nil || p.Pod != highPriorityPodInfo.Pod {
@ -160,8 +162,8 @@ func TestPriorityQueue_Add(t *testing.T) {
if p, err := q.Pop(); err != nil || p.Pod != unschedulablePodInfo.Pod { if p, err := q.Pop(); err != nil || p.Pod != unschedulablePodInfo.Pod {
t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePodInfo.Pod.Name, p.Pod.Name) t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePodInfo.Pod.Name, p.Pod.Name)
} }
if len(q.PodNominator.(*nominatedPodMap).nominatedPods["node1"]) != 2 { if len(q.PodNominator.(*nominator).nominatedPods["node1"]) != 2 {
t.Errorf("Expected medPriorityPodInfo and unschedulablePodInfo to be still present in nomindatePods: %v", q.PodNominator.(*nominatedPodMap).nominatedPods["node1"]) t.Errorf("Expected medPriorityPodInfo and unschedulablePodInfo to be still present in nomindatePods: %v", q.PodNominator.(*nominator).nominatedPods["node1"])
} }
} }
@ -171,7 +173,8 @@ func newDefaultQueueSort() framework.LessFunc {
} }
func TestPriorityQueue_AddWithReversePriorityLessFunc(t *testing.T) { func TestPriorityQueue_AddWithReversePriorityLessFunc(t *testing.T) {
q := NewTestQueue(context.Background(), newDefaultQueueSort()) objs := []runtime.Object{medPriorityPodInfo.Pod, highPriorityPodInfo.Pod}
q := NewTestQueueWithObjects(context.Background(), newDefaultQueueSort(), objs)
if err := q.Add(medPriorityPodInfo.Pod); err != nil { if err := q.Add(medPriorityPodInfo.Pod); err != nil {
t.Errorf("add failed: %v", err) t.Errorf("add failed: %v", err)
} }
@ -187,11 +190,12 @@ func TestPriorityQueue_AddWithReversePriorityLessFunc(t *testing.T) {
} }
func TestPriorityQueue_AddUnschedulableIfNotPresent(t *testing.T) { func TestPriorityQueue_AddUnschedulableIfNotPresent(t *testing.T) {
q := NewTestQueue(context.Background(), newDefaultQueueSort()) objs := []runtime.Object{highPriNominatedPodInfo.Pod, unschedulablePodInfo.Pod}
q := NewTestQueueWithObjects(context.Background(), newDefaultQueueSort(), objs)
q.Add(highPriNominatedPodInfo.Pod) q.Add(highPriNominatedPodInfo.Pod)
q.AddUnschedulableIfNotPresent(newQueuedPodInfoForLookup(highPriNominatedPodInfo.Pod), q.SchedulingCycle()) // Must not add anything. q.AddUnschedulableIfNotPresent(newQueuedPodInfoForLookup(highPriNominatedPodInfo.Pod), q.SchedulingCycle()) // Must not add anything.
q.AddUnschedulableIfNotPresent(newQueuedPodInfoForLookup(unschedulablePodInfo.Pod), q.SchedulingCycle()) q.AddUnschedulableIfNotPresent(newQueuedPodInfoForLookup(unschedulablePodInfo.Pod), q.SchedulingCycle())
expectedNominatedPods := &nominatedPodMap{ expectedNominatedPods := &nominator{
nominatedPodToNode: map[types.UID]string{ nominatedPodToNode: map[types.UID]string{
unschedulablePodInfo.Pod.UID: "node1", unschedulablePodInfo.Pod.UID: "node1",
highPriNominatedPodInfo.Pod.UID: "node1", highPriNominatedPodInfo.Pod.UID: "node1",
@ -200,13 +204,13 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent(t *testing.T) {
"node1": {highPriNominatedPodInfo, unschedulablePodInfo}, "node1": {highPriNominatedPodInfo, unschedulablePodInfo},
}, },
} }
if diff := cmp.Diff(q.PodNominator, expectedNominatedPods, cmp.AllowUnexported(nominatedPodMap{}), cmpopts.IgnoreFields(nominatedPodMap{}, "RWMutex")); diff != "" { if diff := cmp.Diff(q.PodNominator, expectedNominatedPods, cmp.AllowUnexported(nominator{}), cmpopts.IgnoreFields(nominator{}, "podLister", "RWMutex")); diff != "" {
t.Errorf("Unexpected diff after adding pods (-want, +got):\n%s", diff) t.Errorf("Unexpected diff after adding pods (-want, +got):\n%s", diff)
} }
if p, err := q.Pop(); err != nil || p.Pod != highPriNominatedPodInfo.Pod { if p, err := q.Pop(); err != nil || p.Pod != highPriNominatedPodInfo.Pod {
t.Errorf("Expected: %v after Pop, but got: %v", highPriNominatedPodInfo.Pod.Name, p.Pod.Name) t.Errorf("Expected: %v after Pop, but got: %v", highPriNominatedPodInfo.Pod.Name, p.Pod.Name)
} }
if len(q.PodNominator.(*nominatedPodMap).nominatedPods) != 1 { if len(q.PodNominator.(*nominator).nominatedPods) != 1 {
t.Errorf("Expected nomindatePods to have one element: %v", q.PodNominator) t.Errorf("Expected nomindatePods to have one element: %v", q.PodNominator)
} }
if getUnschedulablePod(q, unschedulablePodInfo.Pod) != unschedulablePodInfo.Pod { if getUnschedulablePod(q, unschedulablePodInfo.Pod) != unschedulablePodInfo.Pod {
@ -284,7 +288,8 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent_Backoff(t *testing.T) {
} }
func TestPriorityQueue_Pop(t *testing.T) { func TestPriorityQueue_Pop(t *testing.T) {
q := NewTestQueue(context.Background(), newDefaultQueueSort()) objs := []runtime.Object{medPriorityPodInfo.Pod}
q := NewTestQueueWithObjects(context.Background(), newDefaultQueueSort(), objs)
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
wg.Add(1) wg.Add(1)
go func() { go func() {
@ -292,8 +297,8 @@ func TestPriorityQueue_Pop(t *testing.T) {
if p, err := q.Pop(); err != nil || p.Pod != medPriorityPodInfo.Pod { if p, err := q.Pop(); err != nil || p.Pod != medPriorityPodInfo.Pod {
t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPodInfo.Pod.Name, p.Pod.Name) t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPodInfo.Pod.Name, p.Pod.Name)
} }
if len(q.PodNominator.(*nominatedPodMap).nominatedPods["node1"]) != 1 { if len(q.PodNominator.(*nominator).nominatedPods["node1"]) != 1 {
t.Errorf("Expected medPriorityPodInfo to be present in nomindatePods: %v", q.PodNominator.(*nominatedPodMap).nominatedPods["node1"]) t.Errorf("Expected medPriorityPodInfo to be present in nomindatePods: %v", q.PodNominator.(*nominator).nominatedPods["node1"])
} }
}() }()
q.Add(medPriorityPodInfo.Pod) q.Add(medPriorityPodInfo.Pod)
@ -301,13 +306,14 @@ func TestPriorityQueue_Pop(t *testing.T) {
} }
func TestPriorityQueue_Update(t *testing.T) { func TestPriorityQueue_Update(t *testing.T) {
objs := []runtime.Object{highPriorityPodInfo.Pod, unschedulablePodInfo.Pod, medPriorityPodInfo.Pod}
c := clock.NewFakeClock(time.Now()) c := clock.NewFakeClock(time.Now())
q := NewTestQueue(context.Background(), newDefaultQueueSort(), WithClock(c)) q := NewTestQueueWithObjects(context.Background(), newDefaultQueueSort(), objs, WithClock(c))
q.Update(nil, highPriorityPodInfo.Pod) q.Update(nil, highPriorityPodInfo.Pod)
if _, exists, _ := q.activeQ.Get(newQueuedPodInfoForLookup(highPriorityPodInfo.Pod)); !exists { if _, exists, _ := q.activeQ.Get(newQueuedPodInfoForLookup(highPriorityPodInfo.Pod)); !exists {
t.Errorf("Expected %v to be added to activeQ.", highPriorityPodInfo.Pod.Name) t.Errorf("Expected %v to be added to activeQ.", highPriorityPodInfo.Pod.Name)
} }
if len(q.PodNominator.(*nominatedPodMap).nominatedPods) != 0 { if len(q.PodNominator.(*nominator).nominatedPods) != 0 {
t.Errorf("Expected nomindatePods to be empty: %v", q.PodNominator) t.Errorf("Expected nomindatePods to be empty: %v", q.PodNominator)
} }
// Update highPriorityPodInfo and add a nominatedNodeName to it. // Update highPriorityPodInfo and add a nominatedNodeName to it.
@ -315,7 +321,7 @@ func TestPriorityQueue_Update(t *testing.T) {
if q.activeQ.Len() != 1 { if q.activeQ.Len() != 1 {
t.Error("Expected only one item in activeQ.") t.Error("Expected only one item in activeQ.")
} }
if len(q.PodNominator.(*nominatedPodMap).nominatedPods) != 1 { if len(q.PodNominator.(*nominator).nominatedPods) != 1 {
t.Errorf("Expected one item in nomindatePods map: %v", q.PodNominator) t.Errorf("Expected one item in nomindatePods map: %v", q.PodNominator)
} }
// Updating an unschedulable pod which is not in any of the two queues, should // Updating an unschedulable pod which is not in any of the two queues, should
@ -382,7 +388,8 @@ func TestPriorityQueue_Update(t *testing.T) {
} }
func TestPriorityQueue_Delete(t *testing.T) { func TestPriorityQueue_Delete(t *testing.T) {
q := NewTestQueue(context.Background(), newDefaultQueueSort()) objs := []runtime.Object{highPriorityPodInfo.Pod, unschedulablePodInfo.Pod}
q := NewTestQueueWithObjects(context.Background(), newDefaultQueueSort(), objs)
q.Update(highPriorityPodInfo.Pod, highPriNominatedPodInfo.Pod) q.Update(highPriorityPodInfo.Pod, highPriNominatedPodInfo.Pod)
q.Add(unschedulablePodInfo.Pod) q.Add(unschedulablePodInfo.Pod)
if err := q.Delete(highPriNominatedPodInfo.Pod); err != nil { if err := q.Delete(highPriNominatedPodInfo.Pod); err != nil {
@ -394,19 +401,18 @@ func TestPriorityQueue_Delete(t *testing.T) {
if _, exists, _ := q.activeQ.Get(newQueuedPodInfoForLookup(highPriNominatedPodInfo.Pod)); exists { if _, exists, _ := q.activeQ.Get(newQueuedPodInfoForLookup(highPriNominatedPodInfo.Pod)); exists {
t.Errorf("Didn't expect %v to be in activeQ.", highPriorityPodInfo.Pod.Name) t.Errorf("Didn't expect %v to be in activeQ.", highPriorityPodInfo.Pod.Name)
} }
if len(q.PodNominator.(*nominatedPodMap).nominatedPods) != 1 { if len(q.PodNominator.(*nominator).nominatedPods) != 1 {
t.Errorf("Expected nomindatePods to have only 'unschedulablePodInfo': %v", q.PodNominator.(*nominatedPodMap).nominatedPods) t.Errorf("Expected nomindatePods to have only 'unschedulablePodInfo': %v", q.PodNominator.(*nominator).nominatedPods)
} }
if err := q.Delete(unschedulablePodInfo.Pod); err != nil { if err := q.Delete(unschedulablePodInfo.Pod); err != nil {
t.Errorf("delete failed: %v", err) t.Errorf("delete failed: %v", err)
} }
if len(q.PodNominator.(*nominatedPodMap).nominatedPods) != 0 { if len(q.PodNominator.(*nominator).nominatedPods) != 0 {
t.Errorf("Expected nomindatePods to be empty: %v", q.PodNominator) t.Errorf("Expected nomindatePods to be empty: %v", q.PodNominator)
} }
} }
func BenchmarkMoveAllToActiveOrBackoffQueue(b *testing.B) { func BenchmarkMoveAllToActiveOrBackoffQueue(b *testing.B) {
tests := []struct { tests := []struct {
name string name string
moveEvent framework.ClusterEvent moveEvent framework.ClusterEvent
@ -630,7 +636,8 @@ func TestPriorityQueue_AssignedPodAdded(t *testing.T) {
} }
func TestPriorityQueue_NominatedPodsForNode(t *testing.T) { func TestPriorityQueue_NominatedPodsForNode(t *testing.T) {
q := NewTestQueue(context.Background(), newDefaultQueueSort()) objs := []runtime.Object{medPriorityPodInfo.Pod, unschedulablePodInfo.Pod, highPriorityPodInfo.Pod}
q := NewTestQueueWithObjects(context.Background(), newDefaultQueueSort(), objs)
q.Add(medPriorityPodInfo.Pod) q.Add(medPriorityPodInfo.Pod)
q.Add(unschedulablePodInfo.Pod) q.Add(unschedulablePodInfo.Pod)
q.Add(highPriorityPodInfo.Pod) q.Add(highPriorityPodInfo.Pod)
@ -678,7 +685,7 @@ func TestPriorityQueue_NominatedPodDeleted(t *testing.T) {
podLister := informerFactory.Core().V1().Pods().Lister() podLister := informerFactory.Core().V1().Pods().Lister()
// Build a PriorityQueue. // Build a PriorityQueue.
q := NewPriorityQueue(newDefaultQueueSort(), informerFactory, WithPodNominator(NewSafePodNominator(podLister))) q := NewPriorityQueue(newDefaultQueueSort(), informerFactory, WithPodNominator(NewPodNominator(podLister)))
ctx := context.Background() ctx := context.Background()
informerFactory.Start(ctx.Done()) informerFactory.Start(ctx.Done())
informerFactory.WaitForCacheSync(ctx.Done()) informerFactory.WaitForCacheSync(ctx.Done())
@ -723,7 +730,8 @@ func TestPriorityQueue_PendingPods(t *testing.T) {
} }
func TestPriorityQueue_UpdateNominatedPodForNode(t *testing.T) { func TestPriorityQueue_UpdateNominatedPodForNode(t *testing.T) {
q := NewTestQueue(context.Background(), newDefaultQueueSort()) objs := []runtime.Object{medPriorityPodInfo.Pod, unschedulablePodInfo.Pod, highPriorityPodInfo.Pod}
q := NewTestQueueWithObjects(context.Background(), newDefaultQueueSort(), objs)
if err := q.Add(medPriorityPodInfo.Pod); err != nil { if err := q.Add(medPriorityPodInfo.Pod); err != nil {
t.Errorf("add failed: %v", err) t.Errorf("add failed: %v", err)
} }
@ -732,7 +740,7 @@ func TestPriorityQueue_UpdateNominatedPodForNode(t *testing.T) {
// Update nominated node name of a pod on a node that is not specified in the pod object. // Update nominated node name of a pod on a node that is not specified in the pod object.
q.AddNominatedPod(framework.NewPodInfo(highPriorityPodInfo.Pod), "node2") q.AddNominatedPod(framework.NewPodInfo(highPriorityPodInfo.Pod), "node2")
expectedNominatedPods := &nominatedPodMap{ expectedNominatedPods := &nominator{
nominatedPodToNode: map[types.UID]string{ nominatedPodToNode: map[types.UID]string{
medPriorityPodInfo.Pod.UID: "node1", medPriorityPodInfo.Pod.UID: "node1",
highPriorityPodInfo.Pod.UID: "node2", highPriorityPodInfo.Pod.UID: "node2",
@ -744,20 +752,20 @@ func TestPriorityQueue_UpdateNominatedPodForNode(t *testing.T) {
"node5": {unschedulablePodInfo}, "node5": {unschedulablePodInfo},
}, },
} }
if diff := cmp.Diff(q.PodNominator, expectedNominatedPods, cmp.AllowUnexported(nominatedPodMap{}), cmpopts.IgnoreFields(nominatedPodMap{}, "RWMutex")); diff != "" { if diff := cmp.Diff(q.PodNominator, expectedNominatedPods, cmp.AllowUnexported(nominator{}), cmpopts.IgnoreFields(nominator{}, "podLister", "RWMutex")); diff != "" {
t.Errorf("Unexpected diff after adding pods (-want, +got):\n%s", diff) t.Errorf("Unexpected diff after adding pods (-want, +got):\n%s", diff)
} }
if p, err := q.Pop(); err != nil || p.Pod != medPriorityPodInfo.Pod { if p, err := q.Pop(); err != nil || p.Pod != medPriorityPodInfo.Pod {
t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPodInfo.Pod.Name, p.Pod.Name) t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPodInfo.Pod.Name, p.Pod.Name)
} }
// List of nominated pods shouldn't change after popping them from the queue. // List of nominated pods shouldn't change after popping them from the queue.
if diff := cmp.Diff(q.PodNominator, expectedNominatedPods, cmp.AllowUnexported(nominatedPodMap{}), cmpopts.IgnoreFields(nominatedPodMap{}, "RWMutex")); diff != "" { if diff := cmp.Diff(q.PodNominator, expectedNominatedPods, cmp.AllowUnexported(nominator{}), cmpopts.IgnoreFields(nominator{}, "podLister", "RWMutex")); diff != "" {
t.Errorf("Unexpected diff after popping pods (-want, +got):\n%s", diff) t.Errorf("Unexpected diff after popping pods (-want, +got):\n%s", diff)
} }
// Update one of the nominated pods that doesn't have nominatedNodeName in the // Update one of the nominated pods that doesn't have nominatedNodeName in the
// pod object. It should be updated correctly. // pod object. It should be updated correctly.
q.AddNominatedPod(highPriorityPodInfo, "node4") q.AddNominatedPod(highPriorityPodInfo, "node4")
expectedNominatedPods = &nominatedPodMap{ expectedNominatedPods = &nominator{
nominatedPodToNode: map[types.UID]string{ nominatedPodToNode: map[types.UID]string{
medPriorityPodInfo.Pod.UID: "node1", medPriorityPodInfo.Pod.UID: "node1",
highPriorityPodInfo.Pod.UID: "node4", highPriorityPodInfo.Pod.UID: "node4",
@ -769,14 +777,14 @@ func TestPriorityQueue_UpdateNominatedPodForNode(t *testing.T) {
"node5": {unschedulablePodInfo}, "node5": {unschedulablePodInfo},
}, },
} }
if diff := cmp.Diff(q.PodNominator, expectedNominatedPods, cmp.AllowUnexported(nominatedPodMap{}), cmpopts.IgnoreFields(nominatedPodMap{}, "RWMutex")); diff != "" { if diff := cmp.Diff(q.PodNominator, expectedNominatedPods, cmp.AllowUnexported(nominator{}), cmpopts.IgnoreFields(nominator{}, "podLister", "RWMutex")); diff != "" {
t.Errorf("Unexpected diff after updating pods (-want, +got):\n%s", diff) t.Errorf("Unexpected diff after updating pods (-want, +got):\n%s", diff)
} }
// Delete a nominated pod that doesn't have nominatedNodeName in the pod // Delete a nominated pod that doesn't have nominatedNodeName in the pod
// object. It should be deleted. // object. It should be deleted.
q.DeleteNominatedPodIfExists(highPriorityPodInfo.Pod) q.DeleteNominatedPodIfExists(highPriorityPodInfo.Pod)
expectedNominatedPods = &nominatedPodMap{ expectedNominatedPods = &nominator{
nominatedPodToNode: map[types.UID]string{ nominatedPodToNode: map[types.UID]string{
medPriorityPodInfo.Pod.UID: "node1", medPriorityPodInfo.Pod.UID: "node1",
unschedulablePodInfo.Pod.UID: "node5", unschedulablePodInfo.Pod.UID: "node5",
@ -786,7 +794,7 @@ func TestPriorityQueue_UpdateNominatedPodForNode(t *testing.T) {
"node5": {unschedulablePodInfo}, "node5": {unschedulablePodInfo},
}, },
} }
if diff := cmp.Diff(q.PodNominator, expectedNominatedPods, cmp.AllowUnexported(nominatedPodMap{}), cmpopts.IgnoreFields(nominatedPodMap{}, "RWMutex")); diff != "" { if diff := cmp.Diff(q.PodNominator, expectedNominatedPods, cmp.AllowUnexported(nominator{}), cmpopts.IgnoreFields(nominator{}, "podLister", "RWMutex")); diff != "" {
t.Errorf("Unexpected diff after deleting pods (-want, +got):\n%s", diff) t.Errorf("Unexpected diff after deleting pods (-want, +got):\n%s", diff)
} }
} }

View File

@ -827,7 +827,7 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C
frameworkruntime.WithClientSet(client), frameworkruntime.WithClientSet(client),
frameworkruntime.WithEventRecorder(recorder), frameworkruntime.WithEventRecorder(recorder),
frameworkruntime.WithInformerFactory(informerFactory), frameworkruntime.WithInformerFactory(informerFactory),
frameworkruntime.WithPodNominator(internalqueue.NewPodNominator()), frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())),
) )
algo := core.NewGenericScheduler( algo := core.NewGenericScheduler(