diff --git a/pkg/scheduler/core/generic_scheduler.go b/pkg/scheduler/core/generic_scheduler.go
index b70d700584e..0c061e186b2 100644
--- a/pkg/scheduler/core/generic_scheduler.go
+++ b/pkg/scheduler/core/generic_scheduler.go
@@ -20,8 +20,6 @@ import (
 	"context"
 	"fmt"
 	"math/rand"
-	"sort"
-	"strings"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -52,42 +50,9 @@ const (
 	minFeasibleNodesPercentageToFind = 5
 )
 
-// FitError describes a fit error of a pod.
-type FitError struct {
-	Pod                   *v1.Pod
-	NumAllNodes           int
-	FilteredNodesStatuses framework.NodeToStatusMap
-}
-
 // ErrNoNodesAvailable is used to describe the error that no nodes available to schedule pods.
 var ErrNoNodesAvailable = fmt.Errorf("no nodes available to schedule pods")
 
-const (
-	// NoNodeAvailableMsg is used to format message when no nodes available.
-	NoNodeAvailableMsg = "0/%v nodes are available"
-)
-
-// Error returns detailed information of why the pod failed to fit on each node
-func (f *FitError) Error() string {
-	reasons := make(map[string]int)
-	for _, status := range f.FilteredNodesStatuses {
-		for _, reason := range status.Reasons() {
-			reasons[reason]++
-		}
-	}
-
-	sortReasonsHistogram := func() []string {
-		var reasonStrings []string
-		for k, v := range reasons {
-			reasonStrings = append(reasonStrings, fmt.Sprintf("%v %v", v, k))
-		}
-		sort.Strings(reasonStrings)
-		return reasonStrings
-	}
-	reasonMsg := fmt.Sprintf(NoNodeAvailableMsg+": %v.", f.NumAllNodes, strings.Join(sortReasonsHistogram(), ", "))
-	return reasonMsg
-}
-
 // ScheduleAlgorithm is an interface implemented by things that know how to schedule pods
 // onto machines.
 // TODO: Rename this type.
@@ -147,7 +112,7 @@ func (g *genericScheduler) Schedule(ctx context.Context, fwk framework.Framework
 	trace.Step("Computing predicates done")
 
 	if len(feasibleNodes) == 0 {
-		return result, &FitError{
+		return result, &framework.FitError{
 			Pod:                   pod,
 			NumAllNodes:           g.nodeInfoSnapshot.NumNodes(),
 			FilteredNodesStatuses: filteredNodesStatuses,
diff --git a/pkg/scheduler/core/generic_scheduler_test.go b/pkg/scheduler/core/generic_scheduler_test.go
index dfcd0a2a4ec..89ed3051cfe 100644
--- a/pkg/scheduler/core/generic_scheduler_test.go
+++ b/pkg/scheduler/core/generic_scheduler_test.go
@@ -289,7 +289,7 @@ func TestGenericScheduler(t *testing.T) {
 			nodes: []string{"machine1", "machine2"},
 			pod:   &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "2", UID: types.UID("2")}},
 			name:  "test 1",
-			wErr: &FitError{
+			wErr: &framework.FitError{
 				Pod:         &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "2", UID: types.UID("2")}},
 				NumAllNodes: 2,
 				FilteredNodesStatuses: framework.NodeToStatusMap{
@@ -374,7 +374,7 @@ func TestGenericScheduler(t *testing.T) {
 			nodes: []string{"3", "2", "1"},
 			pod:   &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "2", UID: types.UID("2")}},
 			name:  "test 7",
-			wErr: &FitError{
+			wErr: &framework.FitError{
 				Pod:         &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "2", UID: types.UID("2")}},
 				NumAllNodes: 3,
 				FilteredNodesStatuses: framework.NodeToStatusMap{
@@ -406,7 +406,7 @@ func TestGenericScheduler(t *testing.T) {
 			pod:   &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "2", UID: types.UID("2")}},
 			nodes: []string{"1", "2"},
 			name:  "test 8",
-			wErr: &FitError{
+			wErr: &framework.FitError{
 				Pod:         &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "2", UID: types.UID("2")}},
 				NumAllNodes: 2,
 				FilteredNodesStatuses: framework.NodeToStatusMap{
@@ -640,7 +640,7 @@ func TestGenericScheduler(t *testing.T) {
 			nodes:         []string{"3"},
 			pod:           &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-filter", UID: types.UID("test-filter")}},
 			expectedHosts: nil,
-			wErr: &FitError{
+			wErr: &framework.FitError{
 				Pod:         &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-filter", UID: types.UID("test-filter")}},
 				NumAllNodes: 1,
 				FilteredNodesStatuses: framework.NodeToStatusMap{
@@ -662,7 +662,7 @@ func TestGenericScheduler(t *testing.T) {
 			nodes:         []string{"3"},
 			pod:           &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-filter", UID: types.UID("test-filter")}},
 			expectedHosts: nil,
-			wErr: &FitError{
+			wErr: &framework.FitError{
 				Pod:         &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-filter", UID: types.UID("test-filter")}},
 				NumAllNodes: 1,
 				FilteredNodesStatuses: framework.NodeToStatusMap{
@@ -699,7 +699,7 @@ func TestGenericScheduler(t *testing.T) {
 			nodes:         []string{"1", "2"},
 			pod:           &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-prefilter", UID: types.UID("test-prefilter")}},
 			expectedHosts: nil,
-			wErr: &FitError{
+			wErr: &framework.FitError{
 				Pod:         &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-prefilter", UID: types.UID("test-prefilter")}},
 				NumAllNodes: 2,
 				FilteredNodesStatuses: framework.NodeToStatusMap{
diff --git a/pkg/scheduler/factory.go b/pkg/scheduler/factory.go
index 9ce98680710..5f5879cbd9e 100644
--- a/pkg/scheduler/factory.go
+++ b/pkg/scheduler/factory.go
@@ -318,7 +318,7 @@ func MakeDefaultErrorFunc(client clientset.Interface, podLister corelisters.PodL
 		pod := podInfo.Pod
 		if err == core.ErrNoNodesAvailable {
 			klog.V(2).InfoS("Unable to schedule pod; no nodes are registered to the cluster; waiting", "pod", klog.KObj(pod))
-		} else if _, ok := err.(*core.FitError); ok {
+		} else if _, ok := err.(*framework.FitError); ok {
 			klog.V(2).InfoS("Unable to schedule pod; no fit; waiting", "pod", klog.KObj(pod), "err", err)
 		} else if apierrors.IsNotFound(err) {
 			klog.V(2).InfoS("Unable to schedule pod, possibly due to node not found; waiting", "pod", klog.KObj(pod), "err", err)
diff --git a/pkg/scheduler/framework/interface.go b/pkg/scheduler/framework/interface.go
index c1a392135b6..739893c705f 100644
--- a/pkg/scheduler/framework/interface.go
+++ b/pkg/scheduler/framework/interface.go
@@ -63,7 +63,7 @@ const (
 	// scheduler skip preemption.
 	// The accompanying status message should explain why the pod is unschedulable.
 	Unschedulable
-	// UnschedulableAndUnresolvable is used when a PreFilter plugin finds a pod unschedulable and
+	// UnschedulableAndUnresolvable is used when a plugin finds a pod unschedulable and
 	// preemption would not change anything. Plugins should return Unschedulable if it is possible
 	// that the pod can get scheduled with preemption.
 	// The accompanying status message should explain why the pod is unschedulable.
diff --git a/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption.go b/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption.go
index 6e7748a86cb..75f361042d6 100644
--- a/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption.go
+++ b/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption.go
@@ -22,6 +22,7 @@ import (
 	"math"
 	"math/rand"
 	"sort"
+	"sync"
 	"sync/atomic"
 
 	"k8s.io/klog/v2"
@@ -93,8 +94,12 @@ func (pl *DefaultPreemption) PostFilter(ctx context.Context, state *framework.Cy
 
 	nnn, err := pl.preempt(ctx, state, pod, m)
 	if err != nil {
+		if _, ok := err.(*framework.FitError); ok {
+			return nil, framework.NewStatus(framework.Unschedulable, err.Error())
+		}
 		return nil, framework.AsStatus(err)
 	}
+	// This happens when the pod is not eligible for preemption or extenders filtered all candidates.
 	if nnn == "" {
 		return nil, framework.NewStatus(framework.Unschedulable)
 	}
@@ -214,8 +219,16 @@ func (pl *DefaultPreemption) FindCandidates(ctx context.Context, state *framewor
 		}
 		klog.Infof("from a pool of %d nodes (offset: %d, sample %d nodes: %v), ~%d candidates will be chosen", len(potentialNodes), offset, len(sample), sample, numCandidates)
 	}
-
-	return dryRunPreemption(ctx, pl.fh, state, pod, potentialNodes, pdbs, offset, numCandidates), nil
+	candidates, nodeStatuses := dryRunPreemption(ctx, pl.fh, state, pod, potentialNodes, pdbs, offset, numCandidates)
+	// Return a FitError only when there are no candidates that fit the pod.
+	if len(candidates) == 0 {
+		return candidates, &framework.FitError{
+			Pod:                   pod,
+			NumAllNodes:           len(potentialNodes),
+			FilteredNodesStatuses: nodeStatuses,
+		}
+	}
+	return candidates, nil
 }
 
 // PodEligibleToPreemptOthers determines whether this pod should be considered
@@ -301,21 +314,22 @@ func (cl *candidateList) get() []Candidate {
 }
 
 // dryRunPreemption simulates Preemption logic on <potentialNodes> in parallel,
-// and returns preemption candidates. The number of candidates depends on the
-// constraints defined in the plugin's args. In the returned list of
+// returns preemption candidates and a map indicating filtered nodes statuses.
+// The number of candidates depends on the constraints defined in the plugin's args. In the returned list of
 // candidates, ones that do not violate PDB are preferred over ones that do.
 func dryRunPreemption(ctx context.Context, fh framework.Handle,
 	state *framework.CycleState, pod *v1.Pod, potentialNodes []*framework.NodeInfo,
-	pdbs []*policy.PodDisruptionBudget, offset int32, numCandidates int32) []Candidate {
+	pdbs []*policy.PodDisruptionBudget, offset int32, numCandidates int32) ([]Candidate, framework.NodeToStatusMap) {
 	nonViolatingCandidates := newCandidateList(numCandidates)
 	violatingCandidates := newCandidateList(numCandidates)
 	parallelCtx, cancel := context.WithCancel(ctx)
-
+	nodeStatuses := make(framework.NodeToStatusMap)
+	var statusesLock sync.Mutex
 	checkNode := func(i int) {
 		nodeInfoCopy := potentialNodes[(int(offset)+i)%len(potentialNodes)].Clone()
 		stateCopy := state.Clone()
-		pods, numPDBViolations, fits := selectVictimsOnNode(ctx, fh, stateCopy, pod, nodeInfoCopy, pdbs)
-		if fits {
+		pods, numPDBViolations, status := selectVictimsOnNode(ctx, fh, stateCopy, pod, nodeInfoCopy, pdbs)
+		if status.IsSuccess() {
 			victims := extenderv1.Victims{
 				Pods:             pods,
 				NumPDBViolations: int64(numPDBViolations),
@@ -333,10 +347,14 @@ func dryRunPreemption(ctx context.Context, fh framework.Handle,
 			if nvcSize > 0 && nvcSize+vcSize >= numCandidates {
 				cancel()
 			}
+		} else {
+			statusesLock.Lock()
+			nodeStatuses[nodeInfoCopy.Node().Name] = status
+			statusesLock.Unlock()
 		}
 	}
 	parallelize.Until(parallelCtx, len(potentialNodes), checkNode)
-	return append(nonViolatingCandidates.get(), violatingCandidates.get()...)
+	return append(nonViolatingCandidates.get(), violatingCandidates.get()...), nodeStatuses
 }
 
 // CallExtenders calls given <extenders> to select the list of feasible candidates.
@@ -578,9 +596,8 @@ func selectVictimsOnNode(
 	pod *v1.Pod,
 	nodeInfo *framework.NodeInfo,
 	pdbs []*policy.PodDisruptionBudget,
-) ([]*v1.Pod, int, bool) {
+) ([]*v1.Pod, int, *framework.Status) {
 	var potentialVictims []*v1.Pod
-	ph := fh.PreemptHandle()
 
 	removePod := func(rp *v1.Pod) error {
 		if err := nodeInfo.RemovePod(rp); err != nil {
@@ -607,14 +624,15 @@
 		if corev1helpers.PodPriority(p.Pod) < podPriority {
 			potentialVictims = append(potentialVictims, p.Pod)
 			if err := removePod(p.Pod); err != nil {
-				return nil, 0, false
+				return nil, 0, framework.NewStatus(framework.Error, err.Error())
 			}
 		}
 	}
 
 	// No potential victims are found, and so we don't need to evaluate the node again since its state didn't change.
 	if len(potentialVictims) == 0 {
-		return nil, 0, false
+		message := fmt.Sprintf("No victims found on node %v for preemptor pod %v", nodeInfo.Node().Name, pod.Name)
+		return nil, 0, framework.NewStatus(framework.UnschedulableAndUnresolvable, message)
 	}
 
 	// If the new pod does not fit after removing all the lower priority pods,
@@ -624,11 +642,7 @@
 	// support this case for performance reasons. Having affinity to lower
 	// priority pods is not a recommended configuration anyway.
 	if status := fh.RunFilterPluginsWithNominatedPods(ctx, state, pod, nodeInfo); !status.IsSuccess() {
-		if status.Code() == framework.Error {
-			klog.Warningf("Encountered error while selecting victims on node %v: %v", nodeInfo.Node().Name, status.AsError())
-		}
-
-		return nil, 0, false
+		return nil, 0, status
 	}
 	var victims []*v1.Pod
 	numViolatingVictim := 0
@@ -654,8 +668,7 @@
 	}
 	for _, p := range violatingVictims {
 		if fits, err := reprievePod(p); err != nil {
-			klog.Warningf("Failed to reprieve pod %q: %v", p.Name, err)
-			return nil, 0, false
+			return nil, 0, framework.NewStatus(framework.Error, err.Error())
 		} else if !fits {
 			numViolatingVictim++
 		}
@@ -663,11 +676,10 @@
 	// Now we try to reprieve non-violating victims.
 	for _, p := range nonViolatingVictims {
 		if _, err := reprievePod(p); err != nil {
-			klog.Warningf("Failed to reprieve pod %q: %v", p.Name, err)
-			return nil, 0, false
+			return nil, 0, framework.NewStatus(framework.Error, err.Error())
 		}
 	}
-	return victims, numViolatingVictim, true
+	return victims, numViolatingVictim, framework.NewStatus(framework.Success)
 }
 
 // PrepareCandidate does some preparation work before nominating the selected candidate:
diff --git a/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go b/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go
index 32f7057b93b..ed6f2a63716 100644
--- a/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go
+++ b/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go
@@ -100,6 +100,7 @@ func getDefaultDefaultPreemptionArgs() *config.DefaultPreemptionArgs {
 
 func TestPostFilter(t *testing.T) {
 	onePodRes := map[v1.ResourceName]string{v1.ResourcePods: "1"}
+	nodeRes := map[v1.ResourceName]string{v1.ResourceCPU: "200m", v1.ResourceMemory: "400"}
 	tests := []struct {
 		name                  string
 		pod                   *v1.Pod
@@ -138,7 +139,7 @@
 				"node1": framework.NewStatus(framework.Unschedulable),
 			},
 			wantResult: nil,
-			wantStatus: framework.NewStatus(framework.Unschedulable),
+			wantStatus: framework.NewStatus(framework.Unschedulable, "0/1 nodes are available: 1 No victims found on node node1 for preemptor pod p."),
 		},
 		{
 			name: "preemption should respect filteredNodesStatuses",
@@ -194,6 +195,42 @@
 			},
 			wantStatus: framework.NewStatus(framework.Success),
 		},
+		{
+			name: "no candidate nodes found, not enough resources after removing low priority pods",
+			pod:  st.MakePod().Name("p").UID("p").Namespace(v1.NamespaceDefault).Priority(highPriority).Req(largeRes).Obj(),
+			pods: []*v1.Pod{
+				st.MakePod().Name("p1").UID("p1").Namespace(v1.NamespaceDefault).Node("node1").Obj(),
+				st.MakePod().Name("p2").UID("p2").Namespace(v1.NamespaceDefault).Node("node2").Obj(),
+			},
+			nodes: []*v1.Node{
+				st.MakeNode().Name("node1").Capacity(nodeRes).Obj(), // not enough CPU resources
+				st.MakeNode().Name("node2").Capacity(nodeRes).Obj(), // not enough CPU resources
+			},
+			filteredNodesStatuses: framework.NodeToStatusMap{
+				"node1": framework.NewStatus(framework.Unschedulable),
+				"node2": framework.NewStatus(framework.Unschedulable),
+			},
+			wantResult: nil,
+			wantStatus: framework.NewStatus(framework.Unschedulable, "0/2 nodes are available: 2 Insufficient cpu."),
+		},
+		{
+			name: "no candidate nodes found with mixed reasons, no lower priority pod and not enough CPU resources",
+			pod:  st.MakePod().Name("p").UID("p").Namespace(v1.NamespaceDefault).Priority(highPriority).Req(largeRes).Obj(),
+			pods: []*v1.Pod{
+				st.MakePod().Name("p1").UID("p1").Namespace(v1.NamespaceDefault).Node("node1").Priority(highPriority).Obj(),
+				st.MakePod().Name("p2").UID("p2").Namespace(v1.NamespaceDefault).Node("node2").Obj(),
+			},
+			nodes: []*v1.Node{
+				st.MakeNode().Name("node1").Capacity(onePodRes).Obj(), // no pod will be preempted
+				st.MakeNode().Name("node2").Capacity(nodeRes).Obj(),   // not enough CPU resources
+			},
+			filteredNodesStatuses: framework.NodeToStatusMap{
+				"node1": framework.NewStatus(framework.Unschedulable),
+				"node2": framework.NewStatus(framework.Unschedulable),
+			},
+			wantResult: nil,
+			wantStatus: framework.NewStatus(framework.Unschedulable, "0/2 nodes are available: 1 Insufficient cpu, 1 No victims found on node node1 for preemptor pod p."),
+		},
 	}
 
 	for _, tt := range tests {
@@ -978,7 +1015,7 @@
 				t.Errorf("cycle %d: Unexpected PreFilter Status: %v", cycle, status)
 			}
 			offset, numCandidates := pl.getOffsetAndNumCandidates(int32(len(nodeInfos)))
-			got := dryRunPreemption(context.Background(), fwk, state, pod, nodeInfos, tt.pdbs, offset, numCandidates)
+			got, _ := dryRunPreemption(context.Background(), fwk, state, pod, nodeInfos, tt.pdbs, offset, numCandidates)
 			if err != nil {
 				t.Fatal(err)
 			}
@@ -1201,7 +1238,7 @@
 
 			pl := &DefaultPreemption{args: *getDefaultDefaultPreemptionArgs()}
 			offset, numCandidates := pl.getOffsetAndNumCandidates(int32(len(nodeInfos)))
-			candidates := dryRunPreemption(context.Background(), fwk, state, tt.pod, nodeInfos, nil, offset, numCandidates)
+			candidates, _ := dryRunPreemption(context.Background(), fwk, state, tt.pod, nodeInfos, nil, offset, numCandidates)
 			s := SelectCandidate(candidates)
 			found := false
 			for _, nodeName := range tt.expected {
diff --git a/pkg/scheduler/framework/types.go b/pkg/scheduler/framework/types.go
index 7b7f19256d1..6f1da3a54c1 100644
--- a/pkg/scheduler/framework/types.go
+++ b/pkg/scheduler/framework/types.go
@@ -19,6 +19,8 @@ package framework
 import (
 	"errors"
 	"fmt"
+	"sort"
+	"strings"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -89,6 +91,39 @@ type WeightedAffinityTerm struct {
 	Weight int32
 }
 
+// FitError describes a fit error of a pod.
+type FitError struct {
+	Pod                   *v1.Pod
+	NumAllNodes           int
+	FilteredNodesStatuses NodeToStatusMap
+}
+
+const (
+	// NoNodeAvailableMsg is used to format message when no nodes available.
+	NoNodeAvailableMsg = "0/%v nodes are available"
+)
+
+// Error returns detailed information of why the pod failed to fit on each node
+func (f *FitError) Error() string {
+	reasons := make(map[string]int)
+	for _, status := range f.FilteredNodesStatuses {
+		for _, reason := range status.Reasons() {
+			reasons[reason]++
+		}
+	}
+
+	sortReasonsHistogram := func() []string {
+		var reasonStrings []string
+		for k, v := range reasons {
+			reasonStrings = append(reasonStrings, fmt.Sprintf("%v %v", v, k))
+		}
+		sort.Strings(reasonStrings)
+		return reasonStrings
+	}
+	reasonMsg := fmt.Sprintf(NoNodeAvailableMsg+": %v.", f.NumAllNodes, strings.Join(sortReasonsHistogram(), ", "))
+	return reasonMsg
+}
+
 func newAffinityTerm(pod *v1.Pod, term *v1.PodAffinityTerm) (*AffinityTerm, error) {
 	namespaces := schedutil.GetNamespacesFromPodAffinityTerm(pod, term)
 	selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector)
diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go
index 1e5d868fcc5..618d7da2277 100755
--- a/pkg/scheduler/scheduler.go
+++ b/pkg/scheduler/scheduler.go
@@ -457,7 +457,7 @@
 		// will fit due to the preemption. It is also possible that a different pod will schedule
 		// into the resources that were preempted, but this is harmless.
 		nominatedNode := ""
-		if fitError, ok := err.(*core.FitError); ok {
+		if fitError, ok := err.(*framework.FitError); ok {
 			if !fwk.HasPostFilterPlugins() {
 				klog.V(3).InfoS("No PostFilter plugins are registered, so no preemption will be performed")
 			} else {
diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go
index 9d39102ffae..f5a80d18f59 100644
--- a/pkg/scheduler/scheduler_test.go
+++ b/pkg/scheduler/scheduler_test.go
@@ -633,7 +633,7 @@
 	scheduler.scheduleOne(context.Background())
 	select {
 	case err := <-errChan:
-		expectErr := &core.FitError{
+		expectErr := &framework.FitError{
 			Pod:         secondPod,
 			NumAllNodes: 1,
 			FilteredNodesStatuses: framework.NodeToStatusMap{
@@ -777,7 +777,7 @@
 	scheduler.scheduleOne(context.Background())
 	select {
 	case err := <-errChan:
-		expectErr := &core.FitError{
+		expectErr := &framework.FitError{
 			Pod:                   podWithTooBigResourceRequests,
 			NumAllNodes:           len(nodes),
 			FilteredNodesStatuses: failedNodeStatues,