Merge pull request #105727 from chendave/wrong_status

Fix the inaccurate status when a plugin internal status is found
Kubernetes Prow Robot 2021-10-27 19:45:02 -07:00 committed by GitHub
commit 87b0412232
3 changed files with 106 additions and 19 deletions
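
In short, internal plugin errors hit during the preemption dry run are now surfaced as real errors instead of being folded into the nodes' unschedulable statuses: findCandidates returns a plain error rather than a *framework.Status, DryRunPreemption additionally returns an aggregate of the per-node Error statuses it collected, and Preempt only aborts when that error left no candidates at all. Condensed from the hunks below (not extra code in the commit), the resulting flow in Evaluator.Preempt is:

candidates, nodeToStatusMap, err := ev.findCandidates(ctx, pod, m)
if err != nil && len(candidates) == 0 {
    // Nothing survived the dry run (or candidates could not be computed at all).
    return nil, framework.AsStatus(err)
}
// Otherwise keep going: a node that passed the dry run can still be nominated
// even if another node failed with a plugin-internal error.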


@@ -18,6 +18,8 @@ package defaultpreemption
import (
"context"
"errors"
"fmt"
"math/rand"
"sort"
"strings"
@@ -95,6 +97,45 @@ func getDefaultDefaultPreemptionArgs() *config.DefaultPreemptionArgs {
var nodeResourcesFitFunc = frameworkruntime.FactoryAdapter(feature.Features{}, noderesources.NewFit)
// TestPlugin returns an Error status from AddPod or RemovePod when the node carries the label pair {"error": "true"}.
type TestPlugin struct {
name string
}
func newTestPlugin(injArgs runtime.Object, f framework.Handle) (framework.Plugin, error) {
return &TestPlugin{name: "test-plugin"}, nil
}
func (pl *TestPlugin) AddPod(ctx context.Context, state *framework.CycleState, podToSchedule *v1.Pod, podInfoToAdd *framework.PodInfo, nodeInfo *framework.NodeInfo) *framework.Status {
if nodeInfo.Node().GetLabels()["error"] == "true" {
return framework.AsStatus(fmt.Errorf("failed to add pod: %v", podToSchedule.Name))
}
return nil
}
func (pl *TestPlugin) RemovePod(ctx context.Context, state *framework.CycleState, podToSchedule *v1.Pod, podInfoToRemove *framework.PodInfo, nodeInfo *framework.NodeInfo) *framework.Status {
if nodeInfo.Node().GetLabels()["error"] == "true" {
return framework.AsStatus(fmt.Errorf("failed to remove pod: %v", podToSchedule.Name))
}
return nil
}
func (pl *TestPlugin) Name() string {
return pl.name
}
func (pl *TestPlugin) PreFilterExtensions() framework.PreFilterExtensions {
return pl
}
func (pl *TestPlugin) PreFilter(ctx context.Context, state *framework.CycleState, p *v1.Pod) *framework.Status {
return nil
}
func (pl *TestPlugin) Filter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
return nil
}
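
Aside (not part of the diff): the plugin above matters only through its PreFilterExtensions. During the preemption dry run, the framework simulates evicting and re-adding victim pods by calling the RemovePod and AddPod extensions of every registered PreFilter plugin, and it wraps a failure as running RemovePod on PreFilter plugin "test-plugin": ... — exactly the error text the new test cases below expect. The plugin is wired in for that purpose later in this file via st.RegisterPluginAsExtensions("test-plugin", newTestPlugin, "PreFilter").
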
func TestPostFilter(t *testing.T) {
onePodRes := map[v1.ResourceName]string{v1.ResourcePods: "1"}
nodeRes := map[v1.ResourceName]string{v1.ResourceCPU: "200m", v1.ResourceMemory: "400"}
@@ -248,6 +289,39 @@ func TestPostFilter(t *testing.T) {
wantResult: nil,
wantStatus: framework.NewStatus(framework.Unschedulable, "0/4 nodes are available: 2 Insufficient cpu, 2 Preemption is not helpful for scheduling."),
},
{
name: "only one node but failed with TestPlugin",
pod: st.MakePod().Name("p").UID("p").Namespace(v1.NamespaceDefault).Priority(highPriority).Req(largeRes).Obj(),
pods: []*v1.Pod{
st.MakePod().Name("p1").UID("p1").Namespace(v1.NamespaceDefault).Node("node1").Obj(),
},
// Label the node with "error": "true" so that the TestPlugin fails with an error.
nodes: []*v1.Node{st.MakeNode().Name("node1").Capacity(largeRes).Label("error", "true").Obj()},
filteredNodesStatuses: framework.NodeToStatusMap{"node1": framework.NewStatus(framework.Unschedulable)},
wantResult: nil,
wantStatus: framework.AsStatus(errors.New("running RemovePod on PreFilter plugin \"test-plugin\": failed to remove pod: p")),
},
{
name: "one failed with TestPlugin and the other pass",
pod: st.MakePod().Name("p").UID("p").Namespace(v1.NamespaceDefault).Priority(highPriority).Req(largeRes).Obj(),
pods: []*v1.Pod{
st.MakePod().Name("p1").UID("p1").Namespace(v1.NamespaceDefault).Node("node1").Obj(),
st.MakePod().Name("p2").UID("p2").Namespace(v1.NamespaceDefault).Node("node2").Req(mediumRes).Obj(),
},
// Even though node1 fails with an error, node2 is still returned as a valid nominated node.
nodes: []*v1.Node{
st.MakeNode().Name("node1").Capacity(largeRes).Label("error", "true").Obj(),
st.MakeNode().Name("node2").Capacity(largeRes).Obj(),
},
filteredNodesStatuses: framework.NodeToStatusMap{
"node1": framework.NewStatus(framework.Unschedulable),
"node2": framework.NewStatus(framework.Unschedulable),
},
wantResult: &framework.PostFilterResult{
NominatedNodeName: "node2",
},
wantStatus: framework.NewStatus(framework.Success),
},
}
for _, tt := range tests {
@@ -268,6 +342,7 @@ func TestPostFilter(t *testing.T) {
registeredPlugins := []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterPluginAsExtensions(noderesources.FitName, nodeResourcesFitFunc, "Filter", "PreFilter"),
st.RegisterPluginAsExtensions("test-plugin", newTestPlugin, "PreFilter"),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
}
var extenders []framework.Extender
@@ -299,8 +374,15 @@ func TestPostFilter(t *testing.T) {
}
gotResult, gotStatus := p.PostFilter(context.TODO(), state, tt.pod, tt.filteredNodesStatuses)
-if diff := cmp.Diff(tt.wantStatus, gotStatus); diff != "" {
-t.Errorf("Unexpected status (-want, +got):\n%s", diff)
+// Two error values cannot be compared directly with cmp.Diff (errors carry no equality method), so when the status code is Error compare only the status reasons.
+if gotStatus.Code() == framework.Error {
+if diff := cmp.Diff(tt.wantStatus.Reasons(), gotStatus.Reasons()); diff != "" {
+t.Errorf("Unexpected status (-want, +got):\n%s", diff)
+}
+} else {
+if diff := cmp.Diff(tt.wantStatus, gotStatus); diff != "" {
+t.Errorf("Unexpected status (-want, +got):\n%s", diff)
+}
+}
}
if diff := cmp.Diff(tt.wantResult, gotResult); diff != "" {
t.Errorf("Unexpected postFilterResult (-want, +got):\n%s", diff)
@@ -1054,7 +1136,7 @@ func TestDryRunPreemption(t *testing.T) {
Interface: pl,
}
offset, numCandidates := pl.GetOffsetAndNumCandidates(int32(len(nodeInfos)))
-got, _ := pe.DryRunPreemption(context.Background(), pod, nodeInfos, tt.pdbs, offset, numCandidates)
+got, _, _ := pe.DryRunPreemption(context.Background(), pod, nodeInfos, tt.pdbs, offset, numCandidates)
// Sort the values (inner victims) and the candidate itself (by its NominatedNodeName).
for i := range got {
victims := got[i].Victims().Pods
@@ -1290,7 +1372,7 @@ func TestSelectBestCandidate(t *testing.T) {
Interface: pl,
}
offset, numCandidates := pl.GetOffsetAndNumCandidates(int32(len(nodeInfos)))
-candidates, _ := pe.DryRunPreemption(context.Background(), tt.pod, nodeInfos, nil, offset, numCandidates)
+candidates, _, _ := pe.DryRunPreemption(context.Background(), tt.pod, nodeInfos, nil, offset, numCandidates)
s := pe.SelectCandidate(candidates)
if s == nil || len(s.Name()) == 0 {
return

@@ -27,6 +27,7 @@ import (
v1 "k8s.io/api/core/v1"
policy "k8s.io/api/policy/v1"
"k8s.io/apimachinery/pkg/labels"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
corelisters "k8s.io/client-go/listers/core/v1"
policylisters "k8s.io/client-go/listers/policy/v1"
corev1helpers "k8s.io/component-helpers/scheduling/corev1"
@@ -142,9 +143,9 @@ func (ev *Evaluator) Preempt(ctx context.Context, pod *v1.Pod, m framework.NodeT
}
// 2) Find all preemption candidates.
-candidates, nodeToStatusMap, status := ev.findCandidates(ctx, pod, m)
-if !status.IsSuccess() {
-return nil, status
+candidates, nodeToStatusMap, err := ev.findCandidates(ctx, pod, m)
+if err != nil && len(candidates) == 0 {
+return nil, framework.AsStatus(err)
}
// Return a FitError only when there are no candidates that fit the pod.
@@ -161,7 +162,7 @@ func (ev *Evaluator) Preempt(ctx context.Context, pod *v1.Pod, m framework.NodeT
}
// 3) Interact with registered Extenders to filter out some candidates if needed.
-candidates, status = ev.callExtenders(pod, candidates)
+candidates, status := ev.callExtenders(pod, candidates)
if !status.IsSuccess() {
return nil, status
}
@@ -182,13 +183,13 @@ func (ev *Evaluator) Preempt(ctx context.Context, pod *v1.Pod, m framework.NodeT
// FindCandidates calculates a slice of preemption candidates.
// Each candidate is executable to make the given <pod> schedulable.
-func (ev *Evaluator) findCandidates(ctx context.Context, pod *v1.Pod, m framework.NodeToStatusMap) ([]Candidate, framework.NodeToStatusMap, *framework.Status) {
+func (ev *Evaluator) findCandidates(ctx context.Context, pod *v1.Pod, m framework.NodeToStatusMap) ([]Candidate, framework.NodeToStatusMap, error) {
allNodes, err := ev.Handler.SnapshotSharedLister().NodeInfos().List()
if err != nil {
-return nil, nil, framework.AsStatus(err)
+return nil, nil, err
}
if len(allNodes) == 0 {
-return nil, nil, framework.NewStatus(framework.Error, "no nodes available")
+return nil, nil, errors.New("no nodes available")
}
potentialNodes, unschedulableNodeStatus := nodesWherePreemptionMightHelp(allNodes, m)
if len(potentialNodes) == 0 {
@@ -203,7 +204,7 @@ func (ev *Evaluator) findCandidates(ctx context.Context, pod *v1.Pod, m framewor
pdbs, err := getPodDisruptionBudgets(ev.PdbLister)
if err != nil {
-return nil, nil, framework.AsStatus(err)
+return nil, nil, err
}
offset, numCandidates := ev.GetOffsetAndNumCandidates(int32(len(potentialNodes)))
@@ -214,11 +215,11 @@ func (ev *Evaluator) findCandidates(ctx context.Context, pod *v1.Pod, m framewor
}
klog.Infof("from a pool of %d nodes (offset: %d, sample %d nodes: %v), ~%d candidates will be chosen", len(potentialNodes), offset, len(sample), sample, numCandidates)
}
-candidates, nodeStatuses := ev.DryRunPreemption(ctx, pod, potentialNodes, pdbs, offset, numCandidates)
-for node, status := range unschedulableNodeStatus {
-nodeStatuses[node] = status
+candidates, nodeStatuses, err := ev.DryRunPreemption(ctx, pod, potentialNodes, pdbs, offset, numCandidates)
+for node, nodeStatus := range unschedulableNodeStatus {
+nodeStatuses[node] = nodeStatus
}
-return candidates, nodeStatuses, nil
+return candidates, nodeStatuses, err
}
// callExtenders calls given <extenders> to select the list of feasible candidates.
@@ -531,13 +532,14 @@ func getLowerPriorityNominatedPods(pn framework.PodNominator, pod *v1.Pod, nodeN
// candidates, ones that do not violate PDB are preferred over ones that do.
// NOTE: This method is exported for easier testing in default preemption.
func (ev *Evaluator) DryRunPreemption(ctx context.Context, pod *v1.Pod, potentialNodes []*framework.NodeInfo,
-pdbs []*policy.PodDisruptionBudget, offset int32, numCandidates int32) ([]Candidate, framework.NodeToStatusMap) {
+pdbs []*policy.PodDisruptionBudget, offset int32, numCandidates int32) ([]Candidate, framework.NodeToStatusMap, error) {
fh := ev.Handler
nonViolatingCandidates := newCandidateList(numCandidates)
violatingCandidates := newCandidateList(numCandidates)
parallelCtx, cancel := context.WithCancel(ctx)
nodeStatuses := make(framework.NodeToStatusMap)
var statusesLock sync.Mutex
var errs []error
checkNode := func(i int) {
nodeInfoCopy := potentialNodes[(int(offset)+i)%len(potentialNodes)].Clone()
stateCopy := ev.State.Clone()
@@ -566,9 +568,12 @@ func (ev *Evaluator) DryRunPreemption(ctx context.Context, pod *v1.Pod, potentia
status = framework.AsStatus(fmt.Errorf("expected at least one victim pod on node %q", nodeInfoCopy.Node().Name))
}
statusesLock.Lock()
if status.Code() == framework.Error {
errs = append(errs, status.AsError())
}
nodeStatuses[nodeInfoCopy.Node().Name] = status
statusesLock.Unlock()
}
fh.Parallelizer().Until(parallelCtx, len(potentialNodes), checkNode)
-return append(nonViolatingCandidates.get(), violatingCandidates.get()...), nodeStatuses
+return append(nonViolatingCandidates.get(), violatingCandidates.get()...), nodeStatuses, utilerrors.NewAggregate(errs)
}
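
The dry run now records every node whose simulation failed with an internal error and folds those errors with utilerrors.NewAggregate from k8s.io/apimachinery/pkg/util/errors. NewAggregate returns nil for an empty list, so the happy path still hands a nil error back to findCandidates without extra branching. A minimal, self-contained sketch of that behaviour (illustration only, not code from this commit):

package main

import (
    "errors"
    "fmt"

    utilerrors "k8s.io/apimachinery/pkg/util/errors"
)

func main() {
    // No per-node errors were collected: the aggregate is nil, so the
    // caller sees a nil error on the happy path.
    fmt.Println(utilerrors.NewAggregate(nil) == nil) // true

    // Two node-level failures are folded into a single error value whose
    // message contains both underlying messages.
    agg := utilerrors.NewAggregate([]error{
        errors.New("node1: failed to remove pod: p"),
        errors.New("node2: failed to add pod: q"),
    })
    fmt.Println(agg)
}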

@@ -313,7 +313,7 @@ func TestDryRunPreemption(t *testing.T) {
Interface: fakePostPlugin,
State: state,
}
-got, _ := pe.DryRunPreemption(context.Background(), pod, nodeInfos, nil, 0, int32(len(nodeInfos)))
+got, _, _ := pe.DryRunPreemption(context.Background(), pod, nodeInfos, nil, 0, int32(len(nodeInfos)))
// Sort the values (inner victims) and the candidate itself (by its NominatedNodeName).
for i := range got {
victims := got[i].Victims().Pods