From 468a6005a690e2a84acff8696b22774dabf84ab6 Mon Sep 17 00:00:00 2001
From: Dave Chen
Date: Mon, 18 Oct 2021 14:43:39 +0800
Subject: [PATCH] Fix the return status when a plugin internal error is found

Currently, the status code returned is `Unschedulable` when an internal
error is found. The `Unschedulable` status is built from a `FitError`,
which means that no fit nodes were found and no internal error occurred.

Instead of building an `Unschedulable` status from the `FitError`, return
the `Error` status directly.

Signed-off-by: Dave Chen
---
 .../default_preemption_test.go                | 90 ++++++++++++++++++-
 .../framework/preemption/preemption.go        | 33 ++++---
 .../framework/preemption/preemption_test.go   |  2 +-
 3 files changed, 106 insertions(+), 19 deletions(-)

diff --git a/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go b/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go
index cdcc0240afd..b6703e1106a 100644
--- a/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go
+++ b/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go
@@ -18,6 +18,8 @@ package defaultpreemption
 
 import (
 	"context"
+	"errors"
+	"fmt"
 	"math/rand"
 	"sort"
 	"strings"
@@ -95,6 +97,45 @@ func getDefaultDefaultPreemptionArgs() *config.DefaultPreemptionArgs {
 
 var nodeResourcesFitFunc = frameworkruntime.FactoryAdapter(feature.Features{}, noderesources.NewFit)
 
+// TestPlugin returns an Error status from `AddPod` or `RemovePod` on nodes that have the {k,v} label pair defined.
+type TestPlugin struct {
+	name string
+}
+
+func newTestPlugin(injArgs runtime.Object, f framework.Handle) (framework.Plugin, error) {
+	return &TestPlugin{name: "test-plugin"}, nil
+}
+
+func (pl *TestPlugin) AddPod(ctx context.Context, state *framework.CycleState, podToSchedule *v1.Pod, podInfoToAdd *framework.PodInfo, nodeInfo *framework.NodeInfo) *framework.Status {
+	if nodeInfo.Node().GetLabels()["error"] == "true" {
+		return framework.AsStatus(fmt.Errorf("failed to add pod: %v", podToSchedule.Name))
+	}
+	return nil
+}
+
+func (pl *TestPlugin) RemovePod(ctx context.Context, state *framework.CycleState, podToSchedule *v1.Pod, podInfoToRemove *framework.PodInfo, nodeInfo *framework.NodeInfo) *framework.Status {
+	if nodeInfo.Node().GetLabels()["error"] == "true" {
+		return framework.AsStatus(fmt.Errorf("failed to remove pod: %v", podToSchedule.Name))
+	}
+	return nil
+}
+
+func (pl *TestPlugin) Name() string {
+	return pl.name
+}
+
+func (pl *TestPlugin) PreFilterExtensions() framework.PreFilterExtensions {
+	return pl
+}
+
+func (pl *TestPlugin) PreFilter(ctx context.Context, state *framework.CycleState, p *v1.Pod) *framework.Status {
+	return nil
+}
+
+func (pl *TestPlugin) Filter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
+	return nil
+}
+
 func TestPostFilter(t *testing.T) {
 	onePodRes := map[v1.ResourceName]string{v1.ResourcePods: "1"}
 	nodeRes := map[v1.ResourceName]string{v1.ResourceCPU: "200m", v1.ResourceMemory: "400"}
@@ -248,6 +289,39 @@ func TestPostFilter(t *testing.T) {
 			wantResult: nil,
 			wantStatus: framework.NewStatus(framework.Unschedulable, "0/4 nodes are available: 2 Insufficient cpu, 2 Preemption is not helpful for scheduling."),
 		},
+		{
+			name: "only one node but failed with TestPlugin",
+			pod:  st.MakePod().Name("p").UID("p").Namespace(v1.NamespaceDefault).Priority(highPriority).Req(largeRes).Obj(),
+			pods: []*v1.Pod{
+				st.MakePod().Name("p1").UID("p1").Namespace(v1.NamespaceDefault).Node("node1").Obj(),
+			},
+			// label the node with the "error" key so that the TestPlugin fails with an error.
+			nodes:                 []*v1.Node{st.MakeNode().Name("node1").Capacity(largeRes).Label("error", "true").Obj()},
+			filteredNodesStatuses: framework.NodeToStatusMap{"node1": framework.NewStatus(framework.Unschedulable)},
+			wantResult:            nil,
+			wantStatus:            framework.AsStatus(errors.New("running RemovePod on PreFilter plugin \"test-plugin\": failed to remove pod: p")),
+		},
+		{
+			name: "one failed with TestPlugin and the other pass",
+			pod:  st.MakePod().Name("p").UID("p").Namespace(v1.NamespaceDefault).Priority(highPriority).Req(largeRes).Obj(),
+			pods: []*v1.Pod{
+				st.MakePod().Name("p1").UID("p1").Namespace(v1.NamespaceDefault).Node("node1").Obj(),
+				st.MakePod().Name("p2").UID("p2").Namespace(v1.NamespaceDefault).Node("node2").Req(mediumRes).Obj(),
+			},
+			// even though node1 fails with an error, node2 is still returned as a valid nominated node.
+			nodes: []*v1.Node{
+				st.MakeNode().Name("node1").Capacity(largeRes).Label("error", "true").Obj(),
+				st.MakeNode().Name("node2").Capacity(largeRes).Obj(),
+			},
+			filteredNodesStatuses: framework.NodeToStatusMap{
+				"node1": framework.NewStatus(framework.Unschedulable),
+				"node2": framework.NewStatus(framework.Unschedulable),
+			},
+			wantResult: &framework.PostFilterResult{
+				NominatedNodeName: "node2",
+			},
+			wantStatus: framework.NewStatus(framework.Success),
+		},
 	}
 
 	for _, tt := range tests {
@@ -268,6 +342,7 @@ func TestPostFilter(t *testing.T) {
 			registeredPlugins := []st.RegisterPluginFunc{
 				st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
 				st.RegisterPluginAsExtensions(noderesources.FitName, nodeResourcesFitFunc, "Filter", "PreFilter"),
+				st.RegisterPluginAsExtensions("test-plugin", newTestPlugin, "PreFilter"),
 				st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
 			}
 			var extenders []framework.Extender
@@ -299,8 +374,15 @@ func TestPostFilter(t *testing.T) {
 			}
 
 			gotResult, gotStatus := p.PostFilter(context.TODO(), state, tt.pod, tt.filteredNodesStatuses)
-			if diff := cmp.Diff(tt.wantStatus, gotStatus); diff != "" {
-				t.Errorf("Unexpected status (-want, +got):\n%s", diff)
+			// Two errors cannot be compared directly because they have no Equal method, so compare only the reasons.
+			if gotStatus.Code() == framework.Error {
+				if diff := cmp.Diff(tt.wantStatus.Reasons(), gotStatus.Reasons()); diff != "" {
+					t.Errorf("Unexpected status (-want, +got):\n%s", diff)
+				}
+			} else {
+				if diff := cmp.Diff(tt.wantStatus, gotStatus); diff != "" {
+					t.Errorf("Unexpected status (-want, +got):\n%s", diff)
+				}
 			}
 			if diff := cmp.Diff(tt.wantResult, gotResult); diff != "" {
 				t.Errorf("Unexpected postFilterResult (-want, +got):\n%s", diff)
@@ -1054,7 +1136,7 @@ func TestDryRunPreemption(t *testing.T) {
 				Interface: pl,
 			}
 			offset, numCandidates := pl.GetOffsetAndNumCandidates(int32(len(nodeInfos)))
-			got, _ := pe.DryRunPreemption(context.Background(), pod, nodeInfos, tt.pdbs, offset, numCandidates)
+			got, _, _ := pe.DryRunPreemption(context.Background(), pod, nodeInfos, tt.pdbs, offset, numCandidates)
 			// Sort the values (inner victims) and the candidate itself (by its NominatedNodeName).
 			for i := range got {
 				victims := got[i].Victims().Pods
@@ -1290,7 +1372,7 @@ func TestSelectBestCandidate(t *testing.T) {
 				Interface: pl,
 			}
 			offset, numCandidates := pl.GetOffsetAndNumCandidates(int32(len(nodeInfos)))
-			candidates, _ := pe.DryRunPreemption(context.Background(), tt.pod, nodeInfos, nil, offset, numCandidates)
+			candidates, _, _ := pe.DryRunPreemption(context.Background(), tt.pod, nodeInfos, nil, offset, numCandidates)
 			s := pe.SelectCandidate(candidates)
 			if s == nil || len(s.Name()) == 0 {
 				return
diff --git a/pkg/scheduler/framework/preemption/preemption.go b/pkg/scheduler/framework/preemption/preemption.go
index ba4967ebad7..ca62fc56146 100644
--- a/pkg/scheduler/framework/preemption/preemption.go
+++ b/pkg/scheduler/framework/preemption/preemption.go
@@ -27,6 +27,7 @@ import (
 	v1 "k8s.io/api/core/v1"
 	policy "k8s.io/api/policy/v1"
 	"k8s.io/apimachinery/pkg/labels"
+	utilerrors "k8s.io/apimachinery/pkg/util/errors"
 	corelisters "k8s.io/client-go/listers/core/v1"
 	policylisters "k8s.io/client-go/listers/policy/v1"
 	corev1helpers "k8s.io/component-helpers/scheduling/corev1"
@@ -142,9 +143,9 @@ func (ev *Evaluator) Preempt(ctx context.Context, pod *v1.Pod, m framework.NodeT
 	}
 
 	// 2) Find all preemption candidates.
-	candidates, nodeToStatusMap, status := ev.findCandidates(ctx, pod, m)
-	if !status.IsSuccess() {
-		return nil, status
+	candidates, nodeToStatusMap, err := ev.findCandidates(ctx, pod, m)
+	if err != nil && len(candidates) == 0 {
+		return nil, framework.AsStatus(err)
 	}
 
 	// Return a FitError only when there are no candidates that fit the pod.
@@ -161,7 +162,7 @@ func (ev *Evaluator) Preempt(ctx context.Context, pod *v1.Pod, m framework.NodeT
 	}
 
 	// 3) Interact with registered Extenders to filter out some candidates if needed.
-	candidates, status = ev.callExtenders(pod, candidates)
+	candidates, status := ev.callExtenders(pod, candidates)
 	if !status.IsSuccess() {
 		return nil, status
 	}
@@ -182,13 +183,13 @@ func (ev *Evaluator) Preempt(ctx context.Context, pod *v1.Pod, m framework.NodeT
 
 // FindCandidates calculates a slice of preemption candidates.
 // Each candidate is executable to make the given <pod> schedulable.
-func (ev *Evaluator) findCandidates(ctx context.Context, pod *v1.Pod, m framework.NodeToStatusMap) ([]Candidate, framework.NodeToStatusMap, *framework.Status) {
+func (ev *Evaluator) findCandidates(ctx context.Context, pod *v1.Pod, m framework.NodeToStatusMap) ([]Candidate, framework.NodeToStatusMap, error) {
 	allNodes, err := ev.Handler.SnapshotSharedLister().NodeInfos().List()
 	if err != nil {
-		return nil, nil, framework.AsStatus(err)
+		return nil, nil, err
 	}
 	if len(allNodes) == 0 {
-		return nil, nil, framework.NewStatus(framework.Error, "no nodes available")
+		return nil, nil, errors.New("no nodes available")
 	}
 	potentialNodes, unschedulableNodeStatus := nodesWherePreemptionMightHelp(allNodes, m)
 	if len(potentialNodes) == 0 {
@@ -203,7 +204,7 @@ func (ev *Evaluator) findCandidates(ctx context.Context, pod *v1.Pod, m framewor
 
 	pdbs, err := getPodDisruptionBudgets(ev.PdbLister)
 	if err != nil {
-		return nil, nil, framework.AsStatus(err)
+		return nil, nil, err
 	}
 
 	offset, numCandidates := ev.GetOffsetAndNumCandidates(int32(len(potentialNodes)))
@@ -214,11 +215,11 @@ func (ev *Evaluator) findCandidates(ctx context.Context, pod *v1.Pod, m framewor
 		}
 		klog.Infof("from a pool of %d nodes (offset: %d, sample %d nodes: %v), ~%d candidates will be chosen", len(potentialNodes), offset, len(sample), sample, numCandidates)
 	}
-	candidates, nodeStatuses := ev.DryRunPreemption(ctx, pod, potentialNodes, pdbs, offset, numCandidates)
-	for node, status := range unschedulableNodeStatus {
-		nodeStatuses[node] = status
+	candidates, nodeStatuses, err := ev.DryRunPreemption(ctx, pod, potentialNodes, pdbs, offset, numCandidates)
+	for node, nodeStatus := range unschedulableNodeStatus {
+		nodeStatuses[node] = nodeStatus
 	}
-	return candidates, nodeStatuses, nil
+	return candidates, nodeStatuses, err
 }
 
 // callExtenders calls given <extenders> to select the list of feasible candidates.
@@ -531,13 +532,14 @@ func getLowerPriorityNominatedPods(pn framework.PodNominator, pod *v1.Pod, nodeN
 // candidates, ones that do not violate PDB are preferred over ones that do.
 // NOTE: This method is exported for easier testing in default preemption.
 func (ev *Evaluator) DryRunPreemption(ctx context.Context, pod *v1.Pod, potentialNodes []*framework.NodeInfo,
-	pdbs []*policy.PodDisruptionBudget, offset int32, numCandidates int32) ([]Candidate, framework.NodeToStatusMap) {
+	pdbs []*policy.PodDisruptionBudget, offset int32, numCandidates int32) ([]Candidate, framework.NodeToStatusMap, error) {
 	fh := ev.Handler
 	nonViolatingCandidates := newCandidateList(numCandidates)
 	violatingCandidates := newCandidateList(numCandidates)
 	parallelCtx, cancel := context.WithCancel(ctx)
 	nodeStatuses := make(framework.NodeToStatusMap)
 	var statusesLock sync.Mutex
+	var errs []error
 	checkNode := func(i int) {
 		nodeInfoCopy := potentialNodes[(int(offset)+i)%len(potentialNodes)].Clone()
 		stateCopy := ev.State.Clone()
@@ -566,9 +568,12 @@ func (ev *Evaluator) DryRunPreemption(ctx context.Context, pod *v1.Pod, potentia
 			status = framework.AsStatus(fmt.Errorf("expected at least one victim pod on node %q", nodeInfoCopy.Node().Name))
 		}
 		statusesLock.Lock()
+		if status.Code() == framework.Error {
+			errs = append(errs, status.AsError())
+		}
 		nodeStatuses[nodeInfoCopy.Node().Name] = status
 		statusesLock.Unlock()
 	}
 	fh.Parallelizer().Until(parallelCtx, len(potentialNodes), checkNode)
-	return append(nonViolatingCandidates.get(), violatingCandidates.get()...), nodeStatuses
+	return append(nonViolatingCandidates.get(), violatingCandidates.get()...), nodeStatuses, utilerrors.NewAggregate(errs)
 }
diff --git a/pkg/scheduler/framework/preemption/preemption_test.go b/pkg/scheduler/framework/preemption/preemption_test.go
index 326e64c02ea..5b057f546bd 100644
--- a/pkg/scheduler/framework/preemption/preemption_test.go
+++ b/pkg/scheduler/framework/preemption/preemption_test.go
@@ -314,7 +314,7 @@ func TestDryRunPreemption(t *testing.T) {
 				Interface: fakePostPlugin,
 				State:     state,
 			}
-			got, _ := pe.DryRunPreemption(context.Background(), pod, nodeInfos, nil, 0, int32(len(nodeInfos)))
+			got, _, _ := pe.DryRunPreemption(context.Background(), pod, nodeInfos, nil, 0, int32(len(nodeInfos)))
 			// Sort the values (inner victims) and the candidate itself (by its NominatedNodeName).
 			for i := range got {
 				victims := got[i].Victims().Pods
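
For context on the error-handling pattern the patch introduces in DryRunPreemption: plugin failures observed by the parallel checkNode workers are appended to a slice under a mutex and then folded into a single error with utilerrors.NewAggregate, which returns nil when no errors were recorded. The sketch below is a minimal, self-contained illustration of that pattern and is not part of the patch; the collectErrors function, node names, and failure condition are made up for illustration, and it only assumes k8s.io/apimachinery is available on the module path.

package main

import (
	"fmt"
	"sync"

	utilerrors "k8s.io/apimachinery/pkg/util/errors"
)

// collectErrors mirrors the aggregation pattern used in DryRunPreemption:
// parallel workers append any error they hit to a shared slice guarded by a
// mutex, and the caller folds the slice into a single error with
// NewAggregate. NewAggregate returns nil for an empty slice, so the happy
// path still yields a nil error.
func collectErrors(nodes []string) error {
	var (
		lock sync.Mutex
		errs []error
		wg   sync.WaitGroup
	)
	for _, name := range nodes {
		wg.Add(1)
		go func(name string) {
			defer wg.Done()
			// Hypothetical failure condition, standing in for a plugin
			// returning an Error status for this node.
			if name == "bad-node" {
				lock.Lock()
				errs = append(errs, fmt.Errorf("simulating preemption on %q failed", name))
				lock.Unlock()
			}
		}(name)
	}
	wg.Wait()

	// A single aggregate error (or nil) is what DryRunPreemption now returns
	// to findCandidates and, ultimately, to Preempt.
	return utilerrors.NewAggregate(errs)
}

func main() {
	fmt.Println(collectErrors([]string{"node1", "bad-node", "node2"}))
}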