Merge pull request #97184 from chendave/add_status

Show the details of preemption failures
Kubernetes Prow Robot 2021-01-15 10:25:51 -08:00 committed by GitHub
commit 38585884e3
9 changed files with 122 additions and 73 deletions
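
Editor's note: the net effect of this change is that FitError moves from the scheduler core into the framework package, and preemption (FindCandidates/PostFilter) now returns a FitError built from the per-node statuses collected during the dry run. A failed preemption therefore reports which nodes were rejected and why, e.g. "0/2 nodes are available: 1 Insufficient cpu, 1 No victims found on node node1 for preemptor pod p.", as exercised by the updated TestPostFilter cases below.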


@ -20,8 +20,6 @@ import (
"context"
"fmt"
"math/rand"
"sort"
"strings"
"sync"
"sync/atomic"
"time"
@ -52,42 +50,9 @@ const (
minFeasibleNodesPercentageToFind = 5
)
// FitError describes a fit error of a pod.
type FitError struct {
Pod *v1.Pod
NumAllNodes int
FilteredNodesStatuses framework.NodeToStatusMap
}
// ErrNoNodesAvailable is used to describe the error that no nodes are available to schedule pods.
var ErrNoNodesAvailable = fmt.Errorf("no nodes available to schedule pods")
const (
// NoNodeAvailableMsg is used to format the message when no nodes are available.
NoNodeAvailableMsg = "0/%v nodes are available"
)
// Error returns detailed information about why the pod failed to fit on each node.
func (f *FitError) Error() string {
reasons := make(map[string]int)
for _, status := range f.FilteredNodesStatuses {
for _, reason := range status.Reasons() {
reasons[reason]++
}
}
sortReasonsHistogram := func() []string {
var reasonStrings []string
for k, v := range reasons {
reasonStrings = append(reasonStrings, fmt.Sprintf("%v %v", v, k))
}
sort.Strings(reasonStrings)
return reasonStrings
}
reasonMsg := fmt.Sprintf(NoNodeAvailableMsg+": %v.", f.NumAllNodes, strings.Join(sortReasonsHistogram(), ", "))
return reasonMsg
}
// ScheduleAlgorithm is an interface implemented by things that know how to schedule pods
// onto machines.
// TODO: Rename this type.
@ -147,7 +112,7 @@ func (g *genericScheduler) Schedule(ctx context.Context, fwk framework.Framework
trace.Step("Computing predicates done")
if len(feasibleNodes) == 0 {
return result, &FitError{
return result, &framework.FitError{
Pod: pod,
NumAllNodes: g.nodeInfoSnapshot.NumNodes(),
FilteredNodesStatuses: filteredNodesStatuses,


@ -289,7 +289,7 @@ func TestGenericScheduler(t *testing.T) {
nodes: []string{"machine1", "machine2"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "2", UID: types.UID("2")}},
name: "test 1",
wErr: &FitError{
wErr: &framework.FitError{
Pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "2", UID: types.UID("2")}},
NumAllNodes: 2,
FilteredNodesStatuses: framework.NodeToStatusMap{
@ -374,7 +374,7 @@ func TestGenericScheduler(t *testing.T) {
nodes: []string{"3", "2", "1"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "2", UID: types.UID("2")}},
name: "test 7",
wErr: &FitError{
wErr: &framework.FitError{
Pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "2", UID: types.UID("2")}},
NumAllNodes: 3,
FilteredNodesStatuses: framework.NodeToStatusMap{
@ -406,7 +406,7 @@ func TestGenericScheduler(t *testing.T) {
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "2", UID: types.UID("2")}},
nodes: []string{"1", "2"},
name: "test 8",
wErr: &FitError{
wErr: &framework.FitError{
Pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "2", UID: types.UID("2")}},
NumAllNodes: 2,
FilteredNodesStatuses: framework.NodeToStatusMap{
@ -640,7 +640,7 @@ func TestGenericScheduler(t *testing.T) {
nodes: []string{"3"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-filter", UID: types.UID("test-filter")}},
expectedHosts: nil,
wErr: &FitError{
wErr: &framework.FitError{
Pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-filter", UID: types.UID("test-filter")}},
NumAllNodes: 1,
FilteredNodesStatuses: framework.NodeToStatusMap{
@ -662,7 +662,7 @@ func TestGenericScheduler(t *testing.T) {
nodes: []string{"3"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-filter", UID: types.UID("test-filter")}},
expectedHosts: nil,
wErr: &FitError{
wErr: &framework.FitError{
Pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-filter", UID: types.UID("test-filter")}},
NumAllNodes: 1,
FilteredNodesStatuses: framework.NodeToStatusMap{
@ -699,7 +699,7 @@ func TestGenericScheduler(t *testing.T) {
nodes: []string{"1", "2"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-prefilter", UID: types.UID("test-prefilter")}},
expectedHosts: nil,
wErr: &FitError{
wErr: &framework.FitError{
Pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-prefilter", UID: types.UID("test-prefilter")}},
NumAllNodes: 2,
FilteredNodesStatuses: framework.NodeToStatusMap{


@ -318,7 +318,7 @@ func MakeDefaultErrorFunc(client clientset.Interface, podLister corelisters.PodL
pod := podInfo.Pod
if err == core.ErrNoNodesAvailable {
klog.V(2).InfoS("Unable to schedule pod; no nodes are registered to the cluster; waiting", "pod", klog.KObj(pod))
} else if _, ok := err.(*core.FitError); ok {
} else if _, ok := err.(*framework.FitError); ok {
klog.V(2).InfoS("Unable to schedule pod; no fit; waiting", "pod", klog.KObj(pod), "err", err)
} else if apierrors.IsNotFound(err) {
klog.V(2).InfoS("Unable to schedule pod, possibly due to node not found; waiting", "pod", klog.KObj(pod), "err", err)


@ -63,7 +63,7 @@ const (
// scheduler skip preemption.
// The accompanying status message should explain why the pod is unschedulable.
Unschedulable
// UnschedulableAndUnresolvable is used when a PreFilter plugin finds a pod unschedulable and
// UnschedulableAndUnresolvable is used when a plugin finds a pod unschedulable and
// preemption would not change anything. Plugins should return Unschedulable if it is possible
// that the pod can get scheduled with preemption.
// The accompanying status message should explain why the pod is unschedulable.
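
Editor's sketch of the convention this comment describes, with stand-in types instead of the real framework codes (the zone/CPU checks are invented for illustration): a plugin reports UnschedulableAndUnresolvable when evicting pods cannot possibly help, and plain Unschedulable when preemption might free what the pod needs.

```go
package main

import "fmt"

type Code int

const (
	Unschedulable Code = iota
	UnschedulableAndUnresolvable
)

type Status struct {
	Code    Code
	Message string
}

// filterNode is a hypothetical filter: a zone mismatch cannot be fixed by
// evicting pods, while a CPU shortfall can be, so the codes differ.
func filterNode(nodeZone, wantZone string, freeCPU, wantCPU int) *Status {
	if nodeZone != wantZone {
		return &Status{UnschedulableAndUnresolvable, "node(s) didn't match the requested zone"}
	}
	if freeCPU < wantCPU {
		return &Status{Unschedulable, "Insufficient cpu"}
	}
	return nil // the pod fits
}

func main() {
	fmt.Println(filterNode("zone-a", "zone-b", 4, 2).Message) // preemption cannot help
	fmt.Println(filterNode("zone-a", "zone-a", 1, 2).Message) // preemption might help
}
```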


@ -22,6 +22,7 @@ import (
"math"
"math/rand"
"sort"
"sync"
"sync/atomic"
"k8s.io/klog/v2"
@ -93,8 +94,12 @@ func (pl *DefaultPreemption) PostFilter(ctx context.Context, state *framework.Cy
nnn, err := pl.preempt(ctx, state, pod, m)
if err != nil {
if _, ok := err.(*framework.FitError); ok {
return nil, framework.NewStatus(framework.Unschedulable, err.Error())
}
return nil, framework.AsStatus(err)
}
// This happens when the pod is not eligible for preemption or extenders filtered all candidates.
if nnn == "" {
return nil, framework.NewStatus(framework.Unschedulable)
}
@ -214,8 +219,16 @@ func (pl *DefaultPreemption) FindCandidates(ctx context.Context, state *framewor
}
klog.Infof("from a pool of %d nodes (offset: %d, sample %d nodes: %v), ~%d candidates will be chosen", len(potentialNodes), offset, len(sample), sample, numCandidates)
}
return dryRunPreemption(ctx, pl.fh, state, pod, potentialNodes, pdbs, offset, numCandidates), nil
candidates, nodeStatuses := dryRunPreemption(ctx, pl.fh, state, pod, potentialNodes, pdbs, offset, numCandidates)
// Return a FitError only when there are no candidates that fit the pod.
if len(candidates) == 0 {
return candidates, &framework.FitError{
Pod: pod,
NumAllNodes: len(potentialNodes),
FilteredNodesStatuses: nodeStatuses,
}
}
return candidates, nil
}
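
Editor's sketch of the caller side of the contract above ("return a FitError only when there are no candidates"): once preempt hands the FitError back to PostFilter, the aggregated message becomes the Unschedulable status that the updated tests assert on. The types and the message below are stand-ins mirroring the diff, not the real framework API.

```go
package main

import "fmt"

// FitError stands in for framework.FitError; only Error() matters here.
type FitError struct{ msg string }

func (f *FitError) Error() string { return f.msg }

// findCandidates mimics FindCandidates when dry-run preemption finds nothing:
// no candidates, plus an error carrying the aggregated per-node reasons.
func findCandidates() ([]string, error) {
	return nil, &FitError{msg: "0/1 nodes are available: 1 No victims found on node node1 for preemptor pod p."}
}

func main() {
	candidates, err := findCandidates()
	if err != nil {
		if fitErr, ok := err.(*FitError); ok {
			// Mirrors PostFilter: framework.NewStatus(framework.Unschedulable, err.Error())
			fmt.Println("Unschedulable:", fitErr.Error())
			return
		}
		panic(err) // mirrors framework.AsStatus(err) for non-fit errors
	}
	fmt.Println("candidates:", candidates)
}
```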
// PodEligibleToPreemptOthers determines whether this pod should be considered
@ -301,21 +314,22 @@ func (cl *candidateList) get() []Candidate {
}
// dryRunPreemption simulates Preemption logic on <potentialNodes> in parallel,
// and returns preemption candidates. The number of candidates depends on the
// constraints defined in the plugin's args. In the returned list of
// and returns preemption candidates and a map indicating the statuses of filtered nodes.
// The number of candidates depends on the constraints defined in the plugin's args. In the returned list of
// candidates, ones that do not violate PDB are preferred over ones that do.
func dryRunPreemption(ctx context.Context, fh framework.Handle,
state *framework.CycleState, pod *v1.Pod, potentialNodes []*framework.NodeInfo,
pdbs []*policy.PodDisruptionBudget, offset int32, numCandidates int32) []Candidate {
pdbs []*policy.PodDisruptionBudget, offset int32, numCandidates int32) ([]Candidate, framework.NodeToStatusMap) {
nonViolatingCandidates := newCandidateList(numCandidates)
violatingCandidates := newCandidateList(numCandidates)
parallelCtx, cancel := context.WithCancel(ctx)
nodeStatuses := make(framework.NodeToStatusMap)
var statusesLock sync.Mutex
checkNode := func(i int) {
nodeInfoCopy := potentialNodes[(int(offset)+i)%len(potentialNodes)].Clone()
stateCopy := state.Clone()
pods, numPDBViolations, fits := selectVictimsOnNode(ctx, fh, stateCopy, pod, nodeInfoCopy, pdbs)
if fits {
pods, numPDBViolations, status := selectVictimsOnNode(ctx, fh, stateCopy, pod, nodeInfoCopy, pdbs)
if status.IsSuccess() {
victims := extenderv1.Victims{
Pods: pods,
NumPDBViolations: int64(numPDBViolations),
@ -333,10 +347,14 @@ func dryRunPreemption(ctx context.Context, fh framework.Handle,
if nvcSize > 0 && nvcSize+vcSize >= numCandidates {
cancel()
}
} else {
statusesLock.Lock()
nodeStatuses[nodeInfoCopy.Node().Name] = status
statusesLock.Unlock()
}
}
parallelize.Until(parallelCtx, len(potentialNodes), checkNode)
return append(nonViolatingCandidates.get(), violatingCandidates.get()...)
return append(nonViolatingCandidates.get(), violatingCandidates.get()...), nodeStatuses
}
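
Editor's sketch: the concurrency pattern dryRunPreemption now uses for nodeStatuses — parallel workers recording per-node failures into a shared map guarded by a mutex — reduced to a standalone program, with plain goroutines standing in for parallelize.Until and plain strings standing in for framework statuses.

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	nodes := []string{"node1", "node2", "node3"}
	statuses := make(map[string]string) // stand-in for framework.NodeToStatusMap
	var statusesLock sync.Mutex
	var wg sync.WaitGroup

	for i := range nodes {
		wg.Add(1)
		go func(i int) { // stand-in for parallelize.Until(parallelCtx, len(potentialNodes), checkNode)
			defer wg.Done()
			name := nodes[i]
			// Pretend only node2 yields preemption victims; the others record why they failed.
			if name != "node2" {
				statusesLock.Lock()
				statuses[name] = "No victims found on node " + name + " for preemptor pod p"
				statusesLock.Unlock()
			}
		}(i)
	}
	wg.Wait()
	fmt.Println(statuses) // fmt prints map keys in sorted order: node1, node3
}
```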
// CallExtenders calls given <extenders> to select the list of feasible candidates.
@ -578,9 +596,8 @@ func selectVictimsOnNode(
pod *v1.Pod,
nodeInfo *framework.NodeInfo,
pdbs []*policy.PodDisruptionBudget,
) ([]*v1.Pod, int, bool) {
) ([]*v1.Pod, int, *framework.Status) {
var potentialVictims []*v1.Pod
ph := fh.PreemptHandle()
removePod := func(rp *v1.Pod) error {
if err := nodeInfo.RemovePod(rp); err != nil {
@ -607,14 +624,15 @@ func selectVictimsOnNode(
if corev1helpers.PodPriority(p.Pod) < podPriority {
potentialVictims = append(potentialVictims, p.Pod)
if err := removePod(p.Pod); err != nil {
return nil, 0, false
return nil, 0, framework.NewStatus(framework.Error, err.Error())
}
}
}
// No potential victims are found, and so we don't need to evaluate the node again since its state didn't change.
if len(potentialVictims) == 0 {
return nil, 0, false
message := fmt.Sprintf("No victims found on node %v for preemptor pod %v", nodeInfo.Node().Name, pod.Name)
return nil, 0, framework.NewStatus(framework.UnschedulableAndUnresolvable, message)
}
// If the new pod does not fit after removing all the lower priority pods,
@ -624,11 +642,7 @@ func selectVictimsOnNode(
// support this case for performance reasons. Having affinity to lower
// priority pods is not a recommended configuration anyway.
if status := fh.RunFilterPluginsWithNominatedPods(ctx, state, pod, nodeInfo); !status.IsSuccess() {
if status.Code() == framework.Error {
klog.Warningf("Encountered error while selecting victims on node %v: %v", nodeInfo.Node().Name, status.AsError())
}
return nil, 0, false
return nil, 0, status
}
var victims []*v1.Pod
numViolatingVictim := 0
@ -654,8 +668,7 @@ func selectVictimsOnNode(
}
for _, p := range violatingVictims {
if fits, err := reprievePod(p); err != nil {
klog.Warningf("Failed to reprieve pod %q: %v", p.Name, err)
return nil, 0, false
return nil, 0, framework.NewStatus(framework.Error, err.Error())
} else if !fits {
numViolatingVictim++
}
@ -663,11 +676,10 @@ func selectVictimsOnNode(
// Now we try to reprieve non-violating victims.
for _, p := range nonViolatingVictims {
if _, err := reprievePod(p); err != nil {
klog.Warningf("Failed to reprieve pod %q: %v", p.Name, err)
return nil, 0, false
return nil, 0, framework.NewStatus(framework.Error, err.Error())
}
}
return victims, numViolatingVictim, true
return victims, numViolatingVictim, framework.NewStatus(framework.Success)
}
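
Editor's sketch of the bool-to-status refactor above, with stand-in Code/Status types rather than the real framework package: returning a typed status lets the caller keep the reason (and distinguish an internal error from an unresolvable node) instead of a bare false.

```go
package main

import "fmt"

type Code int

const (
	Success Code = iota
	Error
	UnschedulableAndUnresolvable
)

type Status struct {
	code    Code
	message string
}

func (s *Status) IsSuccess() bool { return s == nil || s.code == Success }

// selectVictims mimics the new selectVictimsOnNode contract: a Success status
// means victims were found; anything else says why this node is not a candidate.
func selectVictims(lowerPriorityPods []string) ([]string, int, *Status) {
	if len(lowerPriorityPods) == 0 {
		return nil, 0, &Status{UnschedulableAndUnresolvable, "No victims found on node node1 for preemptor pod p"}
	}
	return lowerPriorityPods, 0, &Status{code: Success}
}

func main() {
	if _, _, st := selectVictims(nil); !st.IsSuccess() {
		// In dryRunPreemption this message would land in nodeStatuses and,
		// eventually, in the FitError shown to the user.
		fmt.Println(st.message)
	}
}
```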
// PrepareCandidate does some preparation work before nominating the selected candidate:


@ -100,6 +100,7 @@ func getDefaultDefaultPreemptionArgs() *config.DefaultPreemptionArgs {
func TestPostFilter(t *testing.T) {
onePodRes := map[v1.ResourceName]string{v1.ResourcePods: "1"}
nodeRes := map[v1.ResourceName]string{v1.ResourceCPU: "200m", v1.ResourceMemory: "400"}
tests := []struct {
name string
pod *v1.Pod
@ -138,7 +139,7 @@ func TestPostFilter(t *testing.T) {
"node1": framework.NewStatus(framework.Unschedulable),
},
wantResult: nil,
wantStatus: framework.NewStatus(framework.Unschedulable),
wantStatus: framework.NewStatus(framework.Unschedulable, "0/1 nodes are available: 1 No victims found on node node1 for preemptor pod p."),
},
{
name: "preemption should respect filteredNodesStatuses",
@ -194,6 +195,42 @@ func TestPostFilter(t *testing.T) {
},
wantStatus: framework.NewStatus(framework.Success),
},
{
name: "no candidate nodes found, no enough resource after removing low priority pods",
pod: st.MakePod().Name("p").UID("p").Namespace(v1.NamespaceDefault).Priority(highPriority).Req(largeRes).Obj(),
pods: []*v1.Pod{
st.MakePod().Name("p1").UID("p1").Namespace(v1.NamespaceDefault).Node("node1").Obj(),
st.MakePod().Name("p2").UID("p2").Namespace(v1.NamespaceDefault).Node("node2").Obj(),
},
nodes: []*v1.Node{
st.MakeNode().Name("node1").Capacity(nodeRes).Obj(), // no enough CPU resource
st.MakeNode().Name("node2").Capacity(nodeRes).Obj(), // no enough CPU resource
},
filteredNodesStatuses: framework.NodeToStatusMap{
"node1": framework.NewStatus(framework.Unschedulable),
"node2": framework.NewStatus(framework.Unschedulable),
},
wantResult: nil,
wantStatus: framework.NewStatus(framework.Unschedulable, "0/2 nodes are available: 2 Insufficient cpu."),
},
{
name: "no candidate nodes found with mixed reasons, no lower priority pod and no enough CPU resource",
pod: st.MakePod().Name("p").UID("p").Namespace(v1.NamespaceDefault).Priority(highPriority).Req(largeRes).Obj(),
pods: []*v1.Pod{
st.MakePod().Name("p1").UID("p1").Namespace(v1.NamespaceDefault).Node("node1").Priority(highPriority).Obj(),
st.MakePod().Name("p2").UID("p2").Namespace(v1.NamespaceDefault).Node("node2").Obj(),
},
nodes: []*v1.Node{
st.MakeNode().Name("node1").Capacity(onePodRes).Obj(), // no pod will be preempted
st.MakeNode().Name("node2").Capacity(nodeRes).Obj(), // no enough CPU resource
},
filteredNodesStatuses: framework.NodeToStatusMap{
"node1": framework.NewStatus(framework.Unschedulable),
"node2": framework.NewStatus(framework.Unschedulable),
},
wantResult: nil,
wantStatus: framework.NewStatus(framework.Unschedulable, "0/2 nodes are available: 1 Insufficient cpu, 1 No victims found on node node1 for preemptor pod p."),
},
}
for _, tt := range tests {
@ -978,7 +1015,7 @@ func TestDryRunPreemption(t *testing.T) {
t.Errorf("cycle %d: Unexpected PreFilter Status: %v", cycle, status)
}
offset, numCandidates := pl.getOffsetAndNumCandidates(int32(len(nodeInfos)))
got := dryRunPreemption(context.Background(), fwk, state, pod, nodeInfos, tt.pdbs, offset, numCandidates)
got, _ := dryRunPreemption(context.Background(), fwk, state, pod, nodeInfos, tt.pdbs, offset, numCandidates)
if err != nil {
t.Fatal(err)
}
@ -1201,7 +1238,7 @@ func TestSelectBestCandidate(t *testing.T) {
pl := &DefaultPreemption{args: *getDefaultDefaultPreemptionArgs()}
offset, numCandidates := pl.getOffsetAndNumCandidates(int32(len(nodeInfos)))
candidates := dryRunPreemption(context.Background(), fwk, state, tt.pod, nodeInfos, nil, offset, numCandidates)
candidates, _ := dryRunPreemption(context.Background(), fwk, state, tt.pod, nodeInfos, nil, offset, numCandidates)
s := SelectCandidate(candidates)
found := false
for _, nodeName := range tt.expected {


@ -19,6 +19,8 @@ package framework
import (
"errors"
"fmt"
"sort"
"strings"
"sync"
"sync/atomic"
"time"
@ -89,6 +91,39 @@ type WeightedAffinityTerm struct {
Weight int32
}
// FitError describes a fit error of a pod.
type FitError struct {
Pod *v1.Pod
NumAllNodes int
FilteredNodesStatuses NodeToStatusMap
}
const (
// NoNodeAvailableMsg is used to format the message when no nodes are available.
NoNodeAvailableMsg = "0/%v nodes are available"
)
// Error returns detailed information about why the pod failed to fit on each node.
func (f *FitError) Error() string {
reasons := make(map[string]int)
for _, status := range f.FilteredNodesStatuses {
for _, reason := range status.Reasons() {
reasons[reason]++
}
}
sortReasonsHistogram := func() []string {
var reasonStrings []string
for k, v := range reasons {
reasonStrings = append(reasonStrings, fmt.Sprintf("%v %v", v, k))
}
sort.Strings(reasonStrings)
return reasonStrings
}
reasonMsg := fmt.Sprintf(NoNodeAvailableMsg+": %v.", f.NumAllNodes, strings.Join(sortReasonsHistogram(), ", "))
return reasonMsg
}
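
Editor's sketch (not part of the diff): a self-contained mirror of the Error() aggregation above, using a plain map in place of NodeToStatusMap, to show how per-node reasons collapse into the single histogram-style message. The node names and reasons are made up for illustration.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

func main() {
	// Hypothetical per-node failure reasons, standing in for what a
	// NodeToStatusMap carries via Status.Reasons().
	nodeReasons := map[string][]string{
		"node1": {"Insufficient cpu"},
		"node2": {"Insufficient cpu"},
		"node3": {"No victims found on node node3 for preemptor pod p"},
	}

	// Count how many nodes reported each reason, exactly like FitError.Error().
	reasons := make(map[string]int)
	for _, rs := range nodeReasons {
		for _, r := range rs {
			reasons[r]++
		}
	}
	var reasonStrings []string
	for k, v := range reasons {
		reasonStrings = append(reasonStrings, fmt.Sprintf("%v %v", v, k))
	}
	sort.Strings(reasonStrings)

	numAllNodes := 3 // stands in for FitError.NumAllNodes
	fmt.Printf("0/%v nodes are available: %v.\n", numAllNodes, strings.Join(reasonStrings, ", "))
	// Prints: 0/3 nodes are available: 1 No victims found on node node3 for preemptor pod p, 2 Insufficient cpu.
}
```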
func newAffinityTerm(pod *v1.Pod, term *v1.PodAffinityTerm) (*AffinityTerm, error) {
namespaces := schedutil.GetNamespacesFromPodAffinityTerm(pod, term)
selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector)


@ -457,7 +457,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
// will fit due to the preemption. It is also possible that a different pod will schedule
// into the resources that were preempted, but this is harmless.
nominatedNode := ""
if fitError, ok := err.(*core.FitError); ok {
if fitError, ok := err.(*framework.FitError); ok {
if !fwk.HasPostFilterPlugins() {
klog.V(3).InfoS("No PostFilter plugins are registered, so no preemption will be performed")
} else {


@ -633,7 +633,7 @@ func TestSchedulerNoPhantomPodAfterDelete(t *testing.T) {
scheduler.scheduleOne(context.Background())
select {
case err := <-errChan:
expectErr := &core.FitError{
expectErr := &framework.FitError{
Pod: secondPod,
NumAllNodes: 1,
FilteredNodesStatuses: framework.NodeToStatusMap{
@ -777,7 +777,7 @@ func TestSchedulerFailedSchedulingReasons(t *testing.T) {
scheduler.scheduleOne(context.Background())
select {
case err := <-errChan:
expectErr := &core.FitError{
expectErr := &framework.FitError{
Pod: podWithTooBigResourceRequests,
NumAllNodes: len(nodes),
FilteredNodesStatuses: failedNodeStatues,