Merge pull request #106816 from Huang-Wei/fix-nnn-not-cleared

clear pod's .status.nominatedNodeName when necessary
Commit 712745cb67, authored by Kubernetes Prow Robot on 2021-12-16 19:36:28 -08:00, committed by GitHub
12 changed files with 343 additions and 205 deletions


@ -608,16 +608,44 @@ type Handle interface {
Parallelizer() parallelize.Parallelizer
}
type NominatingMode int
const (
ModeNoop NominatingMode = iota
ModeOverride
)
type NominatingInfo struct {
NominatedNodeName string
NominatingMode NominatingMode
}
// PostFilterResult wraps needed info for scheduler framework to act upon PostFilter phase.
type PostFilterResult struct {
NominatedNodeName string
*NominatingInfo
}
func NewPostFilterResultWithNominatedNode(name string) *PostFilterResult {
return &PostFilterResult{
NominatingInfo: &NominatingInfo{
NominatedNodeName: name,
NominatingMode: ModeOverride,
},
}
}
func (ni *NominatingInfo) Mode() NominatingMode {
if ni == nil {
return ModeNoop
}
return ni.NominatingMode
}
// PodNominator abstracts operations to maintain nominated Pods.
type PodNominator interface {
// AddNominatedPod adds the given pod to the nominator or
// updates it if it already exists.
AddNominatedPod(pod *PodInfo, nodeName string)
AddNominatedPod(pod *PodInfo, nominatingInfo *NominatingInfo)
// DeleteNominatedPodIfExists deletes nominatedPod from internal cache. It's a no-op if it doesn't exist.
DeleteNominatedPodIfExists(pod *v1.Pod)
// UpdateNominatedPod updates the <oldPod> with <newPod>.
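
For orientation, here is a minimal sketch (not part of the diff; the describe helper and its output strings are illustrative only) of how the new NominatingInfo type and the nil-safe Mode() above are meant to be read:

package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/scheduler/framework"
)

// describe is a hypothetical helper: a nil NominatingInfo is a no-op,
// ModeOverride with an empty name clears .status.nominatedNodeName, and
// ModeOverride with a node name sets it.
func describe(ni *framework.NominatingInfo) string {
	switch ni.Mode() { // Mode() is safe to call on a nil receiver
	case framework.ModeOverride:
		if ni.NominatedNodeName == "" {
			return "clear .status.nominatedNodeName"
		}
		return "set .status.nominatedNodeName to " + ni.NominatedNodeName
	default: // framework.ModeNoop
		return "keep .status.nominatedNodeName as-is"
	}
}

func main() {
	fmt.Println(describe(nil))
	fmt.Println(describe(framework.NewPostFilterResultWithNominatedNode("").NominatingInfo))
	fmt.Println(describe(framework.NewPostFilterResultWithNominatedNode("node-a").NominatingInfo))
}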


@ -161,7 +161,7 @@ func TestPostFilter(t *testing.T) {
filteredNodesStatuses: framework.NodeToStatusMap{
"node1": framework.NewStatus(framework.Unschedulable),
},
wantResult: &framework.PostFilterResult{NominatedNodeName: "node1"},
wantResult: framework.NewPostFilterResultWithNominatedNode("node1"),
wantStatus: framework.NewStatus(framework.Success),
},
{
@ -176,7 +176,7 @@ func TestPostFilter(t *testing.T) {
filteredNodesStatuses: framework.NodeToStatusMap{
"node1": framework.NewStatus(framework.Unschedulable),
},
wantResult: nil,
wantResult: framework.NewPostFilterResultWithNominatedNode(""),
wantStatus: framework.NewStatus(framework.Unschedulable, "0/1 nodes are available: 1 No victims found on node node1 for preemptor pod p."),
},
{
@ -191,7 +191,7 @@ func TestPostFilter(t *testing.T) {
filteredNodesStatuses: framework.NodeToStatusMap{
"node1": framework.NewStatus(framework.UnschedulableAndUnresolvable),
},
wantResult: nil,
wantResult: framework.NewPostFilterResultWithNominatedNode(""),
wantStatus: framework.NewStatus(framework.Unschedulable, "0/1 nodes are available: 1 Preemption is not helpful for scheduling."),
},
{
@ -209,7 +209,7 @@ func TestPostFilter(t *testing.T) {
"node1": framework.NewStatus(framework.Unschedulable),
"node2": framework.NewStatus(framework.Unschedulable),
},
wantResult: &framework.PostFilterResult{NominatedNodeName: "node2"},
wantResult: framework.NewPostFilterResultWithNominatedNode("node2"),
wantStatus: framework.NewStatus(framework.Success),
},
{
@ -227,10 +227,8 @@ func TestPostFilter(t *testing.T) {
"node1": framework.NewStatus(framework.Unschedulable),
"node2": framework.NewStatus(framework.Unschedulable),
},
extender: &st.FakeExtender{Predicates: []st.FitPredicate{st.Node1PredicateExtender}},
wantResult: &framework.PostFilterResult{
NominatedNodeName: "node1",
},
extender: &st.FakeExtender{Predicates: []st.FitPredicate{st.Node1PredicateExtender}},
wantResult: framework.NewPostFilterResultWithNominatedNode("node1"),
wantStatus: framework.NewStatus(framework.Success),
},
{
@ -248,7 +246,7 @@ func TestPostFilter(t *testing.T) {
"node1": framework.NewStatus(framework.Unschedulable),
"node2": framework.NewStatus(framework.Unschedulable),
},
wantResult: nil,
wantResult: framework.NewPostFilterResultWithNominatedNode(""),
wantStatus: framework.NewStatus(framework.Unschedulable, "0/2 nodes are available: 2 Insufficient cpu."),
},
{
@ -266,7 +264,7 @@ func TestPostFilter(t *testing.T) {
"node1": framework.NewStatus(framework.Unschedulable),
"node2": framework.NewStatus(framework.Unschedulable),
},
wantResult: nil,
wantResult: framework.NewPostFilterResultWithNominatedNode(""),
wantStatus: framework.NewStatus(framework.Unschedulable, "0/2 nodes are available: 1 Insufficient cpu, 1 No victims found on node node1 for preemptor pod p."),
},
{
@ -286,7 +284,7 @@ func TestPostFilter(t *testing.T) {
"node3": framework.NewStatus(framework.UnschedulableAndUnresolvable),
"node4": framework.NewStatus(framework.UnschedulableAndUnresolvable),
},
wantResult: nil,
wantResult: framework.NewPostFilterResultWithNominatedNode(""),
wantStatus: framework.NewStatus(framework.Unschedulable, "0/4 nodes are available: 2 Insufficient cpu, 2 Preemption is not helpful for scheduling."),
},
{
@ -317,9 +315,7 @@ func TestPostFilter(t *testing.T) {
"node1": framework.NewStatus(framework.Unschedulable),
"node2": framework.NewStatus(framework.Unschedulable),
},
wantResult: &framework.PostFilterResult{
NominatedNodeName: "node2",
},
wantResult: framework.NewPostFilterResultWithNominatedNode("node2"),
wantStatus: framework.NewStatus(framework.Success),
},
}
@ -1465,7 +1461,7 @@ func TestPreempt(t *testing.T) {
extenders []*st.FakeExtender
nodeNames []string
registerPlugin st.RegisterPluginFunc
expectedNode string
want *framework.PostFilterResult
expectedPods []string // list of preempted pods
}{
{
@ -1479,7 +1475,7 @@ func TestPreempt(t *testing.T) {
},
nodeNames: []string{"node1", "node2", "node3"},
registerPlugin: st.RegisterPluginAsExtensions(noderesources.Name, nodeResourcesFitFunc, "Filter", "PreFilter"),
expectedNode: "node1",
want: framework.NewPostFilterResultWithNominatedNode("node1"),
expectedPods: []string{"p1.1", "p1.2"},
},
{
@ -1497,7 +1493,7 @@ func TestPreempt(t *testing.T) {
},
nodeNames: []string{"node-a/zone1", "node-b/zone1", "node-x/zone2"},
registerPlugin: st.RegisterPluginAsExtensions(podtopologyspread.Name, podtopologyspread.New, "PreFilter", "Filter"),
expectedNode: "node-b",
want: framework.NewPostFilterResultWithNominatedNode("node-b"),
expectedPods: []string{"p-b1"},
},
{
@ -1514,7 +1510,7 @@ func TestPreempt(t *testing.T) {
{Predicates: []st.FitPredicate{st.Node1PredicateExtender}},
},
registerPlugin: st.RegisterPluginAsExtensions(noderesources.Name, nodeResourcesFitFunc, "Filter", "PreFilter"),
expectedNode: "node1",
want: framework.NewPostFilterResultWithNominatedNode("node1"),
expectedPods: []string{"p1.1", "p1.2"},
},
{
@ -1530,7 +1526,7 @@ func TestPreempt(t *testing.T) {
{Predicates: []st.FitPredicate{st.FalsePredicateExtender}},
},
registerPlugin: st.RegisterPluginAsExtensions(noderesources.Name, nodeResourcesFitFunc, "Filter", "PreFilter"),
expectedNode: "",
want: nil,
expectedPods: []string{},
},
{
@ -1547,7 +1543,7 @@ func TestPreempt(t *testing.T) {
{Predicates: []st.FitPredicate{st.Node1PredicateExtender}},
},
registerPlugin: st.RegisterPluginAsExtensions(noderesources.Name, nodeResourcesFitFunc, "Filter", "PreFilter"),
expectedNode: "node1",
want: framework.NewPostFilterResultWithNominatedNode("node1"),
expectedPods: []string{"p1.1", "p1.2"},
},
{
@ -1564,8 +1560,8 @@ func TestPreempt(t *testing.T) {
{Predicates: []st.FitPredicate{st.TruePredicateExtender}},
},
registerPlugin: st.RegisterPluginAsExtensions(noderesources.Name, nodeResourcesFitFunc, "Filter", "PreFilter"),
//sum of priorities of all victims on node1 is larger than node2, node2 is chosen.
expectedNode: "node2",
// sum of priorities of all victims on node1 is larger than node2, node2 is chosen.
want: framework.NewPostFilterResultWithNominatedNode("node2"),
expectedPods: []string{"p2.1"},
},
{
@ -1579,7 +1575,7 @@ func TestPreempt(t *testing.T) {
},
nodeNames: []string{"node1", "node2", "node3"},
registerPlugin: st.RegisterPluginAsExtensions(noderesources.Name, nodeResourcesFitFunc, "Filter", "PreFilter"),
expectedNode: "",
want: nil,
expectedPods: nil,
},
{
@ -1593,7 +1589,7 @@ func TestPreempt(t *testing.T) {
},
nodeNames: []string{"node1", "node2", "node3"},
registerPlugin: st.RegisterPluginAsExtensions(noderesources.Name, nodeResourcesFitFunc, "Filter", "PreFilter"),
expectedNode: "node1",
want: framework.NewPostFilterResultWithNominatedNode("node1"),
expectedPods: []string{"p1.1", "p1.2"},
},
}
@ -1691,11 +1687,8 @@ func TestPreempt(t *testing.T) {
if !status.IsSuccess() && !status.IsUnschedulable() {
t.Errorf("unexpected error in preemption: %v", status.AsError())
}
if res != nil && len(res.NominatedNodeName) != 0 && res.NominatedNodeName != test.expectedNode {
t.Errorf("expected node: %v, got: %v", test.expectedNode, res.NominatedNodeName)
}
if res != nil && len(res.NominatedNodeName) == 0 && len(test.expectedNode) != 0 {
t.Errorf("expected node: %v, got: nothing", test.expectedNode)
if diff := cmp.Diff(test.want, res); diff != "" {
t.Errorf("Unexpected status (-want, +got):\n%s", diff)
}
if len(deletedPodNames) != len(test.expectedPods) {
t.Errorf("expected %v pods, got %v.", len(test.expectedPods), len(deletedPodNames))
@ -1712,7 +1705,7 @@ func TestPreempt(t *testing.T) {
t.Errorf("pod %v is not expected to be a victim.", victimName)
}
}
if res != nil {
if res != nil && res.NominatingInfo != nil {
test.pod.Status.NominatedNodeName = res.NominatedNodeName
}
@ -1730,7 +1723,7 @@ func TestPreempt(t *testing.T) {
if !status.IsSuccess() && !status.IsUnschedulable() {
t.Errorf("unexpected error in preemption: %v", status.AsError())
}
if res != nil && len(deletedPodNames) > 0 {
if res != nil && res.NominatingInfo != nil && len(deletedPodNames) > 0 {
t.Errorf("didn't expect any more preemption. Node %v is selected for preemption.", res.NominatedNodeName)
}
})
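
One nuance in the table updates above: a nil result now means "leave the pod's nominatedNodeName untouched", while NewPostFilterResultWithNominatedNode("") is an explicit request to clear it, which is why several wantResult entries changed from nil to the empty-name constructor. Under cmp.Diff, as used by the updated assertions, the two are not interchangeable; a minimal, illustrative sketch:

package main

import (
	"fmt"

	"github.com/google/go-cmp/cmp"
	"k8s.io/kubernetes/pkg/scheduler/framework"
)

func main() {
	var keep *framework.PostFilterResult                        // nil: leave the pod's nominatedNodeName unchanged
	clear := framework.NewPostFilterResultWithNominatedNode("") // explicit override with "": clear it
	fmt.Println(cmp.Diff(keep, clear) == "")                    // prints false: the two expectations differ
}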


@ -123,8 +123,19 @@ type Evaluator struct {
Interface
}
// Preempt returns a PostFilterResult carrying a suggested nominatedNodeName, along with a Status.
// The semantics of the returned <PostFilterResult, Status> vary across scenarios:
// - <nil, Error>. This denotes a transient/rare error that may be self-healed in future cycles.
// - <nil, Unschedulable>. This status is mostly as expected, e.g., the preemptor is waiting for the
// victims to be fully terminated.
// - In both cases above, a nil PostFilterResult is returned to keep the pod's nominatedNodeName unchanged.
//
// - <non-nil PostFilterResult, Unschedulable>. It indicates the pod cannot be scheduled even with preemption.
// In this case, a non-nil PostFilterResult is returned and result.NominatingMode instructs how to deal with
// the nominatedNodeName.
// - <non-nil PostFilterResult, Success>. It's the regular happy path
// and the non-empty nominatedNodeName will be applied to the preemptor pod.
func (ev *Evaluator) Preempt(ctx context.Context, pod *v1.Pod, m framework.NodeToStatusMap) (*framework.PostFilterResult, *framework.Status) {
// 0) Fetch the latest version of <pod>.
// It's safe to directly fetch pod here. Because the informer cache has already been
// initialized when creating the Scheduler obj, i.e., factory.go#MakeDefaultErrorFunc().
@ -158,7 +169,8 @@ func (ev *Evaluator) Preempt(ctx context.Context, pod *v1.Pod, m framework.NodeT
// Leave FailedPlugins as nil as it won't be used on moving Pods.
},
}
return nil, framework.NewStatus(framework.Unschedulable, fitError.Error())
// Specify nominatedNodeName to clear the pod's nominatedNodeName status, if applicable.
return framework.NewPostFilterResultWithNominatedNode(""), framework.NewStatus(framework.Unschedulable, fitError.Error())
}
// 3) Interact with registered Extenders to filter out some candidates if needed.
@ -178,7 +190,7 @@ func (ev *Evaluator) Preempt(ctx context.Context, pod *v1.Pod, m framework.NodeT
return nil, status
}
return &framework.PostFilterResult{NominatedNodeName: bestCandidate.Name()}, framework.NewStatus(framework.Success)
return framework.NewPostFilterResultWithNominatedNode(bestCandidate.Name()), framework.NewStatus(framework.Success)
}
// FindCandidates calculates a slice of preemption candidates.
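
As a hedged, caller-side sketch of the contract documented above for Preempt (handlePreemptResult is hypothetical and not part of the scheduler code base):

package example

import (
	"k8s.io/klog/v2"
	"k8s.io/kubernetes/pkg/scheduler/framework"
)

// handlePreemptResult illustrates how a caller could act on each documented
// <PostFilterResult, Status> combination.
func handlePreemptResult(result *framework.PostFilterResult, status *framework.Status) {
	if result == nil {
		// Transient error, or an expected Unschedulable (e.g. waiting for victims
		// to terminate): keep the pod's nominatedNodeName unchanged.
		klog.InfoS("Keeping the pod's current nominatedNodeName", "code", status.Code())
		return
	}
	if status.IsSuccess() {
		// Happy path: the non-empty nominatedNodeName is applied to the preemptor.
		klog.InfoS("Nominating node", "node", result.NominatedNodeName)
		return
	}
	// Unschedulable with a non-nil result: the ModeOverride NominatingInfo (here
	// carrying an empty name) tells the scheduler to clear a stale nominatedNodeName.
	klog.InfoS("Overriding nominatedNodeName", "node", result.NominatedNodeName)
}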


@ -711,6 +711,8 @@ func (f *frameworkImpl) RunPostFilterPlugins(ctx context.Context, state *framewo
}()
statuses := make(framework.PluginToStatus)
// `result` records the last meaningful (i.e., non-noop) PostFilterResult.
var result *framework.PostFilterResult
for _, pl := range f.postFilterPlugins {
r, s := f.runPostFilterPlugin(ctx, pl, state, pod, filteredNodeStatusMap)
if s.IsSuccess() {
@ -718,11 +720,13 @@ func (f *frameworkImpl) RunPostFilterPlugins(ctx context.Context, state *framewo
} else if !s.IsUnschedulable() {
// Any status other than Success or Unschedulable is Error.
return nil, framework.AsStatus(s.AsError())
} else if r != nil && r.Mode() != framework.ModeNoop {
result = r
}
statuses[pl.Name()] = s
}
return nil, statuses.Merge()
return result, statuses.Merge()
}
func (f *frameworkImpl) runPostFilterPlugin(ctx context.Context, pl framework.PostFilterPlugin, state *framework.CycleState, pod *v1.Pod, filteredNodeStatusMap framework.NodeToStatusMap) (*framework.PostFilterResult, *framework.Status) {
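
The aggregation above can be restated in isolation; lastMeaningfulResult below is an illustrative helper (not in the code base) that keeps the last non-noop result across plugins, so a later ModeNoop result never clobbers an earlier ModeOverride one:

package example

import "k8s.io/kubernetes/pkg/scheduler/framework"

// lastMeaningfulResult iterates per-plugin results and keeps the last one
// whose mode is not ModeNoop, mirroring the loop in RunPostFilterPlugins.
func lastMeaningfulResult(results []*framework.PostFilterResult) *framework.PostFilterResult {
	var result *framework.PostFilterResult
	for _, r := range results {
		if r != nil && r.Mode() != framework.ModeNoop {
			result = r
		}
	}
	return result
}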


@ -958,7 +958,7 @@ func TestRunScorePlugins(t *testing.T) {
},
{
name: "single ScoreWithNormalize plugin",
//registry: registry,
// registry: registry,
plugins: buildScoreConfigDefaultWeights(scoreWithNormalizePlugin1),
pluginConfigs: []config.PluginConfig{
{
@ -1596,7 +1596,9 @@ func TestFilterPluginsWithNominatedPods(t *testing.T) {
podNominator := internalqueue.NewPodNominator(nil)
if tt.nominatedPod != nil {
podNominator.AddNominatedPod(framework.NewPodInfo(tt.nominatedPod), nodeName)
podNominator.AddNominatedPod(
framework.NewPodInfo(tt.nominatedPod),
&framework.NominatingInfo{NominatingMode: framework.ModeOverride, NominatedNodeName: nodeName})
}
profile := config.KubeSchedulerProfile{Plugins: cfgPls}
f, err := newFrameworkWithQueueSortAndBind(registry, profile, WithPodNominator(podNominator))


@ -1177,7 +1177,8 @@ func TestFindFitPredicateCallCounts(t *testing.T) {
if err := scheduler.cache.UpdateSnapshot(scheduler.nodeInfoSnapshot); err != nil {
t.Fatal(err)
}
fwk.AddNominatedPod(framework.NewPodInfo(&v1.Pod{ObjectMeta: metav1.ObjectMeta{UID: "nominated"}, Spec: v1.PodSpec{Priority: &midPriority}}), "1")
fwk.AddNominatedPod(framework.NewPodInfo(&v1.Pod{ObjectMeta: metav1.ObjectMeta{UID: "nominated"}, Spec: v1.PodSpec{Priority: &midPriority}}),
&framework.NominatingInfo{NominatingMode: framework.ModeOverride, NominatedNodeName: "1"})
_, _, err = scheduler.findNodesThatFitPod(context.Background(), nil, fwk, framework.NewCycleState(), test.pod)


@ -297,7 +297,7 @@ func (p *PriorityQueue) Add(pod *v1.Pod) error {
klog.ErrorS(nil, "Error: pod is already in the podBackoff queue", "pod", klog.KObj(pod))
}
metrics.SchedulerQueueIncomingPods.WithLabelValues("active", PodAdd).Inc()
p.PodNominator.AddNominatedPod(pInfo.PodInfo, "")
p.PodNominator.AddNominatedPod(pInfo.PodInfo, nil)
p.cond.Broadcast()
return nil
@ -351,7 +351,7 @@ func (p *PriorityQueue) activate(pod *v1.Pod) bool {
p.unschedulableQ.delete(pod)
p.podBackoffQ.Delete(pInfo)
metrics.SchedulerQueueIncomingPods.WithLabelValues("active", ForceActivate).Inc()
p.PodNominator.AddNominatedPod(pInfo.PodInfo, "")
p.PodNominator.AddNominatedPod(pInfo.PodInfo, nil)
return true
}
@ -403,7 +403,7 @@ func (p *PriorityQueue) AddUnschedulableIfNotPresent(pInfo *framework.QueuedPodI
metrics.SchedulerQueueIncomingPods.WithLabelValues("unschedulable", ScheduleAttemptFailure).Inc()
}
p.PodNominator.AddNominatedPod(pInfo.PodInfo, "")
p.PodNominator.AddNominatedPod(pInfo.PodInfo, nil)
return nil
}
@ -546,7 +546,7 @@ func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error {
if err := p.activeQ.Add(pInfo); err != nil {
return err
}
p.PodNominator.AddNominatedPod(pInfo.PodInfo, "")
p.PodNominator.AddNominatedPod(pInfo.PodInfo, nil)
p.cond.Broadcast()
return nil
}
@ -692,9 +692,9 @@ func (npm *nominator) DeleteNominatedPodIfExists(pod *v1.Pod) {
// This is called during the preemption process after a node is nominated to run
// the pod. We update the structure before sending a request to update the pod
// object to avoid races with the following scheduling cycles.
func (npm *nominator) AddNominatedPod(pi *framework.PodInfo, nodeName string) {
func (npm *nominator) AddNominatedPod(pi *framework.PodInfo, nominatingInfo *framework.NominatingInfo) {
npm.Lock()
npm.add(pi, nodeName)
npm.add(pi, nominatingInfo)
npm.Unlock()
}
@ -831,17 +831,19 @@ type nominator struct {
sync.RWMutex
}
func (npm *nominator) add(pi *framework.PodInfo, nodeName string) {
// always delete the pod if it already exist, to ensure we never store more than
func (npm *nominator) add(pi *framework.PodInfo, nominatingInfo *framework.NominatingInfo) {
// Always delete the pod if it already exists, to ensure we never store more than
// one instance of the pod.
npm.delete(pi.Pod)
nnn := nodeName
if len(nnn) == 0 {
nnn = NominatedNodeName(pi.Pod)
if len(nnn) == 0 {
var nodeName string
if nominatingInfo.Mode() == framework.ModeOverride {
nodeName = nominatingInfo.NominatedNodeName
} else if nominatingInfo.Mode() == framework.ModeNoop {
if pi.Pod.Status.NominatedNodeName == "" {
return
}
nodeName = pi.Pod.Status.NominatedNodeName
}
if npm.podLister != nil {
@ -852,14 +854,14 @@ func (npm *nominator) add(pi *framework.PodInfo, nodeName string) {
}
}
npm.nominatedPodToNode[pi.Pod.UID] = nnn
for _, npi := range npm.nominatedPods[nnn] {
npm.nominatedPodToNode[pi.Pod.UID] = nodeName
for _, npi := range npm.nominatedPods[nodeName] {
if npi.Pod.UID == pi.Pod.UID {
klog.V(4).InfoS("Pod already exists in the nominator", "pod", klog.KObj(npi.Pod))
return
}
}
npm.nominatedPods[nnn] = append(npm.nominatedPods[nnn], pi)
npm.nominatedPods[nodeName] = append(npm.nominatedPods[nodeName], pi)
}
func (npm *nominator) delete(p *v1.Pod) {
@ -886,7 +888,7 @@ func (npm *nominator) UpdateNominatedPod(oldPod *v1.Pod, newPodInfo *framework.P
// In some cases, an Update event with no "NominatedNode" present is received right
// after a node("NominatedNode") is reserved for this pod in memory.
// In this case, we need to keep reserving the NominatedNode when updating the pod pointer.
nodeName := ""
var nominatingInfo *framework.NominatingInfo
// We won't fall into below `if` block if the Update event represents:
// (1) NominatedNode info is added
// (2) NominatedNode info is updated
@ -894,13 +896,16 @@ func (npm *nominator) UpdateNominatedPod(oldPod *v1.Pod, newPodInfo *framework.P
if NominatedNodeName(oldPod) == "" && NominatedNodeName(newPodInfo.Pod) == "" {
if nnn, ok := npm.nominatedPodToNode[oldPod.UID]; ok {
// This is the only case we should continue reserving the NominatedNode
nodeName = nnn
nominatingInfo = &framework.NominatingInfo{
NominatingMode: framework.ModeOverride,
NominatedNodeName: nnn,
}
}
}
// We update irrespective of whether the nominatedNodeName changed or not, to ensure
// that pod pointer is updated.
npm.delete(oldPod)
npm.add(newPodInfo, nodeName)
npm.add(newPodInfo, nominatingInfo)
}
// NewPodNominator creates a nominator as a backing of framework.PodNominator.
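
A compact restatement of the add() logic above (resolveNominatedNode is an illustrative helper, not part of the queue code): ModeOverride trusts the caller-supplied name, while ModeNoop, including the nil NominatingInfo now passed by Add/activate/Update, falls back to the pod's own status field and skips recording when it is empty:

package example

import (
	v1 "k8s.io/api/core/v1"
	"k8s.io/kubernetes/pkg/scheduler/framework"
)

// resolveNominatedNode reports which node name would be recorded for the pod,
// and false when nothing should be recorded at all.
func resolveNominatedNode(pod *v1.Pod, ni *framework.NominatingInfo) (string, bool) {
	if ni.Mode() == framework.ModeOverride {
		// Override: use the supplied name, which may be "" .
		return ni.NominatedNodeName, true
	}
	// Noop (including a nil NominatingInfo): fall back to what the pod already carries.
	if pod.Status.NominatedNodeName == "" {
		return "", false // nothing to record; add() returns early in this case
	}
	return pod.Status.NominatedNodeName, true
}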


@ -705,7 +705,7 @@ func TestPriorityQueue_NominatedPodDeleted(t *testing.T) {
informerFactory.Core().V1().Pods().Informer().GetStore().Delete(tt.podInfo.Pod)
}
q.AddNominatedPod(tt.podInfo, tt.podInfo.Pod.Status.NominatedNodeName)
q.AddNominatedPod(tt.podInfo, nil)
if got := len(q.NominatedPodsForNode(tt.podInfo.Pod.Status.NominatedNodeName)) == 1; got != tt.want {
t.Errorf("Want %v, but got %v", tt.want, got)
@ -746,10 +746,12 @@ func TestPriorityQueue_UpdateNominatedPodForNode(t *testing.T) {
t.Errorf("add failed: %v", err)
}
// Update unschedulablePodInfo on a different node than specified in the pod.
q.AddNominatedPod(framework.NewPodInfo(unschedulablePodInfo.Pod), "node5")
q.AddNominatedPod(framework.NewPodInfo(unschedulablePodInfo.Pod),
&framework.NominatingInfo{NominatingMode: framework.ModeOverride, NominatedNodeName: "node5"})
// Update nominated node name of a pod on a node that is not specified in the pod object.
q.AddNominatedPod(framework.NewPodInfo(highPriorityPodInfo.Pod), "node2")
q.AddNominatedPod(framework.NewPodInfo(highPriorityPodInfo.Pod),
&framework.NominatingInfo{NominatingMode: framework.ModeOverride, NominatedNodeName: "node2"})
expectedNominatedPods := &nominator{
nominatedPodToNode: map[types.UID]string{
medPriorityPodInfo.Pod.UID: "node1",
@ -774,7 +776,7 @@ func TestPriorityQueue_UpdateNominatedPodForNode(t *testing.T) {
}
// Update one of the nominated pods that doesn't have nominatedNodeName in the
// pod object. It should be updated correctly.
q.AddNominatedPod(highPriorityPodInfo, "node4")
q.AddNominatedPod(highPriorityPodInfo, &framework.NominatingInfo{NominatingMode: framework.ModeOverride, NominatedNodeName: "node4"})
expectedNominatedPods = &nominator{
nominatedPodToNode: map[types.UID]string{
medPriorityPodInfo.Pod.UID: "node1",


@ -299,7 +299,7 @@ func (sched *Scheduler) Run(ctx context.Context) {
// recordSchedulingFailure records an event for the pod that indicates the
// pod has failed to schedule. Also, update the pod condition and nominated node name if set.
func (sched *Scheduler) recordSchedulingFailure(fwk framework.Framework, podInfo *framework.QueuedPodInfo, err error, reason string, nominatedNode string) {
func (sched *Scheduler) recordSchedulingFailure(fwk framework.Framework, podInfo *framework.QueuedPodInfo, err error, reason string, nominatingInfo *framework.NominatingInfo) {
sched.Error(podInfo, err)
// Update the scheduling queue with the nominated pod information. Without
@ -307,7 +307,7 @@ func (sched *Scheduler) recordSchedulingFailure(fwk framework.Framework, podInfo
// and the time the scheduler receives a Pod Update for the nominated pod.
// Here we check for nil only for tests.
if sched.SchedulingQueue != nil {
sched.SchedulingQueue.AddNominatedPod(podInfo.PodInfo, nominatedNode)
sched.SchedulingQueue.AddNominatedPod(podInfo.PodInfo, nominatingInfo)
}
pod := podInfo.Pod
@ -318,7 +318,7 @@ func (sched *Scheduler) recordSchedulingFailure(fwk framework.Framework, podInfo
Status: v1.ConditionFalse,
Reason: reason,
Message: err.Error(),
}, nominatedNode); err != nil {
}, nominatingInfo); err != nil {
klog.ErrorS(err, "Error updating pod", "pod", klog.KObj(pod))
}
}
@ -333,17 +333,17 @@ func truncateMessage(message string) string {
return message[:max-len(suffix)] + suffix
}
func updatePod(client clientset.Interface, pod *v1.Pod, condition *v1.PodCondition, nominatedNode string) error {
func updatePod(client clientset.Interface, pod *v1.Pod, condition *v1.PodCondition, nominatingInfo *framework.NominatingInfo) error {
klog.V(3).InfoS("Updating pod condition", "pod", klog.KObj(pod), "conditionType", condition.Type, "conditionStatus", condition.Status, "conditionReason", condition.Reason)
podStatusCopy := pod.Status.DeepCopy()
// NominatedNodeName is updated only if we are trying to set it, and the value is
// different from the existing one.
if !podutil.UpdatePodCondition(podStatusCopy, condition) &&
(len(nominatedNode) == 0 || pod.Status.NominatedNodeName == nominatedNode) {
nnnNeedsUpdate := nominatingInfo.Mode() == framework.ModeOverride && pod.Status.NominatedNodeName != nominatingInfo.NominatedNodeName
if !podutil.UpdatePodCondition(podStatusCopy, condition) && !nnnNeedsUpdate {
return nil
}
if nominatedNode != "" {
podStatusCopy.NominatedNodeName = nominatedNode
if nnnNeedsUpdate {
podStatusCopy.NominatedNodeName = nominatingInfo.NominatedNodeName
}
return util.PatchPodStatus(client, pod, podStatusCopy)
}
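
The patch decision above boils down to a single condition; the sketch below (names are illustrative, and clearNominatedNode mirrors the sentinel introduced just below) shows how a nil NominatingInfo and the clearing override behave:

package example

import "k8s.io/kubernetes/pkg/scheduler/framework"

// clearNominatedNode mirrors the scheduler's sentinel for "clear the field".
var clearNominatedNode = &framework.NominatingInfo{NominatingMode: framework.ModeOverride, NominatedNodeName: ""}

// nnnNeedsUpdate restates the patch condition from updatePod above; currentNNN
// stands in for pod.Status.NominatedNodeName.
func nnnNeedsUpdate(currentNNN string, ni *framework.NominatingInfo) bool {
	return ni.Mode() == framework.ModeOverride && currentNNN != ni.NominatedNodeName
}

var examples = []bool{
	nnnNeedsUpdate("node-a", nil),                // false: a nil NominatingInfo (ModeNoop) never patches the field
	nnnNeedsUpdate("node-a", clearNominatedNode), // true: a stale nomination gets cleared
	nnnNeedsUpdate("", clearNominatedNode),       // false: nothing to clear
}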
@ -417,6 +417,10 @@ func (sched *Scheduler) finishBinding(fwk framework.Framework, assumed *v1.Pod,
fwk.EventRecorder().Eventf(assumed, nil, v1.EventTypeNormal, "Scheduled", "Binding", "Successfully assigned %v/%v to %v", assumed.Namespace, assumed.Name, targetNode)
}
var (
clearNominatedNode = &framework.NominatingInfo{NominatingMode: framework.ModeOverride, NominatedNodeName: ""}
)
// scheduleOne does the entire scheduling workflow for a single pod. It is serialized on the scheduling algorithm's host fitting.
func (sched *Scheduler) scheduleOne(ctx context.Context) {
podInfo := sched.NextPod()
@ -454,7 +458,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
// preempt, with the expectation that the next time the pod is tried for scheduling it
// will fit due to the preemption. It is also possible that a different pod will schedule
// into the resources that were preempted, but this is harmless.
nominatedNode := ""
var nominatingInfo *framework.NominatingInfo
if fitError, ok := err.(*framework.FitError); ok {
if !fwk.HasPostFilterPlugins() {
klog.V(3).InfoS("No PostFilter plugins are registered, so no preemption will be performed")
@ -466,8 +470,8 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
} else {
klog.V(5).InfoS("Status after running PostFilter plugins for pod", "pod", klog.KObj(pod), "status", status)
}
if status.IsSuccess() && result != nil {
nominatedNode = result.NominatedNodeName
if result != nil {
nominatingInfo = result.NominatingInfo
}
}
// Pod did not fit anywhere, so it is counted as a failure. If preemption
@ -475,13 +479,15 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
// schedule it. (hopefully)
metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
} else if err == ErrNoNodesAvailable {
nominatingInfo = clearNominatedNode
// No nodes available is counted as unschedulable rather than an error.
metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
} else {
nominatingInfo = clearNominatedNode
klog.ErrorS(err, "Error selecting node for pod", "pod", klog.KObj(pod))
metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
}
sched.recordSchedulingFailure(fwk, podInfo, err, v1.PodReasonUnschedulable, nominatedNode)
sched.recordSchedulingFailure(fwk, podInfo, err, v1.PodReasonUnschedulable, nominatingInfo)
return
}
metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInSeconds(start))
@ -498,7 +504,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
// This relies on the fact that Error will check if the pod has been bound
// to a node and if so will not add it back to the unscheduled pods queue
// (otherwise this would cause an infinite loop).
sched.recordSchedulingFailure(fwk, assumedPodInfo, err, SchedulerError, "")
sched.recordSchedulingFailure(fwk, assumedPodInfo, err, SchedulerError, clearNominatedNode)
return
}
@ -510,7 +516,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
if forgetErr := sched.SchedulerCache.ForgetPod(assumedPod); forgetErr != nil {
klog.ErrorS(forgetErr, "Scheduler cache ForgetPod failed")
}
sched.recordSchedulingFailure(fwk, assumedPodInfo, sts.AsError(), SchedulerError, "")
sched.recordSchedulingFailure(fwk, assumedPodInfo, sts.AsError(), SchedulerError, clearNominatedNode)
return
}
@ -530,7 +536,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
if forgetErr := sched.SchedulerCache.ForgetPod(assumedPod); forgetErr != nil {
klog.ErrorS(forgetErr, "Scheduler cache ForgetPod failed")
}
sched.recordSchedulingFailure(fwk, assumedPodInfo, runPermitStatus.AsError(), reason, "")
sched.recordSchedulingFailure(fwk, assumedPodInfo, runPermitStatus.AsError(), reason, clearNominatedNode)
return
}
@ -573,7 +579,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
return assumedPod.UID != pod.UID
})
}
sched.recordSchedulingFailure(fwk, assumedPodInfo, waitOnPermitStatus.AsError(), reason, "")
sched.recordSchedulingFailure(fwk, assumedPodInfo, waitOnPermitStatus.AsError(), reason, clearNominatedNode)
return
}
@ -591,7 +597,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
// TODO(#103853): de-duplicate the logic.
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(internalqueue.AssignedPodDelete, nil)
}
sched.recordSchedulingFailure(fwk, assumedPodInfo, preBindStatus.AsError(), SchedulerError, "")
sched.recordSchedulingFailure(fwk, assumedPodInfo, preBindStatus.AsError(), SchedulerError, clearNominatedNode)
return
}
@ -608,7 +614,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
// TODO(#103853): de-duplicate the logic.
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(internalqueue.AssignedPodDelete, nil)
}
sched.recordSchedulingFailure(fwk, assumedPodInfo, fmt.Errorf("binding rejected: %w", err), SchedulerError, "")
sched.recordSchedulingFailure(fwk, assumedPodInfo, fmt.Errorf("binding rejected: %w", err), SchedulerError, clearNominatedNode)
} else {
// Calculating nodeResourceString can be heavy. Avoid it if klog verbosity is below 2.
if klog.V(2).Enabled() {
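
To summarize the failure handling in scheduleOne above, a hedged sketch (nominatingInfoForFailure is illustrative; ErrNoNodesAvailable refers to the scheduler's sentinel error) of how each error branch picks the NominatingInfo passed to recordSchedulingFailure:

package example

import "k8s.io/kubernetes/pkg/scheduler/framework"

var clearNominatedNode = &framework.NominatingInfo{NominatingMode: framework.ModeOverride, NominatedNodeName: ""}

// nominatingInfoForFailure restates the failure branches in scheduleOne:
// only a FitError keeps whatever the PostFilter plugins decided; ErrNoNodesAvailable
// and unexpected errors clear the nomination so the pod doesn't keep a stale one.
func nominatingInfoForFailure(err error, postFilterResult *framework.PostFilterResult) *framework.NominatingInfo {
	if _, ok := err.(*framework.FitError); ok {
		if postFilterResult != nil {
			return postFilterResult.NominatingInfo
		}
		return nil // no PostFilter outcome: leave nominatedNodeName untouched
	}
	return clearNominatedNode
}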


@ -1231,7 +1231,7 @@ func TestUpdatePod(t *testing.T) {
currentPodConditions []v1.PodCondition
newPodCondition *v1.PodCondition
currentNominatedNodeName string
newNominatedNodeName string
newNominatingInfo *framework.NominatingInfo
expectedPatchRequests int
expectedPatchDataPattern string
}{
@ -1361,7 +1361,7 @@ func TestUpdatePod(t *testing.T) {
Reason: "currentReason",
Message: "currentMessage",
},
newNominatedNodeName: "node1",
newNominatingInfo: &framework.NominatingInfo{NominatingMode: framework.ModeOverride, NominatedNodeName: "node1"},
expectedPatchRequests: 1,
expectedPatchDataPattern: `{"status":{"nominatedNodeName":"node1"}}`,
},
@ -1388,7 +1388,7 @@ func TestUpdatePod(t *testing.T) {
},
}
if err := updatePod(cs, pod, test.newPodCondition, test.newNominatedNodeName); err != nil {
if err := updatePod(cs, pod, test.newPodCondition, test.newNominatingInfo); err != nil {
t.Fatalf("Error calling update: %v", err)
}


@ -21,6 +21,7 @@ package scheduler
import (
"context"
"fmt"
"strings"
"testing"
"time"
@ -46,7 +47,7 @@ import (
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler"
configtesting "k8s.io/kubernetes/pkg/scheduler/apis/config/testing"
framework "k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework"
frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
st "k8s.io/kubernetes/pkg/scheduler/testing"
"k8s.io/kubernetes/plugin/pkg/admission/priority"
@ -932,98 +933,211 @@ func TestPreemptionRaces(t *testing.T) {
}
}
// TestNominatedNodeCleanUp checks that when there are nominated pods on a
// node and a higher priority pod is nominated to run on the node, the nominated
// node name of the lower priority pods is cleared.
// Test scenario:
// 1. Create a few low priority pods with long grace periods that fill up a node.
// 2. Create a medium priority pod that preempts some of those pods.
// 3. Check that nominated node name of the medium priority pod is set.
// 4. Create a high priority pod that preempts some pods on that node.
// 5. Check that nominated node name of the high priority pod is set and nominated
// node name of the medium priority pod is cleared.
const (
alwaysFailPlugin = "alwaysFailPlugin"
doNotFailMe = "do-not-fail-me"
)
// alwaysFail is a fake plugin implementing the PreBind extension point.
// It always fails with an Unschedulable status unless the pod's name contains the `doNotFailMe` string.
type alwaysFail struct{}
func (af *alwaysFail) Name() string {
return alwaysFailPlugin
}
func (af *alwaysFail) PreBind(_ context.Context, _ *framework.CycleState, p *v1.Pod, _ string) *framework.Status {
if strings.Contains(p.Name, doNotFailMe) {
return nil
}
return framework.NewStatus(framework.Unschedulable)
}
func newAlwaysFail(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) {
return &alwaysFail{}, nil
}
// TestNominatedNodeCleanUp verifies if a pod's nominatedNodeName is set and unset
// properly in different scenarios.
func TestNominatedNodeCleanUp(t *testing.T) {
// Initialize scheduler.
testCtx := initTest(t, "preemption")
defer testutils.CleanupTest(t, testCtx)
tests := []struct {
name string
nodeCapacity map[v1.ResourceName]string
// A slice of pods to be created in batch.
podsToCreate [][]*v1.Pod
// Each postCheck function is run after each batch of pods' creation.
postChecks []func(cs clientset.Interface, pod *v1.Pod) error
// Delete the fake node or not. Optional.
deleteNode bool
// Pods to be deleted. Optional.
podNamesToDelete []string
cs := testCtx.ClientSet
defer cleanupPodsInNamespace(cs, t, testCtx.NS.Name)
// Create a node with some resources
nodeRes := map[v1.ResourceName]string{
v1.ResourcePods: "32",
v1.ResourceCPU: "500m",
v1.ResourceMemory: "500",
}
_, err := createNode(testCtx.ClientSet, st.MakeNode().Name("node1").Capacity(nodeRes).Obj())
if err != nil {
t.Fatalf("Error creating nodes: %v", err)
}
// Step 1. Create a few low priority pods.
lowPriPods := make([]*v1.Pod, 4)
for i := 0; i < len(lowPriPods); i++ {
lowPriPods[i], err = createPausePod(cs, mkPriorityPodWithGrace(testCtx, fmt.Sprintf("lpod-%v", i), lowPriority, 60))
if err != nil {
t.Fatalf("Error creating pause pod: %v", err)
}
}
// make sure that the pods are all scheduled.
for _, p := range lowPriPods {
if err := testutils.WaitForPodToSchedule(cs, p); err != nil {
t.Fatalf("Pod %v/%v didn't get scheduled: %v", p.Namespace, p.Name, err)
}
}
// Step 2. Create a medium priority pod.
podConf := initPausePod(&pausePodConfig{
Name: "medium-priority",
Namespace: testCtx.NS.Name,
Priority: &mediumPriority,
Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(400, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(400, resource.DecimalSI)},
// Register dummy plugin to simulate particular scheduling failures. Optional.
customPlugins *v1beta3.Plugins
outOfTreeRegistry frameworkruntime.Registry
}{
{
name: "mid-priority pod preempts low-priority pod, followed by a high-priority pod with another preemption",
nodeCapacity: map[v1.ResourceName]string{v1.ResourceCPU: "5"},
podsToCreate: [][]*v1.Pod{
{
st.MakePod().Name("low-1").Priority(lowPriority).Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Obj(),
st.MakePod().Name("low-2").Priority(lowPriority).Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Obj(),
st.MakePod().Name("low-3").Priority(lowPriority).Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Obj(),
st.MakePod().Name("low-4").Priority(lowPriority).Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Obj(),
},
{
st.MakePod().Name("medium").Priority(mediumPriority).Req(map[v1.ResourceName]string{v1.ResourceCPU: "4"}).Obj(),
},
{
st.MakePod().Name("high").Priority(highPriority).Req(map[v1.ResourceName]string{v1.ResourceCPU: "3"}).Obj(),
},
},
postChecks: []func(cs clientset.Interface, pod *v1.Pod) error{
testutils.WaitForPodToSchedule,
waitForNominatedNodeName,
waitForNominatedNodeName,
},
},
})
medPriPod, err := createPausePod(cs, podConf)
if err != nil {
t.Errorf("Error while creating the medium priority pod: %v", err)
}
// Step 3. Check if .status.nominatedNodeName of the medium priority pod is set.
if err := waitForNominatedNodeName(cs, medPriPod); err != nil {
t.Errorf(".status.nominatedNodeName was not set for pod %v/%v: %v", medPriPod.Namespace, medPriPod.Name, err)
}
// Step 4. Create a high priority pod.
podConf = initPausePod(&pausePodConfig{
Name: "high-priority",
Namespace: testCtx.NS.Name,
Priority: &highPriority,
Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(300, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI)},
{
name: "mid-priority pod preempts low-priority pod, followed by a high-priority pod without additional preemption",
nodeCapacity: map[v1.ResourceName]string{v1.ResourceCPU: "2"},
podsToCreate: [][]*v1.Pod{
{
st.MakePod().Name("low").Priority(lowPriority).Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Obj(),
},
{
st.MakePod().Name("medium").Priority(mediumPriority).Req(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Obj(),
},
{
st.MakePod().Name("high").Priority(highPriority).Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Obj(),
},
},
postChecks: []func(cs clientset.Interface, pod *v1.Pod) error{
testutils.WaitForPodToSchedule,
waitForNominatedNodeName,
testutils.WaitForPodToSchedule,
},
podNamesToDelete: []string{"low"},
},
{
name: "mid-priority pod preempts low-priority pod, followed by a node deletion",
nodeCapacity: map[v1.ResourceName]string{v1.ResourceCPU: "1"},
podsToCreate: [][]*v1.Pod{
{
st.MakePod().Name("low").Priority(lowPriority).Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Obj(),
},
{
st.MakePod().Name("medium").Priority(mediumPriority).Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Obj(),
},
},
postChecks: []func(cs clientset.Interface, pod *v1.Pod) error{
testutils.WaitForPodToSchedule,
waitForNominatedNodeName,
},
// Delete the node to simulate an ErrNoNodesAvailable error.
deleteNode: true,
podNamesToDelete: []string{"low"},
},
{
name: "mid-priority pod preempts low-priority pod, but failed the scheduling unexpectedly",
nodeCapacity: map[v1.ResourceName]string{v1.ResourceCPU: "1"},
podsToCreate: [][]*v1.Pod{
{
st.MakePod().Name(fmt.Sprintf("low-%v", doNotFailMe)).Priority(lowPriority).Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Obj(),
},
{
st.MakePod().Name("medium").Priority(mediumPriority).Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Obj(),
},
},
postChecks: []func(cs clientset.Interface, pod *v1.Pod) error{
testutils.WaitForPodToSchedule,
waitForNominatedNodeName,
},
podNamesToDelete: []string{fmt.Sprintf("low-%v", doNotFailMe)},
customPlugins: &v1beta3.Plugins{
PreBind: v1beta3.PluginSet{
Enabled: []v1beta3.Plugin{
{Name: alwaysFailPlugin},
},
},
},
outOfTreeRegistry: frameworkruntime.Registry{alwaysFailPlugin: newAlwaysFail},
},
})
highPriPod, err := createPausePod(cs, podConf)
if err != nil {
t.Errorf("Error while creating the high priority pod: %v", err)
}
// Step 5. Check if .status.nominatedNodeName of the high priority pod is set.
if err := waitForNominatedNodeName(cs, highPriPod); err != nil {
t.Errorf(".status.nominatedNodeName was not set for pod %v/%v: %v", highPriPod.Namespace, highPriPod.Name, err)
}
// And .status.nominatedNodeName of the medium priority pod is cleared.
if err := wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
pod, err := cs.CoreV1().Pods(medPriPod.Namespace).Get(context.TODO(), medPriPod.Name, metav1.GetOptions{})
if err != nil {
t.Errorf("Error getting the medium priority pod info: %v", err)
}
if len(pod.Status.NominatedNodeName) == 0 {
return true, nil
}
return false, err
}); err != nil {
t.Errorf(".status.nominatedNodeName of the medium priority pod was not cleared: %v", err)
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cfg := configtesting.V1beta3ToInternalWithDefaults(t, v1beta3.KubeSchedulerConfiguration{
Profiles: []v1beta3.KubeSchedulerProfile{{
SchedulerName: pointer.StringPtr(v1.DefaultSchedulerName),
Plugins: tt.customPlugins,
}},
})
testCtx := initTest(
t,
"preemption",
scheduler.WithProfiles(cfg.Profiles...),
scheduler.WithFrameworkOutOfTreeRegistry(tt.outOfTreeRegistry),
)
t.Cleanup(func() {
testutils.CleanupTest(t, testCtx)
})
cs, ns := testCtx.ClientSet, testCtx.NS.Name
// Create a node with the specified capacity.
nodeName := "fake-node"
if _, err := createNode(cs, st.MakeNode().Name(nodeName).Capacity(tt.nodeCapacity).Obj()); err != nil {
t.Fatalf("Error creating node %v: %v", nodeName, err)
}
// Create pods and run post check if necessary.
for i, pods := range tt.podsToCreate {
for _, p := range pods {
p.Namespace = ns
if _, err := createPausePod(cs, p); err != nil {
t.Fatalf("Error creating pod %v: %v", p.Name, err)
}
}
// If necessary, run the post check function.
if len(tt.postChecks) > i && tt.postChecks[i] != nil {
for _, p := range pods {
if err := tt.postChecks[i](cs, p); err != nil {
t.Fatalf("Pod %v didn't pass the postChecks[%v]: %v", p.Name, i, err)
}
}
}
}
// Delete the node if necessary.
if tt.deleteNode {
if err := cs.CoreV1().Nodes().Delete(context.TODO(), nodeName, *metav1.NewDeleteOptions(0)); err != nil {
t.Fatalf("Node %v cannot be deleted: %v", nodeName, err)
}
}
// Force deleting the terminating pods if necessary.
// This is required if we demand to delete terminating Pods physically.
for _, podName := range tt.podNamesToDelete {
if err := deletePod(cs, podName, ns); err != nil {
t.Fatalf("Pod %v cannot be deleted: %v", podName, err)
}
}
// Verify if .status.nominatedNodeName is cleared.
if err := wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
pod, err := cs.CoreV1().Pods(ns).Get(context.TODO(), "medium", metav1.GetOptions{})
if err != nil {
t.Errorf("Error getting the medium pod: %v", err)
}
if len(pod.Status.NominatedNodeName) == 0 {
return true, nil
}
return false, err
}); err != nil {
t.Errorf(".status.nominatedNodeName of the medium pod was not cleared: %v", err)
}
})
}
}
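
The test above leans on waitForNominatedNodeName from the integration test utilities; the helper below is only an approximation of what such a poll might look like, not the actual implementation:

package example

import (
	"context"
	"time"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/wait"
	clientset "k8s.io/client-go/kubernetes"
)

// waitForNominatedNodeNameSketch polls until the pod's .status.nominatedNodeName
// is set, approximating the behavior of the real test helper.
func waitForNominatedNodeNameSketch(cs clientset.Interface, pod *v1.Pod) error {
	return wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
		p, err := cs.CoreV1().Pods(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{})
		if err != nil {
			return false, err
		}
		return len(p.Status.NominatedNodeName) > 0, nil
	})
}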
@ -1344,7 +1458,7 @@ func TestPreferNominatedNode(t *testing.T) {
nodeNames []string
existingPods []*v1.Pod
pod *v1.Pod
runnningNode string
runningNode string
}{
{
name: "nominated node released all resource, preemptor is scheduled to the nominated node",
@ -1365,7 +1479,7 @@ func TestPreferNominatedNode(t *testing.T) {
v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI)},
},
}),
runnningNode: "node-1",
runningNode: "node-1",
},
{
name: "nominated node cannot pass all the filters, preemptor should find a different node",
@ -1386,7 +1500,7 @@ func TestPreferNominatedNode(t *testing.T) {
v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI)},
},
}),
runnningNode: "node-2",
runningNode: "node-2",
},
}
@ -1435,8 +1549,8 @@ func TestPreferNominatedNode(t *testing.T) {
t.Errorf("Cannot schedule Pod %v/%v, error: %v", test.pod.Namespace, test.pod.Name, err)
}
// Make sure the pod has been scheduled to the right node.
if preemptor.Spec.NodeName != test.runnningNode {
t.Errorf("Expect pod running on %v, got %v.", test.runnningNode, preemptor.Spec.NodeName)
if preemptor.Spec.NodeName != test.runningNode {
t.Errorf("Expect pod running on %v, got %v.", test.runningNode, preemptor.Spec.NodeName)
}
})
}


@ -462,35 +462,6 @@ func getPod(cs clientset.Interface, podName string, podNamespace string) (*v1.Po
return cs.CoreV1().Pods(podNamespace).Get(context.TODO(), podName, metav1.GetOptions{})
}
// noPodsInNamespace returns true if no pods in the given namespace.
func noPodsInNamespace(c clientset.Interface, podNamespace string) wait.ConditionFunc {
return func() (bool, error) {
pods, err := c.CoreV1().Pods(podNamespace).List(context.TODO(), metav1.ListOptions{})
if err != nil {
return false, err
}
return len(pods.Items) == 0, nil
}
}
// cleanupPodsInNamespace deletes the pods in the given namespace and waits for them to
// be actually deleted. They are removed with no grace.
func cleanupPodsInNamespace(cs clientset.Interface, t *testing.T, ns string) {
t.Helper()
zero := int64(0)
if err := cs.CoreV1().Pods(ns).DeleteCollection(context.TODO(), metav1.DeleteOptions{GracePeriodSeconds: &zero}, metav1.ListOptions{}); err != nil {
t.Errorf("error while listing pod in namespace %v: %v", ns, err)
return
}
if err := wait.Poll(time.Second, wait.ForeverTestTimeout,
noPodsInNamespace(cs, ns)); err != nil {
t.Errorf("error while waiting for pods in namespace %v: %v", ns, err)
}
}
// podScheduled returns true if a node is assigned to the given pod.
func podScheduled(c clientset.Interface, podNamespace, podName string) wait.ConditionFunc {
return func() (bool, error) {