mirror of https://github.com/k3s-io/kubernetes.git (synced 2025-07-23 11:50:44 +00:00)

commit f402e472a1

Merge pull request #98041 from Huang-Wei/sched-enqueue-1

Surface info of failed plugins during PreFilter and Filter
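In short, this change replaces the bare framework.NodeToStatusMap that used to be threaded through the scheduling cycle with a framework.Diagnosis struct that additionally records which plugins rejected the Pod. A rough illustration of how a consumer of the new FitError might read that information (a minimal, self-contained sketch using simplified stand-in types based on the diff below, not code from the PR itself — in particular, NodeToStatusMap here maps to plain strings rather than *framework.Status):

// sketch.go: illustrative only; assumes the Diagnosis/FitError shapes shown in the diff below.
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/sets"
)

// Simplified stand-ins for the scheduler framework types touched by this PR.
type NodeToStatusMap map[string]string // node name -> failure reason (a *Status in the real code)

type Diagnosis struct {
	NodeToStatusMap      NodeToStatusMap
	UnschedulablePlugins sets.String
}

type FitError struct {
	PodName     string
	NumAllNodes int
	Diagnosis   Diagnosis
}

func main() {
	// A FitError as the scheduler would now build it: per-node statuses plus
	// the set of plugins that reported the Pod unschedulable.
	fitErr := FitError{
		PodName:     "demo-pod",
		NumAllNodes: 2,
		Diagnosis: Diagnosis{
			NodeToStatusMap: NodeToStatusMap{
				"node-a": "Insufficient cpu",
				"node-b": "node(s) didn't match Pod's node affinity",
			},
			UnschedulablePlugins: sets.NewString("NodeResourcesFit", "NodeAffinity"),
		},
	}
	// The new field lets a caller enumerate the failed plugins directly,
	// instead of re-parsing per-node status messages.
	fmt.Println("failed plugins:", fitErr.Diagnosis.UnschedulablePlugins.List())
}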
@@ -61,6 +61,7 @@ go_test(
         "//staging/src/k8s.io/client-go/informers:go_default_library",
         "//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
         "//staging/src/k8s.io/component-base/featuregate/testing:go_default_library",
+        "//vendor/github.com/google/go-cmp/cmp:go_default_library",
     ],
 )
@@ -27,6 +27,7 @@ import (
 	"k8s.io/klog/v2"
 
 	v1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/apiserver/pkg/util/feature"
 	extenderv1 "k8s.io/kube-scheduler/extender/v1"
 	"k8s.io/kubernetes/pkg/features"

@@ -106,7 +107,7 @@ func (g *genericScheduler) Schedule(ctx context.Context, fwk framework.Framework
 		return result, ErrNoNodesAvailable
 	}
 
-	feasibleNodes, filteredNodesStatuses, err := g.findNodesThatFitPod(ctx, fwk, state, pod)
+	feasibleNodes, diagnosis, err := g.findNodesThatFitPod(ctx, fwk, state, pod)
 	if err != nil {
 		return result, err
 	}

@@ -114,9 +115,9 @@ func (g *genericScheduler) Schedule(ctx context.Context, fwk framework.Framework
 
 	if len(feasibleNodes) == 0 {
 		return result, &framework.FitError{
-			Pod:                   pod,
-			NumAllNodes:           g.nodeInfoSnapshot.NumNodes(),
-			FilteredNodesStatuses: filteredNodesStatuses,
+			Pod:         pod,
+			NumAllNodes: g.nodeInfoSnapshot.NumNodes(),
+			Diagnosis:   diagnosis,
 		}
 	}

@@ -124,7 +125,7 @@ func (g *genericScheduler) Schedule(ctx context.Context, fwk framework.Framework
 	if len(feasibleNodes) == 1 {
 		return ScheduleResult{
 			SuggestedHost:  feasibleNodes[0].Name,
-			EvaluatedNodes: 1 + len(filteredNodesStatuses),
+			EvaluatedNodes: 1 + len(diagnosis.NodeToStatusMap),
 			FeasibleNodes:  1,
 		}, nil
 	}

@@ -139,7 +140,7 @@ func (g *genericScheduler) Schedule(ctx context.Context, fwk framework.Framework
 
 	return ScheduleResult{
 		SuggestedHost:  host,
-		EvaluatedNodes: len(feasibleNodes) + len(filteredNodesStatuses),
+		EvaluatedNodes: len(feasibleNodes) + len(diagnosis.NodeToStatusMap),
 		FeasibleNodes:  len(feasibleNodes),
 	}, err
 }

@@ -197,19 +198,19 @@ func (g *genericScheduler) numFeasibleNodesToFind(numAllNodes int32) (numNodes i
 	return numNodes
 }
 
-func (g *genericScheduler) evaluateNominatedNode(ctx context.Context, pod *v1.Pod, fwk framework.Framework, state *framework.CycleState, filteredNodesStatuses framework.NodeToStatusMap) ([]*v1.Node, error) {
+func (g *genericScheduler) evaluateNominatedNode(ctx context.Context, pod *v1.Pod, fwk framework.Framework, state *framework.CycleState, diagnosis framework.Diagnosis) ([]*v1.Node, error) {
 	nnn := pod.Status.NominatedNodeName
 	nodeInfo, err := g.nodeInfoSnapshot.Get(nnn)
 	if err != nil {
 		return nil, err
 	}
 	node := []*framework.NodeInfo{nodeInfo}
-	feasibleNodes, err := g.findNodesThatPassFilters(ctx, fwk, state, pod, filteredNodesStatuses, node)
+	feasibleNodes, err := g.findNodesThatPassFilters(ctx, fwk, state, pod, diagnosis, node)
 	if err != nil {
 		return nil, err
 	}
 
-	feasibleNodes, err = g.findNodesThatPassExtenders(pod, feasibleNodes, filteredNodesStatuses)
+	feasibleNodes, err = g.findNodesThatPassExtenders(pod, feasibleNodes, diagnosis.NodeToStatusMap)
 	if err != nil {
 		return nil, err
 	}

@@ -219,48 +220,53 @@ func (g *genericScheduler) evaluateNominatedNode(ctx context.Context, pod *v1.Po
 
 // Filters the nodes to find the ones that fit the pod based on the framework
 // filter plugins and filter extenders.
-func (g *genericScheduler) findNodesThatFitPod(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) ([]*v1.Node, framework.NodeToStatusMap, error) {
-	filteredNodesStatuses := make(framework.NodeToStatusMap)
+func (g *genericScheduler) findNodesThatFitPod(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) ([]*v1.Node, framework.Diagnosis, error) {
+	diagnosis := framework.Diagnosis{
+		NodeToStatusMap:      make(framework.NodeToStatusMap),
+		UnschedulablePlugins: sets.NewString(),
+	}
 
 	// Run "prefilter" plugins.
 	s := fwk.RunPreFilterPlugins(ctx, state, pod)
 	allNodes, err := g.nodeInfoSnapshot.NodeInfos().List()
 	if err != nil {
-		return nil, nil, err
+		return nil, diagnosis, err
 	}
 	if !s.IsSuccess() {
 		if !s.IsUnschedulable() {
-			return nil, nil, s.AsError()
+			return nil, diagnosis, s.AsError()
 		}
 		// All nodes will have the same status. Some non trivial refactoring is
 		// needed to avoid this copy.
 		for _, n := range allNodes {
-			filteredNodesStatuses[n.Node().Name] = s
+			diagnosis.NodeToStatusMap[n.Node().Name] = s
 		}
-		return nil, filteredNodesStatuses, nil
+		diagnosis.UnschedulablePlugins.Insert(s.FailedPlugin())
+		return nil, diagnosis, nil
 	}
 
 	// "NominatedNodeName" can potentially be set in a previous scheduling cycle as a result of preemption.
 	// This node is likely the only candidate that will fit the pod, and hence we try it first before iterating over all nodes.
 	if len(pod.Status.NominatedNodeName) > 0 && feature.DefaultFeatureGate.Enabled(features.PreferNominatedNode) {
-		feasibleNodes, err := g.evaluateNominatedNode(ctx, pod, fwk, state, filteredNodesStatuses)
+		feasibleNodes, err := g.evaluateNominatedNode(ctx, pod, fwk, state, diagnosis)
 		if err != nil {
 			klog.ErrorS(err, "Evaluation failed on nominated node", "pod", klog.KObj(pod), "node", pod.Status.NominatedNodeName)
 		}
 		// Nominated node passes all the filters, scheduler is good to assign this node to the pod.
 		if len(feasibleNodes) != 0 {
-			return feasibleNodes, filteredNodesStatuses, nil
+			return feasibleNodes, diagnosis, nil
 		}
 	}
-	feasibleNodes, err := g.findNodesThatPassFilters(ctx, fwk, state, pod, filteredNodesStatuses, allNodes)
+	feasibleNodes, err := g.findNodesThatPassFilters(ctx, fwk, state, pod, diagnosis, allNodes)
 	if err != nil {
-		return nil, nil, err
+		return nil, diagnosis, err
 	}
-	feasibleNodes, err = g.findNodesThatPassExtenders(pod, feasibleNodes, filteredNodesStatuses)
+
+	feasibleNodes, err = g.findNodesThatPassExtenders(pod, feasibleNodes, diagnosis.NodeToStatusMap)
 	if err != nil {
-		return nil, nil, err
+		return nil, diagnosis, err
 	}
-	return feasibleNodes, filteredNodesStatuses, nil
+	return feasibleNodes, diagnosis, nil
 }
 
 // findNodesThatPassFilters finds the nodes that fit the filter plugins.

@@ -269,7 +275,7 @@ func (g *genericScheduler) findNodesThatPassFilters(
 	fwk framework.Framework,
 	state *framework.CycleState,
 	pod *v1.Pod,
-	statuses framework.NodeToStatusMap,
+	diagnosis framework.Diagnosis,
 	nodes []*framework.NodeInfo) ([]*v1.Node, error) {
 	numNodesToFind := g.numFeasibleNodesToFind(int32(len(nodes)))
 
@@ -309,7 +315,8 @@ func (g *genericScheduler) findNodesThatPassFilters(
 			}
 		} else {
 			statusesLock.Lock()
-			statuses[nodeInfo.Node().Name] = status
+			diagnosis.NodeToStatusMap[nodeInfo.Node().Name] = status
+			diagnosis.UnschedulablePlugins.Insert(status.FailedPlugin())
 			statusesLock.Unlock()
 		}
 	}

@@ -326,7 +333,7 @@ func (g *genericScheduler) findNodesThatPassFilters(
 	// Stops searching for more nodes once the configured number of feasible nodes
 	// are found.
 	parallelize.Until(ctx, len(nodes), checkNode)
-	processedNodes := int(feasibleNodesLen) + len(statuses)
+	processedNodes := int(feasibleNodesLen) + len(diagnosis.NodeToStatusMap)
 	g.nextStartNodeIndex = (g.nextStartNodeIndex + processedNodes) % len(nodes)
 
 	feasibleNodes = feasibleNodes[:feasibleNodesLen]
@@ -21,11 +21,12 @@ import (
 	"errors"
 	"fmt"
 	"math"
 	"reflect"
 	"strconv"
 	"strings"
 	"testing"
 	"time"
 
+	"github.com/google/go-cmp/cmp"
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

@@ -295,9 +296,12 @@ func TestGenericScheduler(t *testing.T) {
 			wErr: &framework.FitError{
 				Pod:         &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "2", UID: types.UID("2")}},
 				NumAllNodes: 2,
-				FilteredNodesStatuses: framework.NodeToStatusMap{
-					"machine1": framework.NewStatus(framework.Unschedulable, st.ErrReasonFake),
-					"machine2": framework.NewStatus(framework.Unschedulable, st.ErrReasonFake),
+				Diagnosis: framework.Diagnosis{
+					NodeToStatusMap: framework.NodeToStatusMap{
+						"machine1": framework.NewStatus(framework.Unschedulable, st.ErrReasonFake).WithFailedPlugin("FalseFilter"),
+						"machine2": framework.NewStatus(framework.Unschedulable, st.ErrReasonFake).WithFailedPlugin("FalseFilter"),
+					},
+					UnschedulablePlugins: sets.NewString("FalseFilter"),
 				},
 			},
 		},

@@ -380,10 +384,13 @@ func TestGenericScheduler(t *testing.T) {
 			wErr: &framework.FitError{
 				Pod:         &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "2", UID: types.UID("2")}},
 				NumAllNodes: 3,
-				FilteredNodesStatuses: framework.NodeToStatusMap{
-					"3": framework.NewStatus(framework.Unschedulable, st.ErrReasonFake),
-					"2": framework.NewStatus(framework.Unschedulable, st.ErrReasonFake),
-					"1": framework.NewStatus(framework.Unschedulable, st.ErrReasonFake),
+				Diagnosis: framework.Diagnosis{
+					NodeToStatusMap: framework.NodeToStatusMap{
+						"3": framework.NewStatus(framework.Unschedulable, st.ErrReasonFake).WithFailedPlugin("FalseFilter"),
+						"2": framework.NewStatus(framework.Unschedulable, st.ErrReasonFake).WithFailedPlugin("FalseFilter"),
+						"1": framework.NewStatus(framework.Unschedulable, st.ErrReasonFake).WithFailedPlugin("FalseFilter"),
+					},
+					UnschedulablePlugins: sets.NewString("FalseFilter"),
 				},
 			},
 		},

@@ -412,9 +419,12 @@ func TestGenericScheduler(t *testing.T) {
 			wErr: &framework.FitError{
 				Pod:         &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "2", UID: types.UID("2")}},
 				NumAllNodes: 2,
-				FilteredNodesStatuses: framework.NodeToStatusMap{
-					"1": framework.NewStatus(framework.Unschedulable, st.ErrReasonFake),
-					"2": framework.NewStatus(framework.Unschedulable, st.ErrReasonFake),
+				Diagnosis: framework.Diagnosis{
+					NodeToStatusMap: framework.NodeToStatusMap{
+						"1": framework.NewStatus(framework.Unschedulable, st.ErrReasonFake).WithFailedPlugin("MatchFilter"),
+						"2": framework.NewStatus(framework.Unschedulable, st.ErrReasonFake).WithFailedPlugin("NoPodsFilter"),
+					},
+					UnschedulablePlugins: sets.NewString("MatchFilter", "NoPodsFilter"),
 				},
 			},
 		},

@@ -475,7 +485,30 @@ func TestGenericScheduler(t *testing.T) {
 				},
 			},
 			name: "unknown PVC",
-			wErr: fmt.Errorf("persistentvolumeclaim \"unknownPVC\" not found"),
+			wErr: &framework.FitError{
+				Pod: &v1.Pod{
+					ObjectMeta: metav1.ObjectMeta{Name: "ignore", UID: types.UID("ignore")},
+					Spec: v1.PodSpec{
+						Volumes: []v1.Volume{
+							{
+								VolumeSource: v1.VolumeSource{
+									PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
+										ClaimName: "unknownPVC",
+									},
+								},
+							},
+						},
+					},
+				},
+				NumAllNodes: 2,
+				Diagnosis: framework.Diagnosis{
+					NodeToStatusMap: framework.NodeToStatusMap{
+						"machine1": framework.NewStatus(framework.UnschedulableAndUnresolvable, `persistentvolumeclaim "unknownPVC" not found`).WithFailedPlugin(volumebinding.Name),
+						"machine2": framework.NewStatus(framework.UnschedulableAndUnresolvable, `persistentvolumeclaim "unknownPVC" not found`).WithFailedPlugin(volumebinding.Name),
+					},
+					UnschedulablePlugins: sets.NewString(volumebinding.Name),
+				},
+			},
 		},
 		{
 			// Pod with deleting PVC

@@ -502,7 +535,30 @@ func TestGenericScheduler(t *testing.T) {
 				},
 			},
 			name: "deleted PVC",
-			wErr: fmt.Errorf("persistentvolumeclaim \"existingPVC\" is being deleted"),
+			wErr: &framework.FitError{
+				Pod: &v1.Pod{
+					ObjectMeta: metav1.ObjectMeta{Name: "ignore", UID: types.UID("ignore"), Namespace: v1.NamespaceDefault},
+					Spec: v1.PodSpec{
+						Volumes: []v1.Volume{
+							{
+								VolumeSource: v1.VolumeSource{
+									PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
+										ClaimName: "existingPVC",
+									},
+								},
+							},
+						},
+					},
+				},
+				NumAllNodes: 2,
+				Diagnosis: framework.Diagnosis{
+					NodeToStatusMap: framework.NodeToStatusMap{
+						"machine1": framework.NewStatus(framework.UnschedulableAndUnresolvable, `persistentvolumeclaim "existingPVC" is being deleted`).WithFailedPlugin(volumebinding.Name),
+						"machine2": framework.NewStatus(framework.UnschedulableAndUnresolvable, `persistentvolumeclaim "existingPVC" is being deleted`).WithFailedPlugin(volumebinding.Name),
+					},
+					UnschedulablePlugins: sets.NewString(volumebinding.Name),
+				},
+			},
 		},
 		{
 			registerPlugins: []st.RegisterPluginFunc{

@@ -646,8 +702,11 @@ func TestGenericScheduler(t *testing.T) {
 			wErr: &framework.FitError{
 				Pod:         &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-filter", UID: types.UID("test-filter")}},
 				NumAllNodes: 1,
-				FilteredNodesStatuses: framework.NodeToStatusMap{
-					"3": framework.NewStatus(framework.Unschedulable, "injecting failure for pod test-filter"),
+				Diagnosis: framework.Diagnosis{
+					NodeToStatusMap: framework.NodeToStatusMap{
+						"3": framework.NewStatus(framework.Unschedulable, "injecting failure for pod test-filter").WithFailedPlugin("FakeFilter"),
+					},
+					UnschedulablePlugins: sets.NewString("FakeFilter"),
 				},
 			},
 		},

@@ -668,8 +727,11 @@ func TestGenericScheduler(t *testing.T) {
 			wErr: &framework.FitError{
 				Pod:         &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-filter", UID: types.UID("test-filter")}},
 				NumAllNodes: 1,
-				FilteredNodesStatuses: framework.NodeToStatusMap{
-					"3": framework.NewStatus(framework.UnschedulableAndUnresolvable, "injecting failure for pod test-filter"),
+				Diagnosis: framework.Diagnosis{
+					NodeToStatusMap: framework.NodeToStatusMap{
+						"3": framework.NewStatus(framework.UnschedulableAndUnresolvable, "injecting failure for pod test-filter").WithFailedPlugin("FakeFilter"),
+					},
+					UnschedulablePlugins: sets.NewString("FakeFilter"),
 				},
 			},
 		},

@@ -705,9 +767,12 @@ func TestGenericScheduler(t *testing.T) {
 			wErr: &framework.FitError{
 				Pod:         &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-prefilter", UID: types.UID("test-prefilter")}},
 				NumAllNodes: 2,
-				FilteredNodesStatuses: framework.NodeToStatusMap{
-					"1": framework.NewStatus(framework.UnschedulableAndUnresolvable, "injected unschedulable status"),
-					"2": framework.NewStatus(framework.UnschedulableAndUnresolvable, "injected unschedulable status"),
+				Diagnosis: framework.Diagnosis{
+					NodeToStatusMap: framework.NodeToStatusMap{
+						"1": framework.NewStatus(framework.UnschedulableAndUnresolvable, "injected unschedulable status").WithFailedPlugin("FakePreFilter"),
+						"2": framework.NewStatus(framework.UnschedulableAndUnresolvable, "injected unschedulable status").WithFailedPlugin("FakePreFilter"),
+					},
+					UnschedulablePlugins: sets.NewString("FakePreFilter"),
 				},
 			},
 		},

@@ -772,8 +837,15 @@ func TestGenericScheduler(t *testing.T) {
 			informerFactory.WaitForCacheSync(ctx.Done())
 
 			result, err := scheduler.Schedule(ctx, fwk, framework.NewCycleState(), test.pod)
-			if err != test.wErr && !strings.Contains(err.Error(), test.wErr.Error()) {
-				t.Errorf("Unexpected error: %v, expected: %v", err.Error(), test.wErr)
+			// TODO(#94696): replace reflect.DeepEqual with cmp.Diff().
+			if err != test.wErr {
+				gotFitErr, gotOK := err.(*framework.FitError)
+				wantFitErr, wantOK := test.wErr.(*framework.FitError)
+				if gotOK != wantOK {
+					t.Errorf("Expected err to be FitError: %v, but got %v", wantOK, gotOK)
+				} else if gotOK && !reflect.DeepEqual(gotFitErr, wantFitErr) {
+					t.Errorf("Unexpected fitError. Want %v, but got %v", wantFitErr, gotFitErr)
+				}
 			}
 			if test.expectedHosts != nil && !test.expectedHosts.Has(result.SuggestedHost) {
 				t.Errorf("Expected: %s, got: %s", test.expectedHosts, result.SuggestedHost)

@@ -817,27 +889,19 @@ func TestFindFitAllError(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	_, nodeToStatusMap, err := scheduler.findNodesThatFitPod(context.Background(), fwk, framework.NewCycleState(), &v1.Pod{})
+	_, diagnosis, err := scheduler.findNodesThatFitPod(context.Background(), fwk, framework.NewCycleState(), &v1.Pod{})
 
 	if err != nil {
 		t.Errorf("unexpected error: %v", err)
 	}
 
-	if len(nodeToStatusMap) != len(nodes) {
-		t.Errorf("unexpected failed status map: %v", nodeToStatusMap)
-	}
-
-	for _, node := range nodes {
-		t.Run(node.Name, func(t *testing.T) {
-			status, found := nodeToStatusMap[node.Name]
-			if !found {
-				t.Errorf("failed to find node %v in %v", node.Name, nodeToStatusMap)
-			}
-			reasons := status.Reasons()
-			if len(reasons) != 1 || reasons[0] != st.ErrReasonFake {
-				t.Errorf("unexpected failure reasons: %v", reasons)
-			}
-		})
+	// TODO(#94696): use cmp.Diff() to compare `diagnosis`.
+	if len(diagnosis.NodeToStatusMap) != len(nodes) {
+		t.Errorf("unexpected failed status map: %v", diagnosis.NodeToStatusMap)
+	}
+
+	if diff := cmp.Diff(sets.NewString("MatchFilter"), diagnosis.UnschedulablePlugins); diff != "" {
+		t.Errorf("Unexpected unschedulablePlugins: (-want, +got): %s", diagnosis.UnschedulablePlugins)
 	}
 }

@@ -858,14 +922,18 @@ func TestFindFitSomeError(t *testing.T) {
 	}
 
 	pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "1", UID: types.UID("1")}}
-	_, nodeToStatusMap, err := scheduler.findNodesThatFitPod(context.Background(), fwk, framework.NewCycleState(), pod)
+	_, diagnosis, err := scheduler.findNodesThatFitPod(context.Background(), fwk, framework.NewCycleState(), pod)
 
 	if err != nil {
 		t.Errorf("unexpected error: %v", err)
 	}
 
-	if len(nodeToStatusMap) != len(nodes)-1 {
-		t.Errorf("unexpected failed status map: %v", nodeToStatusMap)
+	if len(diagnosis.NodeToStatusMap) != len(nodes)-1 {
+		t.Errorf("unexpected failed status map: %v", diagnosis.NodeToStatusMap)
 	}
 
+	if diff := cmp.Diff(sets.NewString("MatchFilter"), diagnosis.UnschedulablePlugins); diff != "" {
+		t.Errorf("Unexpected unschedulablePlugins: (-want, +got): %s", diagnosis.UnschedulablePlugins)
+	}
+
 	for _, node := range nodes {

@@ -873,9 +941,9 @@ func TestFindFitSomeError(t *testing.T) {
 			continue
 		}
 		t.Run(node.Name, func(t *testing.T) {
-			status, found := nodeToStatusMap[node.Name]
+			status, found := diagnosis.NodeToStatusMap[node.Name]
 			if !found {
-				t.Errorf("failed to find node %v in %v", node.Name, nodeToStatusMap)
+				t.Errorf("failed to find node %v in %v", node.Name, diagnosis.NodeToStatusMap)
 			}
 			reasons := status.Reasons()
 			if len(reasons) != 1 || reasons[0] != st.ErrReasonFake {
@@ -102,13 +102,16 @@ const (
 )
 
 // Status indicates the result of running a plugin. It consists of a code, a
-// message and (optionally) an error. When the status code is not `Success`,
-// the reasons should explain why.
+// message, (optionally) an error and an plugin name it fails by. When the status
+// code is not `Success`, the reasons should explain why.
 // NOTE: A nil Status is also considered as Success.
 type Status struct {
 	code    Code
 	reasons []string
 	err     error
+	// failedPlugin is an optional field that records the plugin name a Pod failed by.
+	// It's set by the framework when code is Error, Unschedulable or UnschedulableAndUnresolvable.
+	failedPlugin string
 }
 
 // Code returns code of the Status.

@@ -127,6 +130,23 @@ func (s *Status) Message() string {
 	return strings.Join(s.reasons, ", ")
 }
 
+// SetFailedPlugin sets the given plugin name to s.failedPlugin.
+func (s *Status) SetFailedPlugin(plugin string) {
+	s.failedPlugin = plugin
+}
+
+// WithFailedPlugin sets the given plugin name to s.failedPlugin,
+// and returns the given status object.
+func (s *Status) WithFailedPlugin(plugin string) *Status {
+	s.SetFailedPlugin(plugin)
+	return s
+}
+
+// FailedPlugin returns the failed plugin name.
+func (s *Status) FailedPlugin() string {
+	return s.failedPlugin
+}
+
 // Reasons returns reasons of the Status.
 func (s *Status) Reasons() []string {
 	return s.reasons

@@ -199,6 +219,8 @@ func (p PluginToStatus) Merge() *Status {
 		}
 		if statusPrecedence[s.Code()] > statusPrecedence[finalStatus.code] {
 			finalStatus.code = s.Code()
+			// Same as code, we keep the most relevant failedPlugin in the returned Status.
+			finalStatus.failedPlugin = s.FailedPlugin()
 		}
 
 		for _, r := range s.reasons {

@@ -220,7 +242,7 @@ type WaitingPod interface {
 	// to unblock the pod.
 	Allow(pluginName string)
 	// Reject declares the waiting pod unschedulable.
-	Reject(msg string)
+	Reject(pluginName, msg string)
 }
 
 // Plugin is the parent type for all the scheduling framework plugins.
@@ -156,7 +156,7 @@ func (pl *DefaultPreemption) preempt(ctx context.Context, state *framework.Cycle
 	}
 
 	// 5) Perform preparation work before nominating the selected candidate.
-	if err := PrepareCandidate(bestCandidate, pl.fh, cs, pod); err != nil {
+	if err := PrepareCandidate(bestCandidate, pl.fh, cs, pod, pl.Name()); err != nil {
 		return "", err
 	}
 
@@ -223,9 +223,12 @@ func (pl *DefaultPreemption) FindCandidates(ctx context.Context, state *framewor
 	// Return a FitError only when there are no candidates that fit the pod.
 	if len(candidates) == 0 {
 		return candidates, &framework.FitError{
-			Pod:                   pod,
-			NumAllNodes:           len(potentialNodes),
-			FilteredNodesStatuses: nodeStatuses,
+			Pod:         pod,
+			NumAllNodes: len(potentialNodes),
+			Diagnosis: framework.Diagnosis{
+				NodeToStatusMap: nodeStatuses,
+				// Leave FailedPlugins as nil as it won't be used on moving Pods.
+			},
 		}
 	}
 	return candidates, nil

@@ -687,7 +690,7 @@ func selectVictimsOnNode(
 // - Evict the victim pods
 // - Reject the victim pods if they are in waitingPod map
 // - Clear the low-priority pods' nominatedNodeName status if needed
-func PrepareCandidate(c Candidate, fh framework.Handle, cs kubernetes.Interface, pod *v1.Pod) error {
+func PrepareCandidate(c Candidate, fh framework.Handle, cs kubernetes.Interface, pod *v1.Pod, pluginName string) error {
 	for _, victim := range c.Victims().Pods {
 		if err := util.DeletePod(cs, victim); err != nil {
 			klog.Errorf("Error preempting pod %v/%v: %v", victim.Namespace, victim.Name, err)

@@ -695,7 +698,7 @@ func PrepareCandidate(c Candidate, fh framework.Handle, cs kubernetes.Interface,
 		}
 		// If the victim is a WaitingPod, send a reject message to the PermitPlugin
 		if waitingPod := fh.GetWaitingPod(victim.UID); waitingPod != nil {
-			waitingPod.Reject("preempted")
+			waitingPod.Reject(pluginName, "preempted")
 		}
 		fh.EventRecorder().Eventf(victim, pod, v1.EventTypeNormal, "Preempted", "Preempting", "Preempted by %v/%v on node %v",
 			pod.Namespace, pod.Name, c.Name())
@@ -431,12 +431,13 @@ func (f *frameworkImpl) RunPreFilterPlugins(ctx context.Context, state *framewor
 	for _, pl := range f.preFilterPlugins {
 		status = f.runPreFilterPlugin(ctx, pl, state, pod)
 		if !status.IsSuccess() {
+			status.SetFailedPlugin(pl.Name())
 			if status.IsUnschedulable() {
 				return status
 			}
 			err := status.AsError()
 			klog.ErrorS(err, "Failed running PreFilter plugin", "plugin", pl.Name(), "pod", klog.KObj(pod))
-			return framework.AsStatus(fmt.Errorf("running PreFilter plugin %q: %w", pl.Name(), err))
+			return framework.AsStatus(fmt.Errorf("running PreFilter plugin %q: %w", pl.Name(), err)).WithFailedPlugin(pl.Name())
 		}
 	}
 
@@ -540,9 +541,10 @@ func (f *frameworkImpl) RunFilterPlugins(
 			if !pluginStatus.IsUnschedulable() {
 				// Filter plugins are not supposed to return any status other than
 				// Success or Unschedulable.
-				errStatus := framework.AsStatus(fmt.Errorf("running %q filter plugin: %w", pl.Name(), pluginStatus.AsError()))
+				errStatus := framework.AsStatus(fmt.Errorf("running %q filter plugin: %w", pl.Name(), pluginStatus.AsError())).WithFailedPlugin(pl.Name())
 				return map[string]*framework.Status{pl.Name(): errStatus}
 			}
+			pluginStatus.SetFailedPlugin(pl.Name())
 			statuses[pl.Name()] = pluginStatus
 			if !f.runAllFilters {
 				// Exit early if we don't need to run all filters.

@@ -975,7 +977,8 @@ func (f *frameworkImpl) RunPermitPlugins(ctx context.Context, state *framework.C
 			if status.IsUnschedulable() {
 				msg := fmt.Sprintf("rejected pod %q by permit plugin %q: %v", pod.Name, pl.Name(), status.Message())
 				klog.V(4).Infof(msg)
-				return framework.NewStatus(status.Code(), msg)
+				status.SetFailedPlugin(pl.Name())
+				return status
 			}
 			if status.Code() == framework.Wait {
 				// Not allowed to be greater than maxTimeout.

@@ -987,7 +990,7 @@ func (f *frameworkImpl) RunPermitPlugins(ctx context.Context, state *framework.C
 			} else {
 				err := status.AsError()
 				klog.ErrorS(err, "Failed running Permit plugin", "plugin", pl.Name(), "pod", klog.KObj(pod))
-				return framework.AsStatus(fmt.Errorf("running Permit plugin %q: %w", pl.Name(), err))
+				return framework.AsStatus(fmt.Errorf("running Permit plugin %q: %w", pl.Name(), err)).WithFailedPlugin(pl.Name())
 			}
 		}
 	}

@@ -1012,7 +1015,7 @@ func (f *frameworkImpl) runPermitPlugin(ctx context.Context, pl framework.Permit
 }
 
 // WaitOnPermit will block, if the pod is a waiting pod, until the waiting pod is rejected or allowed.
-func (f *frameworkImpl) WaitOnPermit(ctx context.Context, pod *v1.Pod) (status *framework.Status) {
+func (f *frameworkImpl) WaitOnPermit(ctx context.Context, pod *v1.Pod) *framework.Status {
 	waitingPod := f.waitingPods.get(pod.UID)
 	if waitingPod == nil {
 		return nil

@@ -1028,11 +1031,12 @@ func (f *frameworkImpl) WaitOnPermit(ctx context.Context, pod *v1.Pod) (status *
 		if s.IsUnschedulable() {
 			msg := fmt.Sprintf("pod %q rejected while waiting on permit: %v", pod.Name, s.Message())
 			klog.V(4).Infof(msg)
-			return framework.NewStatus(s.Code(), msg)
+			s.SetFailedPlugin(s.FailedPlugin())
+			return s
 		}
 		err := s.AsError()
 		klog.ErrorS(err, "Failed waiting on permit for pod", "pod", klog.KObj(pod))
-		return framework.AsStatus(fmt.Errorf("waiting on permit for pod: %w", err))
+		return framework.AsStatus(fmt.Errorf("waiting on permit for pod: %w", err)).WithFailedPlugin(s.FailedPlugin())
 	}
 	return nil
 }

@@ -1062,7 +1066,7 @@ func (f *frameworkImpl) GetWaitingPod(uid types.UID) framework.WaitingPod {
 func (f *frameworkImpl) RejectWaitingPod(uid types.UID) {
 	waitingPod := f.waitingPods.get(uid)
 	if waitingPod != nil {
-		waitingPod.Reject("removed")
+		waitingPod.Reject("", "removed")
 	}
 }
@@ -28,7 +28,6 @@ import (
 	"github.com/google/go-cmp/cmp"
 	"github.com/prometheus/client_golang/prometheus"
 	dto "github.com/prometheus/client_model/go"
-
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"

@@ -286,7 +285,7 @@ func (pp *TestPermitPlugin) Name() string {
 	return permitPlugin
 }
 func (pp *TestPermitPlugin) Permit(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) (*framework.Status, time.Duration) {
-	return framework.NewStatus(framework.Wait, ""), time.Duration(10 * time.Second)
+	return framework.NewStatus(framework.Wait), 10 * time.Second
 }
 
 var _ framework.QueueSortPlugin = &TestQueueSortPlugin{}

@@ -894,8 +893,10 @@ func TestFilterPlugins(t *testing.T) {
 					inj: injectedResult{FilterStatus: int(framework.Error)},
 				},
 			},
-			wantStatus:    framework.AsStatus(fmt.Errorf(`running "TestPlugin" filter plugin: %w`, errInjectedFilterStatus)),
-			wantStatusMap: framework.PluginToStatus{"TestPlugin": framework.AsStatus(fmt.Errorf(`running "TestPlugin" filter plugin: %w`, errInjectedFilterStatus))},
+			wantStatus: framework.AsStatus(fmt.Errorf(`running "TestPlugin" filter plugin: %w`, errInjectedFilterStatus)).WithFailedPlugin("TestPlugin"),
+			wantStatusMap: framework.PluginToStatus{
+				"TestPlugin": framework.AsStatus(fmt.Errorf(`running "TestPlugin" filter plugin: %w`, errInjectedFilterStatus)).WithFailedPlugin("TestPlugin"),
+			},
 		},
 		{
 			name: "UnschedulableFilter",

@@ -905,8 +906,10 @@ func TestFilterPlugins(t *testing.T) {
 					inj: injectedResult{FilterStatus: int(framework.Unschedulable)},
 				},
 			},
-			wantStatus:    framework.NewStatus(framework.Unschedulable, "injected filter status"),
-			wantStatusMap: framework.PluginToStatus{"TestPlugin": framework.NewStatus(framework.Unschedulable, "injected filter status")},
+			wantStatus: framework.NewStatus(framework.Unschedulable, "injected filter status").WithFailedPlugin("TestPlugin"),
+			wantStatusMap: framework.PluginToStatus{
+				"TestPlugin": framework.NewStatus(framework.Unschedulable, "injected filter status").WithFailedPlugin("TestPlugin"),
+			},
 		},
 		{
 			name: "UnschedulableAndUnresolvableFilter",

@@ -917,8 +920,10 @@ func TestFilterPlugins(t *testing.T) {
 						FilterStatus: int(framework.UnschedulableAndUnresolvable)},
 				},
 			},
-			wantStatus:    framework.NewStatus(framework.UnschedulableAndUnresolvable, "injected filter status"),
-			wantStatusMap: framework.PluginToStatus{"TestPlugin": framework.NewStatus(framework.UnschedulableAndUnresolvable, "injected filter status")},
+			wantStatus: framework.NewStatus(framework.UnschedulableAndUnresolvable, "injected filter status").WithFailedPlugin("TestPlugin"),
+			wantStatusMap: framework.PluginToStatus{
+				"TestPlugin": framework.NewStatus(framework.UnschedulableAndUnresolvable, "injected filter status").WithFailedPlugin("TestPlugin"),
+			},
 		},
 		// followings tests cover multiple-plugins scenarios
 		{

@@ -934,8 +939,10 @@ func TestFilterPlugins(t *testing.T) {
 					inj: injectedResult{FilterStatus: int(framework.Error)},
 				},
 			},
-			wantStatus:    framework.AsStatus(fmt.Errorf(`running "TestPlugin1" filter plugin: %w`, errInjectedFilterStatus)),
-			wantStatusMap: framework.PluginToStatus{"TestPlugin1": framework.AsStatus(fmt.Errorf(`running "TestPlugin1" filter plugin: %w`, errInjectedFilterStatus))},
+			wantStatus: framework.AsStatus(fmt.Errorf(`running "TestPlugin1" filter plugin: %w`, errInjectedFilterStatus)).WithFailedPlugin("TestPlugin1"),
+			wantStatusMap: framework.PluginToStatus{
+				"TestPlugin1": framework.AsStatus(fmt.Errorf(`running "TestPlugin1" filter plugin: %w`, errInjectedFilterStatus)).WithFailedPlugin("TestPlugin1"),
+			},
 		},
 		{
 			name: "SuccessAndSuccessFilters",

@@ -965,8 +972,10 @@ func TestFilterPlugins(t *testing.T) {
 					inj: injectedResult{FilterStatus: int(framework.Success)},
 				},
 			},
-			wantStatus:    framework.AsStatus(fmt.Errorf(`running "TestPlugin1" filter plugin: %w`, errInjectedFilterStatus)),
-			wantStatusMap: framework.PluginToStatus{"TestPlugin1": framework.AsStatus(fmt.Errorf(`running "TestPlugin1" filter plugin: %w`, errInjectedFilterStatus))},
+			wantStatus: framework.AsStatus(fmt.Errorf(`running "TestPlugin1" filter plugin: %w`, errInjectedFilterStatus)).WithFailedPlugin("TestPlugin1"),
+			wantStatusMap: framework.PluginToStatus{
+				"TestPlugin1": framework.AsStatus(fmt.Errorf(`running "TestPlugin1" filter plugin: %w`, errInjectedFilterStatus)).WithFailedPlugin("TestPlugin1"),
+			},
 		},
 		{
 			name: "SuccessAndErrorFilters",

@@ -981,8 +990,10 @@ func TestFilterPlugins(t *testing.T) {
 					inj: injectedResult{FilterStatus: int(framework.Error)},
 				},
 			},
-			wantStatus:    framework.AsStatus(fmt.Errorf(`running "TestPlugin2" filter plugin: %w`, errInjectedFilterStatus)),
-			wantStatusMap: framework.PluginToStatus{"TestPlugin2": framework.AsStatus(fmt.Errorf(`running "TestPlugin2" filter plugin: %w`, errInjectedFilterStatus))},
+			wantStatus: framework.AsStatus(fmt.Errorf(`running "TestPlugin2" filter plugin: %w`, errInjectedFilterStatus)).WithFailedPlugin("TestPlugin2"),
+			wantStatusMap: framework.PluginToStatus{
+				"TestPlugin2": framework.AsStatus(fmt.Errorf(`running "TestPlugin2" filter plugin: %w`, errInjectedFilterStatus)).WithFailedPlugin("TestPlugin2"),
+			},
 		},
 		{
 			name: "SuccessAndUnschedulableFilters",

@@ -997,8 +1008,10 @@ func TestFilterPlugins(t *testing.T) {
 					inj: injectedResult{FilterStatus: int(framework.Unschedulable)},
 				},
 			},
-			wantStatus:    framework.NewStatus(framework.Unschedulable, "injected filter status"),
-			wantStatusMap: framework.PluginToStatus{"TestPlugin2": framework.NewStatus(framework.Unschedulable, "injected filter status")},
+			wantStatus: framework.NewStatus(framework.Unschedulable, "injected filter status").WithFailedPlugin("TestPlugin2"),
+			wantStatusMap: framework.PluginToStatus{
+				"TestPlugin2": framework.NewStatus(framework.Unschedulable, "injected filter status").WithFailedPlugin("TestPlugin2"),
+			},
 		},
 		{
 			name: "SuccessFilterWithRunAllFilters",

@@ -1026,8 +1039,10 @@ func TestFilterPlugins(t *testing.T) {
 				},
 			},
 			runAllFilters: true,
-			wantStatus:    framework.AsStatus(fmt.Errorf(`running "TestPlugin1" filter plugin: %w`, errInjectedFilterStatus)),
-			wantStatusMap: framework.PluginToStatus{"TestPlugin1": framework.AsStatus(fmt.Errorf(`running "TestPlugin1" filter plugin: %w`, errInjectedFilterStatus))},
+			wantStatus: framework.AsStatus(fmt.Errorf(`running "TestPlugin1" filter plugin: %w`, errInjectedFilterStatus)).WithFailedPlugin("TestPlugin1"),
+			wantStatusMap: framework.PluginToStatus{
+				"TestPlugin1": framework.AsStatus(fmt.Errorf(`running "TestPlugin1" filter plugin: %w`, errInjectedFilterStatus)).WithFailedPlugin("TestPlugin1"),
+			},
 		},
 		{
 			name: "ErrorAndErrorFilters",

@@ -1042,10 +1057,10 @@ func TestFilterPlugins(t *testing.T) {
 				},
 			},
 			runAllFilters: true,
-			wantStatus:    framework.NewStatus(framework.UnschedulableAndUnresolvable, "injected filter status", "injected filter status"),
+			wantStatus: framework.NewStatus(framework.UnschedulableAndUnresolvable, "injected filter status", "injected filter status").WithFailedPlugin("TestPlugin1"),
 			wantStatusMap: framework.PluginToStatus{
-				"TestPlugin1": framework.NewStatus(framework.UnschedulableAndUnresolvable, "injected filter status"),
-				"TestPlugin2": framework.NewStatus(framework.Unschedulable, "injected filter status"),
+				"TestPlugin1": framework.NewStatus(framework.UnschedulableAndUnresolvable, "injected filter status").WithFailedPlugin("TestPlugin1"),
+				"TestPlugin2": framework.NewStatus(framework.Unschedulable, "injected filter status").WithFailedPlugin("TestPlugin2"),
 			},
 		},
 	}

@@ -1081,7 +1096,6 @@ func TestFilterPlugins(t *testing.T) {
 			if !reflect.DeepEqual(gotStatusMap, tt.wantStatusMap) {
 				t.Errorf("wrong status map. got: %+v, want: %+v", gotStatusMap, tt.wantStatusMap)
 			}
-
 		})
 	}
 }

@@ -1238,7 +1252,7 @@ func TestFilterPluginsWithNominatedPods(t *testing.T) {
 			nominatedPod: highPriorityPod,
 			node:         node,
 			nodeInfo:     framework.NewNodeInfo(pod),
-			wantStatus:   framework.AsStatus(fmt.Errorf(`running "TestPlugin2" filter plugin: %w`, errInjectedFilterStatus)),
+			wantStatus:   framework.AsStatus(fmt.Errorf(`running "TestPlugin2" filter plugin: %w`, errInjectedFilterStatus)).WithFailedPlugin("TestPlugin2"),
 		},
 		{
 			name: "node has a low-priority nominated pod and pre filters return unschedulable",

@@ -1653,7 +1667,7 @@ func TestPermitPlugins(t *testing.T) {
 					inj: injectedResult{PermitStatus: int(framework.Unschedulable)},
 				},
 			},
-			want: framework.NewStatus(framework.Unschedulable, `rejected pod "" by permit plugin "TestPlugin": injected status`),
+			want: framework.NewStatus(framework.Unschedulable, "injected status").WithFailedPlugin("TestPlugin"),
 		},
 		{
 			name: "ErrorPermitPlugin",

@@ -1663,7 +1677,7 @@ func TestPermitPlugins(t *testing.T) {
 					inj: injectedResult{PermitStatus: int(framework.Error)},
 				},
 			},
-			want: framework.AsStatus(fmt.Errorf(`running Permit plugin "TestPlugin": %w`, errInjectedStatus)),
+			want: framework.AsStatus(fmt.Errorf(`running Permit plugin "TestPlugin": %w`, errInjectedStatus)).WithFailedPlugin("TestPlugin"),
 		},
 		{
 			name: "UnschedulableAndUnresolvablePermitPlugin",

@@ -1673,7 +1687,7 @@ func TestPermitPlugins(t *testing.T) {
 					inj: injectedResult{PermitStatus: int(framework.UnschedulableAndUnresolvable)},
 				},
 			},
-			want: framework.NewStatus(framework.UnschedulableAndUnresolvable, `rejected pod "" by permit plugin "TestPlugin": injected status`),
+			want: framework.NewStatus(framework.UnschedulableAndUnresolvable, "injected status").WithFailedPlugin("TestPlugin"),
 		},
 		{
 			name: "WaitPermitPlugin",

@@ -1711,38 +1725,40 @@ func TestPermitPlugins(t *testing.T) {
 					inj: injectedResult{PermitStatus: int(framework.Error)},
 				},
 			},
-			want: framework.AsStatus(fmt.Errorf(`running Permit plugin "TestPlugin": %w`, errInjectedStatus)),
+			want: framework.AsStatus(fmt.Errorf(`running Permit plugin "TestPlugin": %w`, errInjectedStatus)).WithFailedPlugin("TestPlugin"),
 		},
 	}
 
 	for _, tt := range tests {
-		registry := Registry{}
-		configPlugins := &config.Plugins{Permit: &config.PluginSet{}}
+		t.Run(tt.name, func(t *testing.T) {
+			registry := Registry{}
+			configPlugins := &config.Plugins{Permit: &config.PluginSet{}}
 
-		for _, pl := range tt.plugins {
-			tmpPl := pl
-			if err := registry.Register(pl.name, func(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) {
-				return tmpPl, nil
-			}); err != nil {
-				t.Fatalf("Unable to register Permit plugin: %s", pl.name)
-			}
-
-			configPlugins.Permit.Enabled = append(
-				configPlugins.Permit.Enabled,
-				config.Plugin{Name: pl.name},
-			)
-		}
-
-		f, err := newFrameworkWithQueueSortAndBind(registry, configPlugins, emptyArgs)
-		if err != nil {
-			t.Fatalf("fail to create framework: %s", err)
-		}
-
-		status := f.RunPermitPlugins(context.TODO(), nil, pod, "")
-
-		if !reflect.DeepEqual(status, tt.want) {
-			t.Errorf("wrong status code. got %v, want %v", status, tt.want)
-		}
+			for _, pl := range tt.plugins {
+				tmpPl := pl
+				if err := registry.Register(pl.name, func(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) {
+					return tmpPl, nil
+				}); err != nil {
+					t.Fatalf("Unable to register Permit plugin: %s", pl.name)
+				}
+
+				configPlugins.Permit.Enabled = append(
+					configPlugins.Permit.Enabled,
+					config.Plugin{Name: pl.name},
+				)
+			}
+
+			f, err := newFrameworkWithQueueSortAndBind(registry, configPlugins, emptyArgs)
+			if err != nil {
+				t.Fatalf("fail to create framework: %s", err)
+			}
+
+			status := f.RunPermitPlugins(context.TODO(), nil, pod, "")
+
+			if !reflect.DeepEqual(status, tt.want) {
+				t.Errorf("wrong status code. got %v, want %v", status, tt.want)
+			}
+		})
 	}
 }

@@ -2075,26 +2091,23 @@ func TestWaitOnPermit(t *testing.T) {
 	}
 
 	tests := []struct {
-		name        string
-		action      func(f framework.Framework)
-		wantStatus  framework.Code
-		wantMessage string
+		name   string
+		action func(f framework.Framework)
+		want   *framework.Status
 	}{
 		{
 			name: "Reject Waiting Pod",
 			action: func(f framework.Framework) {
-				f.GetWaitingPod(pod.UID).Reject("reject message")
+				f.GetWaitingPod(pod.UID).Reject(permitPlugin, "reject message")
 			},
-			wantStatus:  framework.Unschedulable,
-			wantMessage: "pod \"pod\" rejected while waiting on permit: reject message",
+			want: framework.NewStatus(framework.Unschedulable, "reject message").WithFailedPlugin(permitPlugin),
 		},
 		{
 			name: "Allow Waiting Pod",
 			action: func(f framework.Framework) {
 				f.GetWaitingPod(pod.UID).Allow(permitPlugin)
 			},
-			wantStatus:  framework.Success,
-			wantMessage: "",
+			want: nil,
 		},
 	}

@@ -2123,14 +2136,9 @@ func TestWaitOnPermit(t *testing.T) {
 
 			go tt.action(f)
 
-			waitOnPermitStatus := f.WaitOnPermit(context.Background(), pod)
-			if waitOnPermitStatus.Code() != tt.wantStatus {
-				t.Fatalf("Expected WaitOnPermit to return status %v, but got %v",
-					tt.wantStatus, waitOnPermitStatus.Code())
-			}
-			if waitOnPermitStatus.Message() != tt.wantMessage {
-				t.Fatalf("Expected WaitOnPermit to return status with message %q, but got %q",
-					tt.wantMessage, waitOnPermitStatus.Message())
+			got := f.WaitOnPermit(context.Background(), pod)
+			if !reflect.DeepEqual(tt.want, got) {
+				t.Errorf("Unexpected status: want %v, but got %v", tt.want, got)
 			}
 		})
 	}
@@ -100,7 +100,7 @@ func newWaitingPod(pod *v1.Pod, pluginsMaxWaitTime map[string]time.Duration) *wa
 		wp.pendingPlugins[plugin] = time.AfterFunc(waitTime, func() {
 			msg := fmt.Sprintf("rejected due to timeout after waiting %v at plugin %v",
 				waitTime, plugin)
-			wp.Reject(msg)
+			wp.Reject(plugin, msg)
 		})
 	}
 
@@ -149,7 +149,7 @@ func (w *waitingPod) Allow(pluginName string) {
 }
 
 // Reject declares the waiting pod unschedulable.
-func (w *waitingPod) Reject(msg string) {
+func (w *waitingPod) Reject(pluginName, msg string) {
 	w.mu.RLock()
 	defer w.mu.RUnlock()
 	for _, timer := range w.pendingPlugins {

@@ -159,7 +159,7 @@ func (w *waitingPod) Reject(msg string) {
 	// The select clause works as a non-blocking send.
 	// If there is no receiver, it's a no-op (default case).
 	select {
-	case w.s <- framework.NewStatus(framework.Unschedulable, msg):
+	case w.s <- framework.NewStatus(framework.Unschedulable, msg).WithFailedPlugin(pluginName):
 	default:
 	}
 }
@@ -91,11 +91,17 @@ type WeightedAffinityTerm struct {
 	Weight int32
 }
 
+// Diagnosis records the details to diagnose a scheduling failure.
+type Diagnosis struct {
+	NodeToStatusMap      NodeToStatusMap
+	UnschedulablePlugins sets.String
+}
+
 // FitError describes a fit error of a pod.
 type FitError struct {
-	Pod                   *v1.Pod
-	NumAllNodes           int
-	FilteredNodesStatuses NodeToStatusMap
+	Pod         *v1.Pod
+	NumAllNodes int
+	Diagnosis   Diagnosis
 }
 
 const (

@@ -106,7 +112,7 @@ const (
 // Error returns detailed information of why the pod failed to fit on each node
 func (f *FitError) Error() string {
 	reasons := make(map[string]int)
-	for _, status := range f.FilteredNodesStatuses {
+	for _, status := range f.Diagnosis.NodeToStatusMap {
 		for _, reason := range status.Reasons() {
 			reasons[reason]++
 		}
@@ -462,7 +462,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
 			klog.V(3).InfoS("No PostFilter plugins are registered, so no preemption will be performed")
 		} else {
 			// Run PostFilter plugins to try to make the pod schedulable in a future scheduling cycle.
-			result, status := fwk.RunPostFilterPlugins(ctx, state, pod, fitError.FilteredNodesStatuses)
+			result, status := fwk.RunPostFilterPlugins(ctx, state, pod, fitError.Diagnosis.NodeToStatusMap)
 			if status.Code() == framework.Error {
 				klog.ErrorS(nil, "Status after running PostFilter plugins for pod", klog.KObj(pod), "status", status)
 			} else {
@@ -636,11 +636,11 @@ func TestSchedulerNoPhantomPodAfterDelete(t *testing.T) {
 	expectErr := &framework.FitError{
 		Pod:         secondPod,
 		NumAllNodes: 1,
-		FilteredNodesStatuses: framework.NodeToStatusMap{
-			node.Name: framework.NewStatus(
-				framework.Unschedulable,
-				nodeports.ErrReason,
-			),
+		Diagnosis: framework.Diagnosis{
+			NodeToStatusMap: framework.NodeToStatusMap{
+				node.Name: framework.NewStatus(framework.Unschedulable, nodeports.ErrReason).WithFailedPlugin(nodeports.Name),
+			},
+			UnschedulablePlugins: sets.NewString(nodeports.Name),
 		},
 	}
 	if !reflect.DeepEqual(expectErr, err) {

@@ -761,7 +761,7 @@ func TestSchedulerFailedSchedulingReasons(t *testing.T) {
 			framework.Unschedulable,
 			fmt.Sprintf("Insufficient %v", v1.ResourceCPU),
 			fmt.Sprintf("Insufficient %v", v1.ResourceMemory),
-		)
+		).WithFailedPlugin(noderesources.FitName)
 	}
 	fns := []st.RegisterPluginFunc{
 		st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),

@@ -778,9 +778,12 @@ func TestSchedulerFailedSchedulingReasons(t *testing.T) {
 	select {
 	case err := <-errChan:
 		expectErr := &framework.FitError{
-			Pod:                   podWithTooBigResourceRequests,
-			NumAllNodes:           len(nodes),
-			FilteredNodesStatuses: failedNodeStatues,
+			Pod:         podWithTooBigResourceRequests,
+			NumAllNodes: len(nodes),
+			Diagnosis: framework.Diagnosis{
+				NodeToStatusMap:      failedNodeStatues,
+				UnschedulablePlugins: sets.NewString(noderesources.FitName),
+			},
 		}
 		if len(fmt.Sprint(expectErr)) > 150 {
 			t.Errorf("message is too spammy ! %v ", len(fmt.Sprint(expectErr)))
@@ -459,7 +459,7 @@ func (pp *PermitPlugin) Permit(ctx context.Context, state *framework.CycleState,
 	if pp.waitAndRejectPermit {
 		pp.rejectingPod = pod.Name
 		pp.fh.IterateOverWaitingPods(func(wp framework.WaitingPod) {
-			wp.Reject(fmt.Sprintf("reject pod %v", wp.GetPod().Name))
+			wp.Reject(pp.name, fmt.Sprintf("reject pod %v", wp.GetPod().Name))
 		})
 		return framework.NewStatus(framework.Unschedulable, fmt.Sprintf("reject pod %v", pod.Name)), 0
 	}

@@ -479,7 +479,7 @@ func (pp *PermitPlugin) allowAllPods() {
 
 // rejectAllPods rejects all waiting pods.
 func (pp *PermitPlugin) rejectAllPods() {
-	pp.fh.IterateOverWaitingPods(func(wp framework.WaitingPod) { wp.Reject("rejectAllPods") })
+	pp.fh.IterateOverWaitingPods(func(wp framework.WaitingPod) { wp.Reject(pp.name, "rejectAllPods") })
}
 
 // reset used to reset permit plugin.
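The piece of this diff that downstream code touches most is the new trio of Status helpers (SetFailedPlugin, WithFailedPlugin, FailedPlugin), which let any call site that builds a Status tag it with the plugin that produced it. A hedged, standalone sketch of that chaining pattern follows — these are simplified stand-in types, not the real k8s.io/kubernetes/pkg/scheduler/framework package, and the plugin name used is just an example:

// Illustrative sketch of the Status.WithFailedPlugin chaining pattern this PR introduces.
package main

import "fmt"

type Status struct {
	code         string
	reasons      []string
	failedPlugin string
}

func NewStatus(code string, reasons ...string) *Status {
	return &Status{code: code, reasons: reasons}
}

// SetFailedPlugin records which plugin produced this status.
func (s *Status) SetFailedPlugin(plugin string) { s.failedPlugin = plugin }

// WithFailedPlugin sets the plugin name and returns the same status,
// which is what lets call sites chain it onto a constructor.
func (s *Status) WithFailedPlugin(plugin string) *Status {
	s.SetFailedPlugin(plugin)
	return s
}

// FailedPlugin returns the recorded plugin name.
func (s *Status) FailedPlugin() string { return s.failedPlugin }

func main() {
	st := NewStatus("Unschedulable", "node(s) had taints the pod didn't tolerate").
		WithFailedPlugin("TaintToleration") // example plugin name
	fmt.Printf("code=%s failedPlugin=%s\n", st.code, st.FailedPlugin())
}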