From 2433b083a928af7eed1ee9594e37ce6c742237d6 Mon Sep 17 00:00:00 2001
From: Wei Huang
Date: Thu, 16 Dec 2021 10:54:58 -0800
Subject: [PATCH] clear pod's .status.nominatedNodeName when necessary

---
 pkg/scheduler/framework/interface.go          |  32 +-
 .../default_preemption_test.go                |  55 ++--
 .../framework/preemption/preemption.go        |  18 +-
 pkg/scheduler/framework/runtime/framework.go  |   6 +-
 .../framework/runtime/framework_test.go       |   6 +-
 pkg/scheduler/generic_scheduler_test.go       |   3 +-
 .../internal/queue/scheduling_queue.go        |  41 +--
 .../internal/queue/scheduling_queue_test.go   |  10 +-
 pkg/scheduler/scheduler.go                    |  42 +--
 pkg/scheduler/scheduler_test.go               |   6 +-
 test/integration/scheduler/preemption_test.go | 300 ++++++++++++------
 test/integration/scheduler/util.go            |  29 --
 12 files changed, 343 insertions(+), 205 deletions(-)

diff --git a/pkg/scheduler/framework/interface.go b/pkg/scheduler/framework/interface.go
index a72daab2db8..df230d96895 100644
--- a/pkg/scheduler/framework/interface.go
+++ b/pkg/scheduler/framework/interface.go
@@ -608,16 +608,44 @@ type Handle interface {
 	Parallelizer() parallelize.Parallelizer
 }
 
+type NominatingMode int
+
+const (
+	ModeNoop NominatingMode = iota
+	ModeOverride
+)
+
+type NominatingInfo struct {
+	NominatedNodeName string
+	NominatingMode    NominatingMode
+}
+
 // PostFilterResult wraps needed info for scheduler framework to act upon PostFilter phase.
 type PostFilterResult struct {
-	NominatedNodeName string
+	*NominatingInfo
+}
+
+func NewPostFilterResultWithNominatedNode(name string) *PostFilterResult {
+	return &PostFilterResult{
+		NominatingInfo: &NominatingInfo{
+			NominatedNodeName: name,
+			NominatingMode:    ModeOverride,
+		},
+	}
+}
+
+func (ni *NominatingInfo) Mode() NominatingMode {
+	if ni == nil {
+		return ModeNoop
+	}
+	return ni.NominatingMode
 }
 
 // PodNominator abstracts operations to maintain nominated Pods.
 type PodNominator interface {
 	// AddNominatedPod adds the given pod to the nominator or
 	// updates it if it already exists.
-	AddNominatedPod(pod *PodInfo, nodeName string)
+	AddNominatedPod(pod *PodInfo, nominatingInfo *NominatingInfo)
 	// DeleteNominatedPodIfExists deletes nominatedPod from internal cache. It's a no-op if it doesn't exist.
 	DeleteNominatedPodIfExists(pod *v1.Pod)
 	// UpdateNominatedPod updates the <oldPod> with <newPod>.
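For reference, the standalone sketch below (not part of the patch; it copies the new types above in simplified form) illustrates how callers are expected to consume the API: a nil *NominatingInfo means ModeNoop ("leave .status.nominatedNodeName alone"), NewPostFilterResultWithNominatedNode always produces a ModeOverride result, and an override carrying an empty name is how a stale nomination gets cleared.

package main

import "fmt"

// Simplified stand-ins for the framework types added above.
type NominatingMode int

const (
	ModeNoop NominatingMode = iota
	ModeOverride
)

type NominatingInfo struct {
	NominatedNodeName string
	NominatingMode    NominatingMode
}

// Mode is nil-safe: a missing NominatingInfo defaults to ModeNoop.
func (ni *NominatingInfo) Mode() NominatingMode {
	if ni == nil {
		return ModeNoop
	}
	return ni.NominatingMode
}

func main() {
	var keep *NominatingInfo // nil: leave the pod's nominatedNodeName untouched
	set := &NominatingInfo{NominatingMode: ModeOverride, NominatedNodeName: "node-a"}
	clearNNN := &NominatingInfo{NominatingMode: ModeOverride, NominatedNodeName: ""}

	fmt.Println(keep.Mode() == ModeNoop)    // true
	fmt.Println(set.Mode() == ModeOverride) // true
	fmt.Println(clearNNN.NominatedNodeName) // "" – an override with an empty name clears the field
}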
diff --git a/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go b/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go index 701a04455c1..cd55946c58d 100644 --- a/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go +++ b/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go @@ -161,7 +161,7 @@ func TestPostFilter(t *testing.T) { filteredNodesStatuses: framework.NodeToStatusMap{ "node1": framework.NewStatus(framework.Unschedulable), }, - wantResult: &framework.PostFilterResult{NominatedNodeName: "node1"}, + wantResult: framework.NewPostFilterResultWithNominatedNode("node1"), wantStatus: framework.NewStatus(framework.Success), }, { @@ -176,7 +176,7 @@ func TestPostFilter(t *testing.T) { filteredNodesStatuses: framework.NodeToStatusMap{ "node1": framework.NewStatus(framework.Unschedulable), }, - wantResult: nil, + wantResult: framework.NewPostFilterResultWithNominatedNode(""), wantStatus: framework.NewStatus(framework.Unschedulable, "0/1 nodes are available: 1 No victims found on node node1 for preemptor pod p."), }, { @@ -191,7 +191,7 @@ func TestPostFilter(t *testing.T) { filteredNodesStatuses: framework.NodeToStatusMap{ "node1": framework.NewStatus(framework.UnschedulableAndUnresolvable), }, - wantResult: nil, + wantResult: framework.NewPostFilterResultWithNominatedNode(""), wantStatus: framework.NewStatus(framework.Unschedulable, "0/1 nodes are available: 1 Preemption is not helpful for scheduling."), }, { @@ -209,7 +209,7 @@ func TestPostFilter(t *testing.T) { "node1": framework.NewStatus(framework.Unschedulable), "node2": framework.NewStatus(framework.Unschedulable), }, - wantResult: &framework.PostFilterResult{NominatedNodeName: "node2"}, + wantResult: framework.NewPostFilterResultWithNominatedNode("node2"), wantStatus: framework.NewStatus(framework.Success), }, { @@ -227,10 +227,8 @@ func TestPostFilter(t *testing.T) { "node1": framework.NewStatus(framework.Unschedulable), "node2": framework.NewStatus(framework.Unschedulable), }, - extender: &st.FakeExtender{Predicates: []st.FitPredicate{st.Node1PredicateExtender}}, - wantResult: &framework.PostFilterResult{ - NominatedNodeName: "node1", - }, + extender: &st.FakeExtender{Predicates: []st.FitPredicate{st.Node1PredicateExtender}}, + wantResult: framework.NewPostFilterResultWithNominatedNode("node1"), wantStatus: framework.NewStatus(framework.Success), }, { @@ -248,7 +246,7 @@ func TestPostFilter(t *testing.T) { "node1": framework.NewStatus(framework.Unschedulable), "node2": framework.NewStatus(framework.Unschedulable), }, - wantResult: nil, + wantResult: framework.NewPostFilterResultWithNominatedNode(""), wantStatus: framework.NewStatus(framework.Unschedulable, "0/2 nodes are available: 2 Insufficient cpu."), }, { @@ -266,7 +264,7 @@ func TestPostFilter(t *testing.T) { "node1": framework.NewStatus(framework.Unschedulable), "node2": framework.NewStatus(framework.Unschedulable), }, - wantResult: nil, + wantResult: framework.NewPostFilterResultWithNominatedNode(""), wantStatus: framework.NewStatus(framework.Unschedulable, "0/2 nodes are available: 1 Insufficient cpu, 1 No victims found on node node1 for preemptor pod p."), }, { @@ -286,7 +284,7 @@ func TestPostFilter(t *testing.T) { "node3": framework.NewStatus(framework.UnschedulableAndUnresolvable), "node4": framework.NewStatus(framework.UnschedulableAndUnresolvable), }, - wantResult: nil, + wantResult: framework.NewPostFilterResultWithNominatedNode(""), wantStatus: 
framework.NewStatus(framework.Unschedulable, "0/4 nodes are available: 2 Insufficient cpu, 2 Preemption is not helpful for scheduling."), }, { @@ -317,9 +315,7 @@ func TestPostFilter(t *testing.T) { "node1": framework.NewStatus(framework.Unschedulable), "node2": framework.NewStatus(framework.Unschedulable), }, - wantResult: &framework.PostFilterResult{ - NominatedNodeName: "node2", - }, + wantResult: framework.NewPostFilterResultWithNominatedNode("node2"), wantStatus: framework.NewStatus(framework.Success), }, } @@ -1465,7 +1461,7 @@ func TestPreempt(t *testing.T) { extenders []*st.FakeExtender nodeNames []string registerPlugin st.RegisterPluginFunc - expectedNode string + want *framework.PostFilterResult expectedPods []string // list of preempted pods }{ { @@ -1479,7 +1475,7 @@ func TestPreempt(t *testing.T) { }, nodeNames: []string{"node1", "node2", "node3"}, registerPlugin: st.RegisterPluginAsExtensions(noderesources.Name, nodeResourcesFitFunc, "Filter", "PreFilter"), - expectedNode: "node1", + want: framework.NewPostFilterResultWithNominatedNode("node1"), expectedPods: []string{"p1.1", "p1.2"}, }, { @@ -1497,7 +1493,7 @@ func TestPreempt(t *testing.T) { }, nodeNames: []string{"node-a/zone1", "node-b/zone1", "node-x/zone2"}, registerPlugin: st.RegisterPluginAsExtensions(podtopologyspread.Name, podtopologyspread.New, "PreFilter", "Filter"), - expectedNode: "node-b", + want: framework.NewPostFilterResultWithNominatedNode("node-b"), expectedPods: []string{"p-b1"}, }, { @@ -1514,7 +1510,7 @@ func TestPreempt(t *testing.T) { {Predicates: []st.FitPredicate{st.Node1PredicateExtender}}, }, registerPlugin: st.RegisterPluginAsExtensions(noderesources.Name, nodeResourcesFitFunc, "Filter", "PreFilter"), - expectedNode: "node1", + want: framework.NewPostFilterResultWithNominatedNode("node1"), expectedPods: []string{"p1.1", "p1.2"}, }, { @@ -1530,7 +1526,7 @@ func TestPreempt(t *testing.T) { {Predicates: []st.FitPredicate{st.FalsePredicateExtender}}, }, registerPlugin: st.RegisterPluginAsExtensions(noderesources.Name, nodeResourcesFitFunc, "Filter", "PreFilter"), - expectedNode: "", + want: nil, expectedPods: []string{}, }, { @@ -1547,7 +1543,7 @@ func TestPreempt(t *testing.T) { {Predicates: []st.FitPredicate{st.Node1PredicateExtender}}, }, registerPlugin: st.RegisterPluginAsExtensions(noderesources.Name, nodeResourcesFitFunc, "Filter", "PreFilter"), - expectedNode: "node1", + want: framework.NewPostFilterResultWithNominatedNode("node1"), expectedPods: []string{"p1.1", "p1.2"}, }, { @@ -1564,8 +1560,8 @@ func TestPreempt(t *testing.T) { {Predicates: []st.FitPredicate{st.TruePredicateExtender}}, }, registerPlugin: st.RegisterPluginAsExtensions(noderesources.Name, nodeResourcesFitFunc, "Filter", "PreFilter"), - //sum of priorities of all victims on node1 is larger than node2, node2 is chosen. - expectedNode: "node2", + // sum of priorities of all victims on node1 is larger than node2, node2 is chosen. 
+			want:           framework.NewPostFilterResultWithNominatedNode("node2"),
 			expectedPods: []string{"p2.1"},
 		},
 		{
@@ -1579,7 +1575,7 @@ func TestPreempt(t *testing.T) {
 			},
 			nodeNames:      []string{"node1", "node2", "node3"},
 			registerPlugin: st.RegisterPluginAsExtensions(noderesources.Name, nodeResourcesFitFunc, "Filter", "PreFilter"),
-			expectedNode:   "",
+			want:           nil,
 			expectedPods:   nil,
 		},
 		{
@@ -1593,7 +1589,7 @@ func TestPreempt(t *testing.T) {
 			},
 			nodeNames:      []string{"node1", "node2", "node3"},
 			registerPlugin: st.RegisterPluginAsExtensions(noderesources.Name, nodeResourcesFitFunc, "Filter", "PreFilter"),
-			expectedNode:   "node1",
+			want:           framework.NewPostFilterResultWithNominatedNode("node1"),
 			expectedPods:   []string{"p1.1", "p1.2"},
 		},
 	}
@@ -1691,11 +1687,8 @@ func TestPreempt(t *testing.T) {
 			if !status.IsSuccess() && !status.IsUnschedulable() {
 				t.Errorf("unexpected error in preemption: %v", status.AsError())
 			}
-			if res != nil && len(res.NominatedNodeName) != 0 && res.NominatedNodeName != test.expectedNode {
-				t.Errorf("expected node: %v, got: %v", test.expectedNode, res.NominatedNodeName)
-			}
-			if res != nil && len(res.NominatedNodeName) == 0 && len(test.expectedNode) != 0 {
-				t.Errorf("expected node: %v, got: nothing", test.expectedNode)
+			if diff := cmp.Diff(test.want, res); diff != "" {
+				t.Errorf("Unexpected status (-want, +got):\n%s", diff)
 			}
 			if len(deletedPodNames) != len(test.expectedPods) {
 				t.Errorf("expected %v pods, got %v.", len(test.expectedPods), len(deletedPodNames))
@@ -1712,7 +1705,7 @@ func TestPreempt(t *testing.T) {
 					t.Errorf("pod %v is not expected to be a victim.", victimName)
 				}
 			}
-			if res != nil {
+			if res != nil && res.NominatingInfo != nil {
 				test.pod.Status.NominatedNodeName = res.NominatedNodeName
 			}
 
@@ -1730,7 +1723,7 @@ func TestPreempt(t *testing.T) {
 			if !status.IsSuccess() && !status.IsUnschedulable() {
 				t.Errorf("unexpected error in preemption: %v", status.AsError())
 			}
-			if res != nil && len(deletedPodNames) > 0 {
+			if res != nil && res.NominatingInfo != nil && len(deletedPodNames) > 0 {
 				t.Errorf("didn't expect any more preemption. Node %v is selected for preemption.", res.NominatedNodeName)
 			}
 		})
diff --git a/pkg/scheduler/framework/preemption/preemption.go b/pkg/scheduler/framework/preemption/preemption.go
index 2a516bf21b5..5daf229ebb0 100644
--- a/pkg/scheduler/framework/preemption/preemption.go
+++ b/pkg/scheduler/framework/preemption/preemption.go
@@ -123,8 +123,19 @@ type Evaluator struct {
 	Interface
 }
 
+// Preempt returns a PostFilterResult carrying suggested nominatedNodeName, along with a Status.
+// The semantics of the returned <PostFilterResult, Status> varies on different scenarios:
+// - <nil PostFilterResult, Error>. This denotes it's a transient/rare error that may be self-healed in future cycles.
+// - <nil PostFilterResult, Unschedulable>. This status is mostly as expected like the preemptor is waiting for the
+//   victims to be fully terminated.
+// - In both cases above, a nil PostFilterResult is returned to keep the pod's nominatedNodeName unchanged.
+//
+// - <non-nil PostFilterResult, Unschedulable>. It indicates the pod cannot be scheduled even with preemption.
+//   In this case, a non-nil PostFilterResult is returned and result.NominatingMode instructs how to deal with
+//   the nominatedNodeName.
+// - <non-nil PostFilterResult, Success>. It's the regular happy path
+//   and the non-empty nominatedNodeName will be applied to the preemptor pod.
 func (ev *Evaluator) Preempt(ctx context.Context, pod *v1.Pod, m framework.NodeToStatusMap) (*framework.PostFilterResult, *framework.Status) {
-
 	// 0) Fetch the latest version of <pod>.
 	// It's safe to directly fetch pod here.
Because the informer cache has already been // initialized when creating the Scheduler obj, i.e., factory.go#MakeDefaultErrorFunc(). @@ -158,7 +169,8 @@ func (ev *Evaluator) Preempt(ctx context.Context, pod *v1.Pod, m framework.NodeT // Leave FailedPlugins as nil as it won't be used on moving Pods. }, } - return nil, framework.NewStatus(framework.Unschedulable, fitError.Error()) + // Specify nominatedNodeName to clear the pod's nominatedNodeName status, if applicable. + return framework.NewPostFilterResultWithNominatedNode(""), framework.NewStatus(framework.Unschedulable, fitError.Error()) } // 3) Interact with registered Extenders to filter out some candidates if needed. @@ -178,7 +190,7 @@ func (ev *Evaluator) Preempt(ctx context.Context, pod *v1.Pod, m framework.NodeT return nil, status } - return &framework.PostFilterResult{NominatedNodeName: bestCandidate.Name()}, framework.NewStatus(framework.Success) + return framework.NewPostFilterResultWithNominatedNode(bestCandidate.Name()), framework.NewStatus(framework.Success) } // FindCandidates calculates a slice of preemption candidates. diff --git a/pkg/scheduler/framework/runtime/framework.go b/pkg/scheduler/framework/runtime/framework.go index 477f7e9f70c..d0c4c2150cf 100644 --- a/pkg/scheduler/framework/runtime/framework.go +++ b/pkg/scheduler/framework/runtime/framework.go @@ -711,6 +711,8 @@ func (f *frameworkImpl) RunPostFilterPlugins(ctx context.Context, state *framewo }() statuses := make(framework.PluginToStatus) + // `result` records the last meaningful(non-noop) PostFilterResult. + var result *framework.PostFilterResult for _, pl := range f.postFilterPlugins { r, s := f.runPostFilterPlugin(ctx, pl, state, pod, filteredNodeStatusMap) if s.IsSuccess() { @@ -718,11 +720,13 @@ func (f *frameworkImpl) RunPostFilterPlugins(ctx context.Context, state *framewo } else if !s.IsUnschedulable() { // Any status other than Success or Unschedulable is Error. 
return nil, framework.AsStatus(s.AsError()) + } else if r != nil && r.Mode() != framework.ModeNoop { + result = r } statuses[pl.Name()] = s } - return nil, statuses.Merge() + return result, statuses.Merge() } func (f *frameworkImpl) runPostFilterPlugin(ctx context.Context, pl framework.PostFilterPlugin, state *framework.CycleState, pod *v1.Pod, filteredNodeStatusMap framework.NodeToStatusMap) (*framework.PostFilterResult, *framework.Status) { diff --git a/pkg/scheduler/framework/runtime/framework_test.go b/pkg/scheduler/framework/runtime/framework_test.go index 755077b2b1c..3ec5e7121d2 100644 --- a/pkg/scheduler/framework/runtime/framework_test.go +++ b/pkg/scheduler/framework/runtime/framework_test.go @@ -958,7 +958,7 @@ func TestRunScorePlugins(t *testing.T) { }, { name: "single ScoreWithNormalize plugin", - //registry: registry, + // registry: registry, plugins: buildScoreConfigDefaultWeights(scoreWithNormalizePlugin1), pluginConfigs: []config.PluginConfig{ { @@ -1596,7 +1596,9 @@ func TestFilterPluginsWithNominatedPods(t *testing.T) { podNominator := internalqueue.NewPodNominator(nil) if tt.nominatedPod != nil { - podNominator.AddNominatedPod(framework.NewPodInfo(tt.nominatedPod), nodeName) + podNominator.AddNominatedPod( + framework.NewPodInfo(tt.nominatedPod), + &framework.NominatingInfo{NominatingMode: framework.ModeOverride, NominatedNodeName: nodeName}) } profile := config.KubeSchedulerProfile{Plugins: cfgPls} f, err := newFrameworkWithQueueSortAndBind(registry, profile, WithPodNominator(podNominator)) diff --git a/pkg/scheduler/generic_scheduler_test.go b/pkg/scheduler/generic_scheduler_test.go index 66b0122fe5b..7da05563919 100644 --- a/pkg/scheduler/generic_scheduler_test.go +++ b/pkg/scheduler/generic_scheduler_test.go @@ -1177,7 +1177,8 @@ func TestFindFitPredicateCallCounts(t *testing.T) { if err := scheduler.cache.UpdateSnapshot(scheduler.nodeInfoSnapshot); err != nil { t.Fatal(err) } - fwk.AddNominatedPod(framework.NewPodInfo(&v1.Pod{ObjectMeta: metav1.ObjectMeta{UID: "nominated"}, Spec: v1.PodSpec{Priority: &midPriority}}), "1") + fwk.AddNominatedPod(framework.NewPodInfo(&v1.Pod{ObjectMeta: metav1.ObjectMeta{UID: "nominated"}, Spec: v1.PodSpec{Priority: &midPriority}}), + &framework.NominatingInfo{NominatingMode: framework.ModeOverride, NominatedNodeName: "1"}) _, _, err = scheduler.findNodesThatFitPod(context.Background(), nil, fwk, framework.NewCycleState(), test.pod) diff --git a/pkg/scheduler/internal/queue/scheduling_queue.go b/pkg/scheduler/internal/queue/scheduling_queue.go index fba3e82e225..6200dc801a3 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue.go +++ b/pkg/scheduler/internal/queue/scheduling_queue.go @@ -297,7 +297,7 @@ func (p *PriorityQueue) Add(pod *v1.Pod) error { klog.ErrorS(nil, "Error: pod is already in the podBackoff queue", "pod", klog.KObj(pod)) } metrics.SchedulerQueueIncomingPods.WithLabelValues("active", PodAdd).Inc() - p.PodNominator.AddNominatedPod(pInfo.PodInfo, "") + p.PodNominator.AddNominatedPod(pInfo.PodInfo, nil) p.cond.Broadcast() return nil @@ -351,7 +351,7 @@ func (p *PriorityQueue) activate(pod *v1.Pod) bool { p.unschedulableQ.delete(pod) p.podBackoffQ.Delete(pInfo) metrics.SchedulerQueueIncomingPods.WithLabelValues("active", ForceActivate).Inc() - p.PodNominator.AddNominatedPod(pInfo.PodInfo, "") + p.PodNominator.AddNominatedPod(pInfo.PodInfo, nil) return true } @@ -403,7 +403,7 @@ func (p *PriorityQueue) AddUnschedulableIfNotPresent(pInfo *framework.QueuedPodI 
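As a rough illustration of the RunPostFilterPlugins change above, the sketch below (trimmed stand-in types, no real plugin machinery) folds a sequence of plugin results and keeps only the last non-noop one, so a nil or ModeNoop result from a later plugin cannot erase an earlier request to set or clear nominatedNodeName.

package main

import "fmt"

type NominatingMode int

const (
	ModeNoop NominatingMode = iota
	ModeOverride
)

// PostFilterResult is a trimmed stand-in; the real one embeds *NominatingInfo.
type PostFilterResult struct {
	NominatedNodeName string
	Mode              NominatingMode
}

// lastMeaningful mirrors the loop in RunPostFilterPlugins: remember the last
// non-noop result while iterating over the plugins' outcomes.
func lastMeaningful(results []*PostFilterResult) *PostFilterResult {
	var final *PostFilterResult
	for _, r := range results {
		if r != nil && r.Mode != ModeNoop {
			final = r
		}
	}
	return final
}

func main() {
	got := lastMeaningful([]*PostFilterResult{
		nil, // a plugin that returned no result
		{Mode: ModeOverride, NominatedNodeName: ""}, // "clear the field"
		{Mode: ModeNoop}, // noop: must not win over the override above
	})
	fmt.Printf("%+v\n", got) // &{NominatedNodeName: Mode:1}
}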
metrics.SchedulerQueueIncomingPods.WithLabelValues("unschedulable", ScheduleAttemptFailure).Inc() } - p.PodNominator.AddNominatedPod(pInfo.PodInfo, "") + p.PodNominator.AddNominatedPod(pInfo.PodInfo, nil) return nil } @@ -546,7 +546,7 @@ func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error { if err := p.activeQ.Add(pInfo); err != nil { return err } - p.PodNominator.AddNominatedPod(pInfo.PodInfo, "") + p.PodNominator.AddNominatedPod(pInfo.PodInfo, nil) p.cond.Broadcast() return nil } @@ -692,9 +692,9 @@ func (npm *nominator) DeleteNominatedPodIfExists(pod *v1.Pod) { // This is called during the preemption process after a node is nominated to run // the pod. We update the structure before sending a request to update the pod // object to avoid races with the following scheduling cycles. -func (npm *nominator) AddNominatedPod(pi *framework.PodInfo, nodeName string) { +func (npm *nominator) AddNominatedPod(pi *framework.PodInfo, nominatingInfo *framework.NominatingInfo) { npm.Lock() - npm.add(pi, nodeName) + npm.add(pi, nominatingInfo) npm.Unlock() } @@ -831,17 +831,19 @@ type nominator struct { sync.RWMutex } -func (npm *nominator) add(pi *framework.PodInfo, nodeName string) { - // always delete the pod if it already exist, to ensure we never store more than +func (npm *nominator) add(pi *framework.PodInfo, nominatingInfo *framework.NominatingInfo) { + // Always delete the pod if it already exists, to ensure we never store more than // one instance of the pod. npm.delete(pi.Pod) - nnn := nodeName - if len(nnn) == 0 { - nnn = NominatedNodeName(pi.Pod) - if len(nnn) == 0 { + var nodeName string + if nominatingInfo.Mode() == framework.ModeOverride { + nodeName = nominatingInfo.NominatedNodeName + } else if nominatingInfo.Mode() == framework.ModeNoop { + if pi.Pod.Status.NominatedNodeName == "" { return } + nodeName = pi.Pod.Status.NominatedNodeName } if npm.podLister != nil { @@ -852,14 +854,14 @@ func (npm *nominator) add(pi *framework.PodInfo, nodeName string) { } } - npm.nominatedPodToNode[pi.Pod.UID] = nnn - for _, npi := range npm.nominatedPods[nnn] { + npm.nominatedPodToNode[pi.Pod.UID] = nodeName + for _, npi := range npm.nominatedPods[nodeName] { if npi.Pod.UID == pi.Pod.UID { klog.V(4).InfoS("Pod already exists in the nominator", "pod", klog.KObj(npi.Pod)) return } } - npm.nominatedPods[nnn] = append(npm.nominatedPods[nnn], pi) + npm.nominatedPods[nodeName] = append(npm.nominatedPods[nodeName], pi) } func (npm *nominator) delete(p *v1.Pod) { @@ -886,7 +888,7 @@ func (npm *nominator) UpdateNominatedPod(oldPod *v1.Pod, newPodInfo *framework.P // In some cases, an Update event with no "NominatedNode" present is received right // after a node("NominatedNode") is reserved for this pod in memory. // In this case, we need to keep reserving the NominatedNode when updating the pod pointer. 
- nodeName := "" + var nominatingInfo *framework.NominatingInfo // We won't fall into below `if` block if the Update event represents: // (1) NominatedNode info is added // (2) NominatedNode info is updated @@ -894,13 +896,16 @@ func (npm *nominator) UpdateNominatedPod(oldPod *v1.Pod, newPodInfo *framework.P if NominatedNodeName(oldPod) == "" && NominatedNodeName(newPodInfo.Pod) == "" { if nnn, ok := npm.nominatedPodToNode[oldPod.UID]; ok { // This is the only case we should continue reserving the NominatedNode - nodeName = nnn + nominatingInfo = &framework.NominatingInfo{ + NominatingMode: framework.ModeOverride, + NominatedNodeName: nnn, + } } } // We update irrespective of the nominatedNodeName changed or not, to ensure // that pod pointer is updated. npm.delete(oldPod) - npm.add(newPodInfo, nodeName) + npm.add(newPodInfo, nominatingInfo) } // NewPodNominator creates a nominator as a backing of framework.PodNominator. diff --git a/pkg/scheduler/internal/queue/scheduling_queue_test.go b/pkg/scheduler/internal/queue/scheduling_queue_test.go index 3c0ed296687..60a95359477 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue_test.go +++ b/pkg/scheduler/internal/queue/scheduling_queue_test.go @@ -705,7 +705,7 @@ func TestPriorityQueue_NominatedPodDeleted(t *testing.T) { informerFactory.Core().V1().Pods().Informer().GetStore().Delete(tt.podInfo.Pod) } - q.AddNominatedPod(tt.podInfo, tt.podInfo.Pod.Status.NominatedNodeName) + q.AddNominatedPod(tt.podInfo, nil) if got := len(q.NominatedPodsForNode(tt.podInfo.Pod.Status.NominatedNodeName)) == 1; got != tt.want { t.Errorf("Want %v, but got %v", tt.want, got) @@ -746,10 +746,12 @@ func TestPriorityQueue_UpdateNominatedPodForNode(t *testing.T) { t.Errorf("add failed: %v", err) } // Update unschedulablePodInfo on a different node than specified in the pod. - q.AddNominatedPod(framework.NewPodInfo(unschedulablePodInfo.Pod), "node5") + q.AddNominatedPod(framework.NewPodInfo(unschedulablePodInfo.Pod), + &framework.NominatingInfo{NominatingMode: framework.ModeOverride, NominatedNodeName: "node5"}) // Update nominated node name of a pod on a node that is not specified in the pod object. - q.AddNominatedPod(framework.NewPodInfo(highPriorityPodInfo.Pod), "node2") + q.AddNominatedPod(framework.NewPodInfo(highPriorityPodInfo.Pod), + &framework.NominatingInfo{NominatingMode: framework.ModeOverride, NominatedNodeName: "node2"}) expectedNominatedPods := &nominator{ nominatedPodToNode: map[types.UID]string{ medPriorityPodInfo.Pod.UID: "node1", @@ -774,7 +776,7 @@ func TestPriorityQueue_UpdateNominatedPodForNode(t *testing.T) { } // Update one of the nominated pods that doesn't have nominatedNodeName in the // pod object. It should be updated correctly. - q.AddNominatedPod(highPriorityPodInfo, "node4") + q.AddNominatedPod(highPriorityPodInfo, &framework.NominatingInfo{NominatingMode: framework.ModeOverride, NominatedNodeName: "node4"}) expectedNominatedPods = &nominator{ nominatedPodToNode: map[types.UID]string{ medPriorityPodInfo.Pod.UID: "node1", diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 98a44fc50e1..84e75ecf84c 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -299,7 +299,7 @@ func (sched *Scheduler) Run(ctx context.Context) { // recordSchedulingFailure records an event for the pod that indicates the // pod has failed to schedule. Also, update the pod condition and nominated node name if set. 
-func (sched *Scheduler) recordSchedulingFailure(fwk framework.Framework, podInfo *framework.QueuedPodInfo, err error, reason string, nominatedNode string) { +func (sched *Scheduler) recordSchedulingFailure(fwk framework.Framework, podInfo *framework.QueuedPodInfo, err error, reason string, nominatingInfo *framework.NominatingInfo) { sched.Error(podInfo, err) // Update the scheduling queue with the nominated pod information. Without @@ -307,7 +307,7 @@ func (sched *Scheduler) recordSchedulingFailure(fwk framework.Framework, podInfo // and the time the scheduler receives a Pod Update for the nominated pod. // Here we check for nil only for tests. if sched.SchedulingQueue != nil { - sched.SchedulingQueue.AddNominatedPod(podInfo.PodInfo, nominatedNode) + sched.SchedulingQueue.AddNominatedPod(podInfo.PodInfo, nominatingInfo) } pod := podInfo.Pod @@ -318,7 +318,7 @@ func (sched *Scheduler) recordSchedulingFailure(fwk framework.Framework, podInfo Status: v1.ConditionFalse, Reason: reason, Message: err.Error(), - }, nominatedNode); err != nil { + }, nominatingInfo); err != nil { klog.ErrorS(err, "Error updating pod", "pod", klog.KObj(pod)) } } @@ -333,17 +333,17 @@ func truncateMessage(message string) string { return message[:max-len(suffix)] + suffix } -func updatePod(client clientset.Interface, pod *v1.Pod, condition *v1.PodCondition, nominatedNode string) error { +func updatePod(client clientset.Interface, pod *v1.Pod, condition *v1.PodCondition, nominatingInfo *framework.NominatingInfo) error { klog.V(3).InfoS("Updating pod condition", "pod", klog.KObj(pod), "conditionType", condition.Type, "conditionStatus", condition.Status, "conditionReason", condition.Reason) podStatusCopy := pod.Status.DeepCopy() // NominatedNodeName is updated only if we are trying to set it, and the value is // different from the existing one. - if !podutil.UpdatePodCondition(podStatusCopy, condition) && - (len(nominatedNode) == 0 || pod.Status.NominatedNodeName == nominatedNode) { + nnnNeedsUpdate := nominatingInfo.Mode() == framework.ModeOverride && pod.Status.NominatedNodeName != nominatingInfo.NominatedNodeName + if !podutil.UpdatePodCondition(podStatusCopy, condition) && !nnnNeedsUpdate { return nil } - if nominatedNode != "" { - podStatusCopy.NominatedNodeName = nominatedNode + if nnnNeedsUpdate { + podStatusCopy.NominatedNodeName = nominatingInfo.NominatedNodeName } return util.PatchPodStatus(client, pod, podStatusCopy) } @@ -417,6 +417,10 @@ func (sched *Scheduler) finishBinding(fwk framework.Framework, assumed *v1.Pod, fwk.EventRecorder().Eventf(assumed, nil, v1.EventTypeNormal, "Scheduled", "Binding", "Successfully assigned %v/%v to %v", assumed.Namespace, assumed.Name, targetNode) } +var ( + clearNominatedNode = &framework.NominatingInfo{NominatingMode: framework.ModeOverride, NominatedNodeName: ""} +) + // scheduleOne does the entire scheduling workflow for a single pod. It is serialized on the scheduling algorithm's host fitting. func (sched *Scheduler) scheduleOne(ctx context.Context) { podInfo := sched.NextPod() @@ -454,7 +458,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) { // preempt, with the expectation that the next time the pod is tried for scheduling it // will fit due to the preemption. It is also possible that a different pod will schedule // into the resources that were preempted, but this is harmless. 
- nominatedNode := "" + var nominatingInfo *framework.NominatingInfo if fitError, ok := err.(*framework.FitError); ok { if !fwk.HasPostFilterPlugins() { klog.V(3).InfoS("No PostFilter plugins are registered, so no preemption will be performed") @@ -466,8 +470,8 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) { } else { klog.V(5).InfoS("Status after running PostFilter plugins for pod", "pod", klog.KObj(pod), "status", status) } - if status.IsSuccess() && result != nil { - nominatedNode = result.NominatedNodeName + if result != nil { + nominatingInfo = result.NominatingInfo } } // Pod did not fit anywhere, so it is counted as a failure. If preemption @@ -475,13 +479,15 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) { // schedule it. (hopefully) metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start)) } else if err == ErrNoNodesAvailable { + nominatingInfo = clearNominatedNode // No nodes available is counted as unschedulable rather than an error. metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start)) } else { + nominatingInfo = clearNominatedNode klog.ErrorS(err, "Error selecting node for pod", "pod", klog.KObj(pod)) metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start)) } - sched.recordSchedulingFailure(fwk, podInfo, err, v1.PodReasonUnschedulable, nominatedNode) + sched.recordSchedulingFailure(fwk, podInfo, err, v1.PodReasonUnschedulable, nominatingInfo) return } metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInSeconds(start)) @@ -498,7 +504,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) { // This relies on the fact that Error will check if the pod has been bound // to a node and if so will not add it back to the unscheduled pods queue // (otherwise this would cause an infinite loop). - sched.recordSchedulingFailure(fwk, assumedPodInfo, err, SchedulerError, "") + sched.recordSchedulingFailure(fwk, assumedPodInfo, err, SchedulerError, clearNominatedNode) return } @@ -510,7 +516,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) { if forgetErr := sched.SchedulerCache.ForgetPod(assumedPod); forgetErr != nil { klog.ErrorS(forgetErr, "Scheduler cache ForgetPod failed") } - sched.recordSchedulingFailure(fwk, assumedPodInfo, sts.AsError(), SchedulerError, "") + sched.recordSchedulingFailure(fwk, assumedPodInfo, sts.AsError(), SchedulerError, clearNominatedNode) return } @@ -530,7 +536,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) { if forgetErr := sched.SchedulerCache.ForgetPod(assumedPod); forgetErr != nil { klog.ErrorS(forgetErr, "Scheduler cache ForgetPod failed") } - sched.recordSchedulingFailure(fwk, assumedPodInfo, runPermitStatus.AsError(), reason, "") + sched.recordSchedulingFailure(fwk, assumedPodInfo, runPermitStatus.AsError(), reason, clearNominatedNode) return } @@ -573,7 +579,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) { return assumedPod.UID != pod.UID }) } - sched.recordSchedulingFailure(fwk, assumedPodInfo, waitOnPermitStatus.AsError(), reason, "") + sched.recordSchedulingFailure(fwk, assumedPodInfo, waitOnPermitStatus.AsError(), reason, clearNominatedNode) return } @@ -591,7 +597,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) { // TODO(#103853): de-duplicate the logic. 
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(internalqueue.AssignedPodDelete, nil) } - sched.recordSchedulingFailure(fwk, assumedPodInfo, preBindStatus.AsError(), SchedulerError, "") + sched.recordSchedulingFailure(fwk, assumedPodInfo, preBindStatus.AsError(), SchedulerError, clearNominatedNode) return } @@ -608,7 +614,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) { // TODO(#103853): de-duplicate the logic. sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(internalqueue.AssignedPodDelete, nil) } - sched.recordSchedulingFailure(fwk, assumedPodInfo, fmt.Errorf("binding rejected: %w", err), SchedulerError, "") + sched.recordSchedulingFailure(fwk, assumedPodInfo, fmt.Errorf("binding rejected: %w", err), SchedulerError, clearNominatedNode) } else { // Calculating nodeResourceString can be heavy. Avoid it if klog verbosity is below 2. if klog.V(2).Enabled() { diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index cdeed8a0b53..10e6e8d4e8e 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -1231,7 +1231,7 @@ func TestUpdatePod(t *testing.T) { currentPodConditions []v1.PodCondition newPodCondition *v1.PodCondition currentNominatedNodeName string - newNominatedNodeName string + newNominatingInfo *framework.NominatingInfo expectedPatchRequests int expectedPatchDataPattern string }{ @@ -1361,7 +1361,7 @@ func TestUpdatePod(t *testing.T) { Reason: "currentReason", Message: "currentMessage", }, - newNominatedNodeName: "node1", + newNominatingInfo: &framework.NominatingInfo{NominatingMode: framework.ModeOverride, NominatedNodeName: "node1"}, expectedPatchRequests: 1, expectedPatchDataPattern: `{"status":{"nominatedNodeName":"node1"}}`, }, @@ -1388,7 +1388,7 @@ func TestUpdatePod(t *testing.T) { }, } - if err := updatePod(cs, pod, test.newPodCondition, test.newNominatedNodeName); err != nil { + if err := updatePod(cs, pod, test.newPodCondition, test.newNominatingInfo); err != nil { t.Fatalf("Error calling update: %v", err) } diff --git a/test/integration/scheduler/preemption_test.go b/test/integration/scheduler/preemption_test.go index c46aea9dd93..44f040909d5 100644 --- a/test/integration/scheduler/preemption_test.go +++ b/test/integration/scheduler/preemption_test.go @@ -21,6 +21,7 @@ package scheduler import ( "context" "fmt" + "strings" "testing" "time" @@ -46,7 +47,7 @@ import ( "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/scheduler" configtesting "k8s.io/kubernetes/pkg/scheduler/apis/config/testing" - framework "k8s.io/kubernetes/pkg/scheduler/framework" + "k8s.io/kubernetes/pkg/scheduler/framework" frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime" st "k8s.io/kubernetes/pkg/scheduler/testing" "k8s.io/kubernetes/plugin/pkg/admission/priority" @@ -932,98 +933,211 @@ func TestPreemptionRaces(t *testing.T) { } } -// TestNominatedNodeCleanUp checks that when there are nominated pods on a -// node and a higher priority pod is nominated to run on the node, the nominated -// node name of the lower priority pods is cleared. -// Test scenario: -// 1. Create a few low priority pods with long grade period that fill up a node. -// 2. Create a medium priority pod that preempt some of those pods. -// 3. Check that nominated node name of the medium priority pod is set. -// 4. Create a high priority pod that preempts some pods on that node. -// 5. Check that nominated node name of the high priority pod is set and nominated -// node name of the medium priority pod is cleared. 
+const ( + alwaysFailPlugin = "alwaysFailPlugin" + doNotFailMe = "do-not-fail-me" +) + +// A fake plugin implements PreBind extension point. +// It always fails with an Unschedulable status, unless the pod contains a `doNotFailMe` string. +type alwaysFail struct{} + +func (af *alwaysFail) Name() string { + return alwaysFailPlugin +} + +func (af *alwaysFail) PreBind(_ context.Context, _ *framework.CycleState, p *v1.Pod, _ string) *framework.Status { + if strings.Contains(p.Name, doNotFailMe) { + return nil + } + return framework.NewStatus(framework.Unschedulable) +} + +func newAlwaysFail(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) { + return &alwaysFail{}, nil +} + +// TestNominatedNodeCleanUp verifies if a pod's nominatedNodeName is set and unset +// properly in different scenarios. func TestNominatedNodeCleanUp(t *testing.T) { - // Initialize scheduler. - testCtx := initTest(t, "preemption") - defer testutils.CleanupTest(t, testCtx) + tests := []struct { + name string + nodeCapacity map[v1.ResourceName]string + // A slice of pods to be created in batch. + podsToCreate [][]*v1.Pod + // Each postCheck function is run after each batch of pods' creation. + postChecks []func(cs clientset.Interface, pod *v1.Pod) error + // Delete the fake node or not. Optional. + deleteNode bool + // Pods to be deleted. Optional. + podNamesToDelete []string - cs := testCtx.ClientSet - - defer cleanupPodsInNamespace(cs, t, testCtx.NS.Name) - - // Create a node with some resources - nodeRes := map[v1.ResourceName]string{ - v1.ResourcePods: "32", - v1.ResourceCPU: "500m", - v1.ResourceMemory: "500", - } - _, err := createNode(testCtx.ClientSet, st.MakeNode().Name("node1").Capacity(nodeRes).Obj()) - if err != nil { - t.Fatalf("Error creating nodes: %v", err) - } - - // Step 1. Create a few low priority pods. - lowPriPods := make([]*v1.Pod, 4) - for i := 0; i < len(lowPriPods); i++ { - lowPriPods[i], err = createPausePod(cs, mkPriorityPodWithGrace(testCtx, fmt.Sprintf("lpod-%v", i), lowPriority, 60)) - if err != nil { - t.Fatalf("Error creating pause pod: %v", err) - } - } - // make sure that the pods are all scheduled. - for _, p := range lowPriPods { - if err := testutils.WaitForPodToSchedule(cs, p); err != nil { - t.Fatalf("Pod %v/%v didn't get scheduled: %v", p.Namespace, p.Name, err) - } - } - // Step 2. Create a medium priority pod. - podConf := initPausePod(&pausePodConfig{ - Name: "medium-priority", - Namespace: testCtx.NS.Name, - Priority: &mediumPriority, - Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{ - v1.ResourceCPU: *resource.NewMilliQuantity(400, resource.DecimalSI), - v1.ResourceMemory: *resource.NewQuantity(400, resource.DecimalSI)}, + // Register dummy plugin to simulate particular scheduling failures. Optional. 
+ customPlugins *v1beta3.Plugins + outOfTreeRegistry frameworkruntime.Registry + }{ + { + name: "mid-priority pod preempts low-priority pod, followed by a high-priority pod with another preemption", + nodeCapacity: map[v1.ResourceName]string{v1.ResourceCPU: "5"}, + podsToCreate: [][]*v1.Pod{ + { + st.MakePod().Name("low-1").Priority(lowPriority).Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Obj(), + st.MakePod().Name("low-2").Priority(lowPriority).Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Obj(), + st.MakePod().Name("low-3").Priority(lowPriority).Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Obj(), + st.MakePod().Name("low-4").Priority(lowPriority).Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Obj(), + }, + { + st.MakePod().Name("medium").Priority(mediumPriority).Req(map[v1.ResourceName]string{v1.ResourceCPU: "4"}).Obj(), + }, + { + st.MakePod().Name("high").Priority(highPriority).Req(map[v1.ResourceName]string{v1.ResourceCPU: "3"}).Obj(), + }, + }, + postChecks: []func(cs clientset.Interface, pod *v1.Pod) error{ + testutils.WaitForPodToSchedule, + waitForNominatedNodeName, + waitForNominatedNodeName, + }, }, - }) - medPriPod, err := createPausePod(cs, podConf) - if err != nil { - t.Errorf("Error while creating the medium priority pod: %v", err) - } - // Step 3. Check if .status.nominatedNodeName of the medium priority pod is set. - if err := waitForNominatedNodeName(cs, medPriPod); err != nil { - t.Errorf(".status.nominatedNodeName was not set for pod %v/%v: %v", medPriPod.Namespace, medPriPod.Name, err) - } - // Step 4. Create a high priority pod. - podConf = initPausePod(&pausePodConfig{ - Name: "high-priority", - Namespace: testCtx.NS.Name, - Priority: &highPriority, - Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{ - v1.ResourceCPU: *resource.NewMilliQuantity(300, resource.DecimalSI), - v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI)}, + { + name: "mid-priority pod preempts low-priority pod, followed by a high-priority pod without additional preemption", + nodeCapacity: map[v1.ResourceName]string{v1.ResourceCPU: "2"}, + podsToCreate: [][]*v1.Pod{ + { + st.MakePod().Name("low").Priority(lowPriority).Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Obj(), + }, + { + st.MakePod().Name("medium").Priority(mediumPriority).Req(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Obj(), + }, + { + st.MakePod().Name("high").Priority(highPriority).Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Obj(), + }, + }, + postChecks: []func(cs clientset.Interface, pod *v1.Pod) error{ + testutils.WaitForPodToSchedule, + waitForNominatedNodeName, + testutils.WaitForPodToSchedule, + }, + podNamesToDelete: []string{"low"}, + }, + { + name: "mid-priority pod preempts low-priority pod, followed by a node deletion", + nodeCapacity: map[v1.ResourceName]string{v1.ResourceCPU: "1"}, + podsToCreate: [][]*v1.Pod{ + { + st.MakePod().Name("low").Priority(lowPriority).Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Obj(), + }, + { + st.MakePod().Name("medium").Priority(mediumPriority).Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Obj(), + }, + }, + postChecks: []func(cs clientset.Interface, pod *v1.Pod) error{ + testutils.WaitForPodToSchedule, + waitForNominatedNodeName, + }, + // Delete the node to simulate an ErrNoNodesAvailable error. 
+ deleteNode: true, + podNamesToDelete: []string{"low"}, + }, + { + name: "mid-priority pod preempts low-priority pod, but failed the scheduling unexpectedly", + nodeCapacity: map[v1.ResourceName]string{v1.ResourceCPU: "1"}, + podsToCreate: [][]*v1.Pod{ + { + st.MakePod().Name(fmt.Sprintf("low-%v", doNotFailMe)).Priority(lowPriority).Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Obj(), + }, + { + st.MakePod().Name("medium").Priority(mediumPriority).Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Obj(), + }, + }, + postChecks: []func(cs clientset.Interface, pod *v1.Pod) error{ + testutils.WaitForPodToSchedule, + waitForNominatedNodeName, + }, + podNamesToDelete: []string{fmt.Sprintf("low-%v", doNotFailMe)}, + customPlugins: &v1beta3.Plugins{ + PreBind: v1beta3.PluginSet{ + Enabled: []v1beta3.Plugin{ + {Name: alwaysFailPlugin}, + }, + }, + }, + outOfTreeRegistry: frameworkruntime.Registry{alwaysFailPlugin: newAlwaysFail}, }, - }) - highPriPod, err := createPausePod(cs, podConf) - if err != nil { - t.Errorf("Error while creating the high priority pod: %v", err) } - // Step 5. Check if .status.nominatedNodeName of the high priority pod is set. - if err := waitForNominatedNodeName(cs, highPriPod); err != nil { - t.Errorf(".status.nominatedNodeName was not set for pod %v/%v: %v", highPriPod.Namespace, highPriPod.Name, err) - } - // And .status.nominatedNodeName of the medium priority pod is cleared. - if err := wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) { - pod, err := cs.CoreV1().Pods(medPriPod.Namespace).Get(context.TODO(), medPriPod.Name, metav1.GetOptions{}) - if err != nil { - t.Errorf("Error getting the medium priority pod info: %v", err) - } - if len(pod.Status.NominatedNodeName) == 0 { - return true, nil - } - return false, err - }); err != nil { - t.Errorf(".status.nominatedNodeName of the medium priority pod was not cleared: %v", err) + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cfg := configtesting.V1beta3ToInternalWithDefaults(t, v1beta3.KubeSchedulerConfiguration{ + Profiles: []v1beta3.KubeSchedulerProfile{{ + SchedulerName: pointer.StringPtr(v1.DefaultSchedulerName), + Plugins: tt.customPlugins, + }}, + }) + testCtx := initTest( + t, + "preemption", + scheduler.WithProfiles(cfg.Profiles...), + scheduler.WithFrameworkOutOfTreeRegistry(tt.outOfTreeRegistry), + ) + t.Cleanup(func() { + testutils.CleanupTest(t, testCtx) + }) + + cs, ns := testCtx.ClientSet, testCtx.NS.Name + // Create a node with the specified capacity. + nodeName := "fake-node" + if _, err := createNode(cs, st.MakeNode().Name(nodeName).Capacity(tt.nodeCapacity).Obj()); err != nil { + t.Fatalf("Error creating node %v: %v", nodeName, err) + } + + // Create pods and run post check if necessary. + for i, pods := range tt.podsToCreate { + for _, p := range pods { + p.Namespace = ns + if _, err := createPausePod(cs, p); err != nil { + t.Fatalf("Error creating pod %v: %v", p.Name, err) + } + } + // If necessary, run the post check function. + if len(tt.postChecks) > i && tt.postChecks[i] != nil { + for _, p := range pods { + if err := tt.postChecks[i](cs, p); err != nil { + t.Fatalf("Pod %v didn't pass the postChecks[%v]: %v", p.Name, i, err) + } + } + } + } + + // Delete the node if necessary. + if tt.deleteNode { + if err := cs.CoreV1().Nodes().Delete(context.TODO(), nodeName, *metav1.NewDeleteOptions(0)); err != nil { + t.Fatalf("Node %v cannot be deleted: %v", nodeName, err) + } + } + + // Force deleting the terminating pods if necessary. 
+ // This is required if we demand to delete terminating Pods physically. + for _, podName := range tt.podNamesToDelete { + if err := deletePod(cs, podName, ns); err != nil { + t.Fatalf("Pod %v cannot be deleted: %v", podName, err) + } + } + + // Verify if .status.nominatedNodeName is cleared. + if err := wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) { + pod, err := cs.CoreV1().Pods(ns).Get(context.TODO(), "medium", metav1.GetOptions{}) + if err != nil { + t.Errorf("Error getting the medium pod: %v", err) + } + if len(pod.Status.NominatedNodeName) == 0 { + return true, nil + } + return false, err + }); err != nil { + t.Errorf(".status.nominatedNodeName of the medium pod was not cleared: %v", err) + } + }) } } @@ -1344,7 +1458,7 @@ func TestPreferNominatedNode(t *testing.T) { nodeNames []string existingPods []*v1.Pod pod *v1.Pod - runnningNode string + runningNode string }{ { name: "nominated node released all resource, preemptor is scheduled to the nominated node", @@ -1365,7 +1479,7 @@ func TestPreferNominatedNode(t *testing.T) { v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI)}, }, }), - runnningNode: "node-1", + runningNode: "node-1", }, { name: "nominated node cannot pass all the filters, preemptor should find a different node", @@ -1386,7 +1500,7 @@ func TestPreferNominatedNode(t *testing.T) { v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI)}, }, }), - runnningNode: "node-2", + runningNode: "node-2", }, } @@ -1435,8 +1549,8 @@ func TestPreferNominatedNode(t *testing.T) { t.Errorf("Cannot schedule Pod %v/%v, error: %v", test.pod.Namespace, test.pod.Name, err) } // Make sure the pod has been scheduled to the right node. - if preemptor.Spec.NodeName != test.runnningNode { - t.Errorf("Expect pod running on %v, got %v.", test.runnningNode, preemptor.Spec.NodeName) + if preemptor.Spec.NodeName != test.runningNode { + t.Errorf("Expect pod running on %v, got %v.", test.runningNode, preemptor.Spec.NodeName) } }) } diff --git a/test/integration/scheduler/util.go b/test/integration/scheduler/util.go index 932c1f04e08..69dde6e0894 100644 --- a/test/integration/scheduler/util.go +++ b/test/integration/scheduler/util.go @@ -462,35 +462,6 @@ func getPod(cs clientset.Interface, podName string, podNamespace string) (*v1.Po return cs.CoreV1().Pods(podNamespace).Get(context.TODO(), podName, metav1.GetOptions{}) } -// noPodsInNamespace returns true if no pods in the given namespace. -func noPodsInNamespace(c clientset.Interface, podNamespace string) wait.ConditionFunc { - return func() (bool, error) { - pods, err := c.CoreV1().Pods(podNamespace).List(context.TODO(), metav1.ListOptions{}) - if err != nil { - return false, err - } - - return len(pods.Items) == 0, nil - } -} - -// cleanupPodsInNamespace deletes the pods in the given namespace and waits for them to -// be actually deleted. They are removed with no grace. -func cleanupPodsInNamespace(cs clientset.Interface, t *testing.T, ns string) { - t.Helper() - - zero := int64(0) - if err := cs.CoreV1().Pods(ns).DeleteCollection(context.TODO(), metav1.DeleteOptions{GracePeriodSeconds: &zero}, metav1.ListOptions{}); err != nil { - t.Errorf("error while listing pod in namespace %v: %v", ns, err) - return - } - - if err := wait.Poll(time.Second, wait.ForeverTestTimeout, - noPodsInNamespace(cs, ns)); err != nil { - t.Errorf("error while waiting for pods in namespace %v: %v", ns, err) - } -} - // podScheduled returns true if a node is assigned to the given pod. 
func podScheduled(c clientset.Interface, podNamespace, podName string) wait.ConditionFunc { return func() (bool, error) {
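To summarize the scheduler.go change, here is a minimal sketch (invented helper name, simplified types, no client-go calls) of the decision updatePod now makes: a status patch is sent only when the pod condition actually changed or when the caller asked to override .status.nominatedNodeName with a different value; clearNominatedNode is simply an override carrying an empty name.

package main

import "fmt"

type NominatingMode int

const (
	ModeNoop NominatingMode = iota
	ModeOverride
)

type NominatingInfo struct {
	NominatedNodeName string
	NominatingMode    NominatingMode
}

func (ni *NominatingInfo) Mode() NominatingMode {
	if ni == nil {
		return ModeNoop
	}
	return ni.NominatingMode
}

// needsPatch mirrors the `nnnNeedsUpdate || condition changed` check in updatePod.
func needsPatch(currentNNN string, conditionChanged bool, ni *NominatingInfo) bool {
	nnnNeedsUpdate := ni.Mode() == ModeOverride && currentNNN != ni.NominatedNodeName
	return conditionChanged || nnnNeedsUpdate
}

func main() {
	clearNominatedNode := &NominatingInfo{NominatingMode: ModeOverride, NominatedNodeName: ""}

	fmt.Println(needsPatch("node-1", false, clearNominatedNode)) // true: clear a stale nomination
	fmt.Println(needsPatch("", false, clearNominatedNode))       // false: already empty, nothing to patch
	fmt.Println(needsPatch("node-1", false, nil))                // false: ModeNoop leaves the field alone
}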