Change structure of NodeToStatus map in scheduler

This commit is contained in:
Maciej Skoczeń 2024-07-11 07:08:31 +00:00
parent 7ec344dcef
commit 98be7dfc5d
14 changed files with 428 additions and 350 deletions

View File

@ -50,9 +50,120 @@ type NodeScore struct {
Score int64
}
// NodeToStatusMap contains the statuses of the Nodes where the incoming Pod was not schedulable.
// A PostFilter plugin that uses this map should interpret absent Nodes as UnschedulableAndUnresolvable.
type NodeToStatusMap map[string]*Status
// NodeToStatusReader is a read-only interface of NodeToStatus passed to each PostFilter plugin.
type NodeToStatusReader interface {
// Get returns the status for given nodeName.
// If the node is not in the map, the AbsentNodesStatus is returned.
Get(nodeName string) *Status
// NodesForStatusCode returns a list of NodeInfos for the nodes that have a given status code.
// It returns the NodeInfos for all matching nodes denoted by AbsentNodesStatus as well.
NodesForStatusCode(nodeLister NodeInfoLister, code Code) ([]*NodeInfo, error)
}
// NodeToStatusMap is an alias for NodeToStatusReader to keep partial backwards compatibility.
// NodeToStatusReader should be used if possible.
type NodeToStatusMap = NodeToStatusReader
// NodeToStatus contains the statuses of the Nodes where the incoming Pod was not schedulable.
type NodeToStatus struct {
// nodeToStatus contains specific statuses of the nodes.
nodeToStatus map[string]*Status
// absentNodesStatus defines a status for all nodes that are absent in nodeToStatus map.
// By default, all absent nodes are UnschedulableAndUnresolvable.
absentNodesStatus *Status
}
// NewDefaultNodeToStatus creates NodeToStatus without any node in the map.
// The absentNodesStatus is set by default to UnschedulableAndUnresolvable.
func NewDefaultNodeToStatus() *NodeToStatus {
return NewNodeToStatus(make(map[string]*Status), NewStatus(UnschedulableAndUnresolvable))
}
// NewNodeToStatus creates NodeToStatus initialized with given nodeToStatus and absentNodesStatus.
func NewNodeToStatus(nodeToStatus map[string]*Status, absentNodesStatus *Status) *NodeToStatus {
return &NodeToStatus{
nodeToStatus: nodeToStatus,
absentNodesStatus: absentNodesStatus,
}
}
// Get returns the status for given nodeName. If the node is not in the map, the absentNodesStatus is returned.
func (m *NodeToStatus) Get(nodeName string) *Status {
if status, ok := m.nodeToStatus[nodeName]; ok {
return status
}
return m.absentNodesStatus
}
// Set sets status for given nodeName.
func (m *NodeToStatus) Set(nodeName string, status *Status) {
m.nodeToStatus[nodeName] = status
}
// Len returns length of nodeToStatus map. It is not aware of number of absent nodes.
func (m *NodeToStatus) Len() int {
return len(m.nodeToStatus)
}
// AbsentNodesStatus returns absentNodesStatus value.
func (m *NodeToStatus) AbsentNodesStatus() *Status {
return m.absentNodesStatus
}
// SetAbsentNodesStatus sets absentNodesStatus value.
func (m *NodeToStatus) SetAbsentNodesStatus(status *Status) {
m.absentNodesStatus = status
}
// ForEachExplicitNode runs fn for each node which status is explicitly set.
// Imporatant note, it runs the fn only for nodes with a status explicitly registered,
// and hence may not run the fn for all existing nodes.
// For example, if PreFilter rejects all Nodes, the scheduler would NOT set a failure status to every Node,
// but set a failure status as AbsentNodesStatus.
// You're supposed to get a status from AbsentNodesStatus(), and consider all other nodes that are rejected by them.
func (m *NodeToStatus) ForEachExplicitNode(fn func(nodeName string, status *Status)) {
for nodeName, status := range m.nodeToStatus {
fn(nodeName, status)
}
}
// NodesForStatusCode returns a list of NodeInfos for the nodes that matches a given status code.
// If the absentNodesStatus matches the code, all existing nodes are fetched using nodeLister
// and filtered using NodeToStatus.Get.
// If the absentNodesStatus doesn't match the code, nodeToStatus map is used to create a list of nodes
// and nodeLister.Get is used to obtain NodeInfo for each.
func (m *NodeToStatus) NodesForStatusCode(nodeLister NodeInfoLister, code Code) ([]*NodeInfo, error) {
var resultNodes []*NodeInfo
if m.AbsentNodesStatus().Code() == code {
allNodes, err := nodeLister.List()
if err != nil {
return nil, err
}
if m.Len() == 0 {
// All nodes are absent and status code is matching, so can return all nodes.
return allNodes, nil
}
// Need to find all the nodes that are absent or have a matching code using the allNodes.
for _, node := range allNodes {
nodeName := node.Node().Name
if status := m.Get(nodeName); status.Code() == code {
resultNodes = append(resultNodes, node)
}
}
return resultNodes, nil
}
m.ForEachExplicitNode(func(nodeName string, status *Status) {
if status.Code() == code {
if nodeInfo, err := nodeLister.Get(nodeName); err == nil {
resultNodes = append(resultNodes, nodeInfo)
}
}
})
return resultNodes, nil
}
// NodePluginScores is a struct with node name and scores for that node.
type NodePluginScores struct {
@ -446,15 +557,11 @@ type PostFilterPlugin interface {
Plugin
// PostFilter is called by the scheduling framework
// when the scheduling cycle failed at PreFilter or Filter by Unschedulable or UnschedulableAndUnresolvable.
// NodeToStatusMap has statuses that each Node got in the Filter phase.
// If this scheduling cycle failed at PreFilter, all Nodes have the status from the rejector PreFilter plugin in NodeToStatusMap.
// Note that the scheduling framework runs PostFilter plugins even when PreFilter returned UnschedulableAndUnresolvable.
// In that case, NodeToStatusMap contains all Nodes with UnschedulableAndUnresolvable.
// If there is no entry in the NodeToStatus map, its implicit status is UnschedulableAndUnresolvable.
// NodeToStatusReader has statuses that each Node got in PreFilter or Filter phase.
//
// Also, ignoring Nodes with UnschedulableAndUnresolvable is the responsibility of each PostFilter plugin,
// meaning NodeToStatusMap obviously could have Nodes with UnschedulableAndUnresolvable
// and the scheduling framework does call PostFilter even when all Nodes in NodeToStatusMap are UnschedulableAndUnresolvable.
// If you're implementing a custom preemption with PostFilter, ignoring Nodes with UnschedulableAndUnresolvable is the responsibility of your plugin,
// meaning NodeToStatusReader could have Nodes with UnschedulableAndUnresolvable
// and the scheduling framework does call PostFilter plugins even when all Nodes in NodeToStatusReader are UnschedulableAndUnresolvable.
//
// A PostFilter plugin should return one of the following statuses:
// - Unschedulable: the plugin gets executed successfully but the pod cannot be made schedulable.
@ -465,7 +572,7 @@ type PostFilterPlugin interface {
// Optionally, a non-nil PostFilterResult may be returned along with a Success status. For example,
// a preemption plugin may choose to return nominatedNodeName, so that framework can reuse that to update the
// preemptor pod's .spec.status.nominatedNodeName field.
PostFilter(ctx context.Context, state *CycleState, pod *v1.Pod, filteredNodeStatusMap NodeToStatusMap) (*PostFilterResult, *Status)
PostFilter(ctx context.Context, state *CycleState, pod *v1.Pod, filteredNodeStatusMap NodeToStatusReader) (*PostFilterResult, *Status)
}
// PreScorePlugin is an interface for "PreScore" plugin. PreScore is an
@ -598,7 +705,7 @@ type Framework interface {
// PostFilter plugins can either be informational, in which case should be configured
// to execute first and return Unschedulable status, or ones that try to change the
// cluster state to make the pod potentially schedulable in a future scheduling cycle.
RunPostFilterPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, filteredNodeStatusMap NodeToStatusMap) (*PostFilterResult, *Status)
RunPostFilterPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, filteredNodeStatusMap NodeToStatusReader) (*PostFilterResult, *Status)
// RunPreBindPlugins runs the set of configured PreBind plugins. It returns
// *Status and its code is set to non-success if any of the plugins returns

View File

@ -23,6 +23,7 @@ import (
"github.com/google/go-cmp/cmp"
"k8s.io/apimachinery/pkg/util/sets"
st "k8s.io/kubernetes/pkg/scheduler/testing"
)
var errorStatus = NewStatus(Error, "internal error")
@ -259,3 +260,118 @@ func TestIsStatusEqual(t *testing.T) {
})
}
}
type nodeInfoLister []*NodeInfo
func (nodes nodeInfoLister) Get(nodeName string) (*NodeInfo, error) {
for _, node := range nodes {
if node != nil && node.Node().Name == nodeName {
return node, nil
}
}
return nil, fmt.Errorf("unable to find node: %s", nodeName)
}
func (nodes nodeInfoLister) List() ([]*NodeInfo, error) {
return nodes, nil
}
func (nodes nodeInfoLister) HavePodsWithAffinityList() ([]*NodeInfo, error) {
return nodes, nil
}
func (nodes nodeInfoLister) HavePodsWithRequiredAntiAffinityList() ([]*NodeInfo, error) {
return nodes, nil
}
func TestNodesForStatusCode(t *testing.T) {
// Prepare 4 nodes names.
nodeNames := []string{"node1", "node2", "node3", "node4"}
tests := []struct {
name string
nodesStatuses *NodeToStatus
code Code
expected sets.Set[string] // set of expected node names.
}{
{
name: "No node should be attempted",
nodesStatuses: NewNodeToStatus(map[string]*Status{
"node1": NewStatus(UnschedulableAndUnresolvable),
"node2": NewStatus(UnschedulableAndUnresolvable),
"node3": NewStatus(UnschedulableAndUnresolvable),
"node4": NewStatus(UnschedulableAndUnresolvable),
}, NewStatus(UnschedulableAndUnresolvable)),
code: Unschedulable,
expected: sets.New[string](),
},
{
name: "All nodes should be attempted",
nodesStatuses: NewNodeToStatus(map[string]*Status{
"node1": NewStatus(UnschedulableAndUnresolvable),
"node2": NewStatus(UnschedulableAndUnresolvable),
"node3": NewStatus(UnschedulableAndUnresolvable),
"node4": NewStatus(UnschedulableAndUnresolvable),
}, NewStatus(UnschedulableAndUnresolvable)),
code: UnschedulableAndUnresolvable,
expected: sets.New[string]("node1", "node2", "node3", "node4"),
},
{
name: "No node should be attempted, as all are implicitly not matching the code",
nodesStatuses: NewDefaultNodeToStatus(),
code: Unschedulable,
expected: sets.New[string](),
},
{
name: "All nodes should be attempted, as all are implicitly matching the code",
nodesStatuses: NewDefaultNodeToStatus(),
code: UnschedulableAndUnresolvable,
expected: sets.New[string]("node1", "node2", "node3", "node4"),
},
{
name: "UnschedulableAndUnresolvable status should be skipped but Unschedulable should be tried",
nodesStatuses: NewNodeToStatus(map[string]*Status{
"node1": NewStatus(Unschedulable),
"node2": NewStatus(UnschedulableAndUnresolvable),
"node3": NewStatus(Unschedulable),
// node4 is UnschedulableAndUnresolvable by absence
}, NewStatus(UnschedulableAndUnresolvable)),
code: Unschedulable,
expected: sets.New("node1", "node3"),
},
{
name: "Unschedulable status should be skipped but UnschedulableAndUnresolvable should be tried",
nodesStatuses: NewNodeToStatus(map[string]*Status{
"node1": NewStatus(Unschedulable),
"node2": NewStatus(UnschedulableAndUnresolvable),
"node3": NewStatus(Unschedulable),
// node4 is UnschedulableAndUnresolvable by absence
}, NewStatus(UnschedulableAndUnresolvable)),
code: UnschedulableAndUnresolvable,
expected: sets.New("node2", "node4"),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
var nodeInfos nodeInfoLister
for _, name := range nodeNames {
ni := NewNodeInfo()
ni.SetNode(st.MakeNode().Name(name).Obj())
nodeInfos = append(nodeInfos, ni)
}
nodes, err := tt.nodesStatuses.NodesForStatusCode(nodeInfos, tt.code)
if err != nil {
t.Fatalf("Failed to get nodes for status code: %s", err)
}
if len(tt.expected) != len(nodes) {
t.Errorf("Number of nodes is not the same as expected. expected: %d, got: %d. Nodes: %v", len(tt.expected), len(nodes), nodes)
}
for _, node := range nodes {
name := node.Node().Name
if _, found := tt.expected[name]; !found {
t.Errorf("Node %v is not expected", name)
}
}
})
}
}

View File

@ -82,7 +82,7 @@ func New(_ context.Context, dpArgs runtime.Object, fh framework.Handle, fts feat
}
// PostFilter invoked at the postFilter extension point.
func (pl *DefaultPreemption) PostFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, m framework.NodeToStatusMap) (*framework.PostFilterResult, *framework.Status) {
func (pl *DefaultPreemption) PostFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, m framework.NodeToStatusReader) (*framework.PostFilterResult, *framework.Status) {
defer func() {
metrics.PreemptionAttempts.Inc()
}()

View File

@ -150,7 +150,7 @@ func TestPostFilter(t *testing.T) {
pod *v1.Pod
pods []*v1.Pod
nodes []*v1.Node
filteredNodesStatuses framework.NodeToStatusMap
filteredNodesStatuses *framework.NodeToStatus
extender framework.Extender
wantResult *framework.PostFilterResult
wantStatus *framework.Status
@ -164,9 +164,9 @@ func TestPostFilter(t *testing.T) {
nodes: []*v1.Node{
st.MakeNode().Name("node1").Capacity(onePodRes).Obj(),
},
filteredNodesStatuses: framework.NodeToStatusMap{
filteredNodesStatuses: framework.NewNodeToStatus(map[string]*framework.Status{
"node1": framework.NewStatus(framework.Unschedulable),
},
}, framework.NewStatus(framework.UnschedulableAndUnresolvable)),
wantResult: framework.NewPostFilterResultWithNominatedNode("node1"),
wantStatus: framework.NewStatus(framework.Success),
},
@ -179,9 +179,9 @@ func TestPostFilter(t *testing.T) {
nodes: []*v1.Node{
st.MakeNode().Name("node1").Capacity(onePodRes).Obj(),
},
filteredNodesStatuses: framework.NodeToStatusMap{
filteredNodesStatuses: framework.NewNodeToStatus(map[string]*framework.Status{
"node1": framework.NewStatus(framework.Unschedulable),
},
}, framework.NewStatus(framework.UnschedulableAndUnresolvable)),
wantResult: framework.NewPostFilterResultWithNominatedNode(""),
wantStatus: framework.NewStatus(framework.Unschedulable, "preemption: 0/1 nodes are available: 1 No preemption victims found for incoming pod."),
},
@ -194,12 +194,25 @@ func TestPostFilter(t *testing.T) {
nodes: []*v1.Node{
st.MakeNode().Name("node1").Capacity(onePodRes).Obj(),
},
filteredNodesStatuses: framework.NodeToStatusMap{
filteredNodesStatuses: framework.NewNodeToStatus(map[string]*framework.Status{
"node1": framework.NewStatus(framework.UnschedulableAndUnresolvable),
},
}, framework.NewStatus(framework.UnschedulableAndUnresolvable)),
wantResult: framework.NewPostFilterResultWithNominatedNode(""),
wantStatus: framework.NewStatus(framework.Unschedulable, "preemption: 0/1 nodes are available: 1 Preemption is not helpful for scheduling."),
},
{
name: "preemption should respect absent NodeToStatusMap entry meaning UnschedulableAndUnresolvable",
pod: st.MakePod().Name("p").UID("p").Namespace(v1.NamespaceDefault).Priority(highPriority).Obj(),
pods: []*v1.Pod{
st.MakePod().Name("p1").UID("p1").Namespace(v1.NamespaceDefault).Node("node1").Obj(),
},
nodes: []*v1.Node{
st.MakeNode().Name("node1").Capacity(onePodRes).Obj(),
},
filteredNodesStatuses: framework.NewDefaultNodeToStatus(),
wantResult: framework.NewPostFilterResultWithNominatedNode(""),
wantStatus: framework.NewStatus(framework.Unschedulable, "preemption: 0/1 nodes are available: 1 Preemption is not helpful for scheduling."),
},
{
name: "pod can be made schedulable on one node",
pod: st.MakePod().Name("p").UID("p").Namespace(v1.NamespaceDefault).Priority(midPriority).Obj(),
@ -211,10 +224,10 @@ func TestPostFilter(t *testing.T) {
st.MakeNode().Name("node1").Capacity(onePodRes).Obj(),
st.MakeNode().Name("node2").Capacity(onePodRes).Obj(),
},
filteredNodesStatuses: framework.NodeToStatusMap{
filteredNodesStatuses: framework.NewNodeToStatus(map[string]*framework.Status{
"node1": framework.NewStatus(framework.Unschedulable),
"node2": framework.NewStatus(framework.Unschedulable),
},
}, framework.NewStatus(framework.UnschedulableAndUnresolvable)),
wantResult: framework.NewPostFilterResultWithNominatedNode("node2"),
wantStatus: framework.NewStatus(framework.Success),
},
@ -229,10 +242,10 @@ func TestPostFilter(t *testing.T) {
st.MakeNode().Name("node1").Capacity(onePodRes).Obj(),
st.MakeNode().Name("node2").Capacity(onePodRes).Obj(),
},
filteredNodesStatuses: framework.NodeToStatusMap{
filteredNodesStatuses: framework.NewNodeToStatus(map[string]*framework.Status{
"node1": framework.NewStatus(framework.Unschedulable),
"node2": framework.NewStatus(framework.Unschedulable),
},
}, framework.NewStatus(framework.UnschedulableAndUnresolvable)),
extender: &tf.FakeExtender{
ExtenderName: "FakeExtender1",
Predicates: []tf.FitPredicate{tf.Node1PredicateExtender},
@ -251,10 +264,10 @@ func TestPostFilter(t *testing.T) {
st.MakeNode().Name("node1").Capacity(nodeRes).Obj(), // no enough CPU resource
st.MakeNode().Name("node2").Capacity(nodeRes).Obj(), // no enough CPU resource
},
filteredNodesStatuses: framework.NodeToStatusMap{
filteredNodesStatuses: framework.NewNodeToStatus(map[string]*framework.Status{
"node1": framework.NewStatus(framework.Unschedulable),
"node2": framework.NewStatus(framework.Unschedulable),
},
}, framework.NewStatus(framework.UnschedulableAndUnresolvable)),
wantResult: framework.NewPostFilterResultWithNominatedNode(""),
wantStatus: framework.NewStatus(framework.Unschedulable, "preemption: 0/2 nodes are available: 2 Insufficient cpu."),
},
@ -271,11 +284,11 @@ func TestPostFilter(t *testing.T) {
st.MakeNode().Name("node2").Capacity(nodeRes).Obj(), // no enough CPU resource
st.MakeNode().Name("node3").Capacity(onePodRes).Obj(), // no pod will be preempted
},
filteredNodesStatuses: framework.NodeToStatusMap{
filteredNodesStatuses: framework.NewNodeToStatus(map[string]*framework.Status{
"node1": framework.NewStatus(framework.Unschedulable),
"node2": framework.NewStatus(framework.Unschedulable),
"node3": framework.NewStatus(framework.Unschedulable),
},
}, framework.NewStatus(framework.UnschedulableAndUnresolvable)),
wantResult: framework.NewPostFilterResultWithNominatedNode(""),
wantStatus: framework.NewStatus(framework.Unschedulable, "preemption: 0/3 nodes are available: 1 Insufficient cpu, 2 No preemption victims found for incoming pod."),
},
@ -292,11 +305,11 @@ func TestPostFilter(t *testing.T) {
st.MakeNode().Name("node3").Capacity(nodeRes).Obj(),
st.MakeNode().Name("node4").Capacity(nodeRes).Obj(),
},
filteredNodesStatuses: framework.NodeToStatusMap{
filteredNodesStatuses: framework.NewNodeToStatus(map[string]*framework.Status{
"node1": framework.NewStatus(framework.Unschedulable),
"node2": framework.NewStatus(framework.Unschedulable),
"node4": framework.NewStatus(framework.UnschedulableAndUnresolvable),
},
}, framework.NewStatus(framework.UnschedulableAndUnresolvable)),
wantResult: framework.NewPostFilterResultWithNominatedNode(""),
wantStatus: framework.NewStatus(framework.Unschedulable, "preemption: 0/4 nodes are available: 2 Insufficient cpu, 2 Preemption is not helpful for scheduling."),
},
@ -307,10 +320,12 @@ func TestPostFilter(t *testing.T) {
st.MakePod().Name("p1").UID("p1").Namespace(v1.NamespaceDefault).Node("node1").Obj(),
},
// label the node with key as "error" so that the TestPlugin will fail with error.
nodes: []*v1.Node{st.MakeNode().Name("node1").Capacity(largeRes).Label("error", "true").Obj()},
filteredNodesStatuses: framework.NodeToStatusMap{"node1": framework.NewStatus(framework.Unschedulable)},
wantResult: nil,
wantStatus: framework.AsStatus(errors.New("preemption: running RemovePod on PreFilter plugin \"test-plugin\": failed to remove pod: p")),
nodes: []*v1.Node{st.MakeNode().Name("node1").Capacity(largeRes).Label("error", "true").Obj()},
filteredNodesStatuses: framework.NewNodeToStatus(map[string]*framework.Status{
"node1": framework.NewStatus(framework.Unschedulable),
}, framework.NewStatus(framework.UnschedulableAndUnresolvable)),
wantResult: nil,
wantStatus: framework.AsStatus(errors.New("preemption: running RemovePod on PreFilter plugin \"test-plugin\": failed to remove pod: p")),
},
{
name: "one failed with TestPlugin and the other pass",
@ -324,10 +339,10 @@ func TestPostFilter(t *testing.T) {
st.MakeNode().Name("node1").Capacity(largeRes).Label("error", "true").Obj(),
st.MakeNode().Name("node2").Capacity(largeRes).Obj(),
},
filteredNodesStatuses: framework.NodeToStatusMap{
filteredNodesStatuses: framework.NewNodeToStatus(map[string]*framework.Status{
"node1": framework.NewStatus(framework.Unschedulable),
"node2": framework.NewStatus(framework.Unschedulable),
},
}, framework.NewStatus(framework.UnschedulableAndUnresolvable)),
wantResult: framework.NewPostFilterResultWithNominatedNode("node2"),
wantStatus: framework.NewStatus(framework.Success),
},
@ -1806,9 +1821,10 @@ func TestPreempt(t *testing.T) {
// so that these nodes are eligible for preemption, we set their status
// to Unschedulable.
nodeToStatusMap := make(framework.NodeToStatusMap, len(nodes))
nodeToStatusMap := framework.NewDefaultNodeToStatus()
for _, n := range nodes {
nodeToStatusMap[n.Name] = framework.NewStatus(framework.Unschedulable)
nodeToStatusMap.Set(n.Name, framework.NewStatus(framework.Unschedulable))
}
res, status := pe.Preempt(ctx, test.pod, nodeToStatusMap)
@ -1866,7 +1882,7 @@ func TestPreempt(t *testing.T) {
}
// Call preempt again and make sure it doesn't preempt any more pods.
res, status = pe.Preempt(ctx, test.pod, make(framework.NodeToStatusMap))
res, status = pe.Preempt(ctx, test.pod, framework.NewDefaultNodeToStatus())
if !status.IsSuccess() && !status.IsRejected() {
t.Errorf("unexpected error in preemption: %v", status.AsError())
}

View File

@ -1240,7 +1240,7 @@ func (pl *dynamicResources) Filter(ctx context.Context, cs *framework.CycleState
// deallocated to help get the Pod schedulable. If yes, it picks one and
// requests its deallocation. This only gets called when filtering found no
// suitable node.
func (pl *dynamicResources) PostFilter(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, filteredNodeStatusMap framework.NodeToStatusMap) (*framework.PostFilterResult, *framework.Status) {
func (pl *dynamicResources) PostFilter(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, filteredNodeStatusMap framework.NodeToStatusReader) (*framework.PostFilterResult, *framework.Status) {
if !pl.enabled {
return nil, framework.NewStatus(framework.Unschedulable, "plugin disabled")
}

View File

@ -147,7 +147,7 @@ type Evaluator struct {
//
// - <non-nil PostFilterResult, Success>. It's the regular happy path
// and the non-empty nominatedNodeName will be applied to the preemptor pod.
func (ev *Evaluator) Preempt(ctx context.Context, pod *v1.Pod, m framework.NodeToStatusMap) (*framework.PostFilterResult, *framework.Status) {
func (ev *Evaluator) Preempt(ctx context.Context, pod *v1.Pod, m framework.NodeToStatusReader) (*framework.PostFilterResult, *framework.Status) {
logger := klog.FromContext(ctx)
// 0) Fetch the latest version of <pod>.
@ -162,13 +162,18 @@ func (ev *Evaluator) Preempt(ctx context.Context, pod *v1.Pod, m framework.NodeT
}
// 1) Ensure the preemptor is eligible to preempt other pods.
if ok, msg := ev.PodEligibleToPreemptOthers(pod, m[pod.Status.NominatedNodeName]); !ok {
nominatedNodeStatus := m.Get(pod.Status.NominatedNodeName)
if ok, msg := ev.PodEligibleToPreemptOthers(pod, nominatedNodeStatus); !ok {
logger.V(5).Info("Pod is not eligible for preemption", "pod", klog.KObj(pod), "reason", msg)
return nil, framework.NewStatus(framework.Unschedulable, msg)
}
// 2) Find all preemption candidates.
candidates, nodeToStatusMap, err := ev.findCandidates(ctx, pod, m)
allNodes, err := ev.Handler.SnapshotSharedLister().NodeInfos().List()
if err != nil {
return nil, framework.AsStatus(err)
}
candidates, nodeToStatusMap, err := ev.findCandidates(ctx, allNodes, pod, m)
if err != nil && len(candidates) == 0 {
return nil, framework.AsStatus(err)
}
@ -177,12 +182,13 @@ func (ev *Evaluator) Preempt(ctx context.Context, pod *v1.Pod, m framework.NodeT
if len(candidates) == 0 {
fitError := &framework.FitError{
Pod: pod,
NumAllNodes: len(nodeToStatusMap),
NumAllNodes: len(allNodes),
Diagnosis: framework.Diagnosis{
NodeToStatusMap: nodeToStatusMap,
NodeToStatus: nodeToStatusMap,
// Leave UnschedulablePlugins or PendingPlugins as nil as it won't be used on moving Pods.
},
}
fitError.Diagnosis.NodeToStatus.SetAbsentNodesStatus(framework.NewStatus(framework.UnschedulableAndUnresolvable, "Preemption is not helpful for scheduling"))
// Specify nominatedNodeName to clear the pod's nominatedNodeName status, if applicable.
return framework.NewPostFilterResultWithNominatedNode(""), framework.NewStatus(framework.Unschedulable, fitError.Error())
}
@ -209,16 +215,16 @@ func (ev *Evaluator) Preempt(ctx context.Context, pod *v1.Pod, m framework.NodeT
// FindCandidates calculates a slice of preemption candidates.
// Each candidate is executable to make the given <pod> schedulable.
func (ev *Evaluator) findCandidates(ctx context.Context, pod *v1.Pod, m framework.NodeToStatusMap) ([]Candidate, framework.NodeToStatusMap, error) {
allNodes, err := ev.Handler.SnapshotSharedLister().NodeInfos().List()
if err != nil {
return nil, nil, err
}
func (ev *Evaluator) findCandidates(ctx context.Context, allNodes []*framework.NodeInfo, pod *v1.Pod, m framework.NodeToStatusReader) ([]Candidate, *framework.NodeToStatus, error) {
if len(allNodes) == 0 {
return nil, nil, errors.New("no nodes available")
}
logger := klog.FromContext(ctx)
potentialNodes, unschedulableNodeStatus := nodesWherePreemptionMightHelp(allNodes, m)
// Get a list of nodes with failed predicates (Unschedulable) that may be satisfied by removing pods from the node.
potentialNodes, err := m.NodesForStatusCode(ev.Handler.SnapshotSharedLister().NodeInfos(), framework.Unschedulable)
if err != nil {
return nil, nil, err
}
if len(potentialNodes) == 0 {
logger.V(3).Info("Preemption will not help schedule pod on any node", "pod", klog.KObj(pod))
// In this case, we should clean-up any existing nominated node name of the pod.
@ -226,7 +232,7 @@ func (ev *Evaluator) findCandidates(ctx context.Context, pod *v1.Pod, m framewor
logger.Error(err, "Could not clear the nominatedNodeName field of pod", "pod", klog.KObj(pod))
// We do not return as this error is not critical.
}
return nil, unschedulableNodeStatus, nil
return nil, framework.NewDefaultNodeToStatus(), nil
}
pdbs, err := getPodDisruptionBudgets(ev.PdbLister)
@ -242,11 +248,7 @@ func (ev *Evaluator) findCandidates(ctx context.Context, pod *v1.Pod, m framewor
}
loggerV.Info("Selected candidates from a pool of nodes", "potentialNodesCount", len(potentialNodes), "offset", offset, "sampleLength", len(sample), "sample", sample, "candidates", numCandidates)
}
candidates, nodeStatuses, err := ev.DryRunPreemption(ctx, pod, potentialNodes, pdbs, offset, numCandidates)
for node, nodeStatus := range unschedulableNodeStatus {
nodeStatuses[node] = nodeStatus
}
return candidates, nodeStatuses, err
return ev.DryRunPreemption(ctx, pod, potentialNodes, pdbs, offset, numCandidates)
}
// callExtenders calls given <extenders> to select the list of feasible candidates.
@ -410,27 +412,6 @@ func (ev *Evaluator) prepareCandidate(ctx context.Context, c Candidate, pod *v1.
return nil
}
// nodesWherePreemptionMightHelp returns a list of nodes with failed predicates
// that may be satisfied by removing pods from the node.
func nodesWherePreemptionMightHelp(nodes []*framework.NodeInfo, m framework.NodeToStatusMap) ([]*framework.NodeInfo, framework.NodeToStatusMap) {
var potentialNodes []*framework.NodeInfo
nodeStatuses := make(framework.NodeToStatusMap)
unresolvableStatus := framework.NewStatus(framework.UnschedulableAndUnresolvable, "Preemption is not helpful for scheduling")
for _, node := range nodes {
nodeName := node.Node().Name
// We only attempt preemption on nodes with status 'Unschedulable'. For
// diagnostic purposes, we propagate UnschedulableAndUnresolvable if either
// implied by absence in map or explicitly set.
status, ok := m[nodeName]
if status.Code() == framework.Unschedulable {
potentialNodes = append(potentialNodes, node)
} else if !ok || status.Code() == framework.UnschedulableAndUnresolvable {
nodeStatuses[nodeName] = unresolvableStatus
}
}
return potentialNodes, nodeStatuses
}
func getPodDisruptionBudgets(pdbLister policylisters.PodDisruptionBudgetLister) ([]*policy.PodDisruptionBudget, error) {
if pdbLister != nil {
return pdbLister.List(labels.Everything())
@ -569,13 +550,14 @@ func getLowerPriorityNominatedPods(logger klog.Logger, pn framework.PodNominator
// candidates, ones that do not violate PDB are preferred over ones that do.
// NOTE: This method is exported for easier testing in default preemption.
func (ev *Evaluator) DryRunPreemption(ctx context.Context, pod *v1.Pod, potentialNodes []*framework.NodeInfo,
pdbs []*policy.PodDisruptionBudget, offset int32, numCandidates int32) ([]Candidate, framework.NodeToStatusMap, error) {
pdbs []*policy.PodDisruptionBudget, offset int32, numCandidates int32) ([]Candidate, *framework.NodeToStatus, error) {
fh := ev.Handler
nonViolatingCandidates := newCandidateList(numCandidates)
violatingCandidates := newCandidateList(numCandidates)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
nodeStatuses := make(framework.NodeToStatusMap)
nodeStatuses := framework.NewDefaultNodeToStatus()
var statusesLock sync.Mutex
var errs []error
checkNode := func(i int) {
@ -609,7 +591,7 @@ func (ev *Evaluator) DryRunPreemption(ctx context.Context, pod *v1.Pod, potentia
if status.Code() == framework.Error {
errs = append(errs, status.AsError())
}
nodeStatuses[nodeInfoCopy.Node().Name] = status
nodeStatuses.Set(nodeInfoCopy.Node().Name, status)
statusesLock.Unlock()
}
fh.Parallelizer().Until(ctx, len(potentialNodes), checkNode, ev.PluginName)

View File

@ -18,7 +18,6 @@ package preemption
import (
"context"
"fmt"
"sort"
"testing"
@ -26,7 +25,6 @@ import (
v1 "k8s.io/api/core/v1"
policy "k8s.io/api/policy/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/informers"
clientsetfake "k8s.io/client-go/kubernetes/fake"
"k8s.io/klog/v2/ktesting"
@ -34,16 +32,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/parallelize"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/interpodaffinity"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeaffinity"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodename"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeunschedulable"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/podtopologyspread"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/tainttoleration"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumerestrictions"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumezone"
frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
@ -123,128 +112,6 @@ func (pl *FakePreemptionScorePostFilterPlugin) OrderedScoreFuncs(ctx context.Con
}
}
func TestNodesWherePreemptionMightHelp(t *testing.T) {
// Prepare 4 nodes names.
nodeNames := []string{"node1", "node2", "node3", "node4"}
tests := []struct {
name string
nodesStatuses framework.NodeToStatusMap
expected sets.Set[string] // set of expected node names.
}{
{
name: "No node should be attempted",
nodesStatuses: framework.NodeToStatusMap{
"node1": framework.NewStatus(framework.UnschedulableAndUnresolvable, nodeaffinity.ErrReasonPod),
"node2": framework.NewStatus(framework.UnschedulableAndUnresolvable, nodename.ErrReason),
"node3": framework.NewStatus(framework.UnschedulableAndUnresolvable, tainttoleration.ErrReasonNotMatch),
"node4": framework.NewStatus(framework.UnschedulableAndUnresolvable, interpodaffinity.ErrReasonAffinityRulesNotMatch),
},
expected: sets.New[string](),
},
{
name: "ErrReasonAntiAffinityRulesNotMatch should be tried as it indicates that the pod is unschedulable due to inter-pod anti-affinity",
nodesStatuses: framework.NodeToStatusMap{
"node1": framework.NewStatus(framework.Unschedulable, interpodaffinity.ErrReasonAntiAffinityRulesNotMatch),
"node2": framework.NewStatus(framework.UnschedulableAndUnresolvable, nodename.ErrReason),
"node3": framework.NewStatus(framework.UnschedulableAndUnresolvable, nodeunschedulable.ErrReasonUnschedulable),
"node4": framework.NewStatus(framework.Unschedulable, "Unschedulable"),
},
expected: sets.New("node1", "node4"),
},
{
name: "ErrReasonAffinityRulesNotMatch should not be tried as it indicates that the pod is unschedulable due to inter-pod affinity, but ErrReasonAntiAffinityRulesNotMatch should be tried as it indicates that the pod is unschedulable due to inter-pod anti-affinity",
nodesStatuses: framework.NodeToStatusMap{
"node1": framework.NewStatus(framework.UnschedulableAndUnresolvable, interpodaffinity.ErrReasonAffinityRulesNotMatch),
"node2": framework.NewStatus(framework.Unschedulable, interpodaffinity.ErrReasonAntiAffinityRulesNotMatch),
"node3": framework.NewStatus(framework.Unschedulable, "Unschedulable"),
"node4": framework.NewStatus(framework.Unschedulable, "Unschedulable"),
},
expected: sets.New("node2", "node3", "node4"),
},
{
name: "Mix of failed predicates works fine",
nodesStatuses: framework.NodeToStatusMap{
"node1": framework.NewStatus(framework.UnschedulableAndUnresolvable, volumerestrictions.ErrReasonDiskConflict),
"node2": framework.NewStatus(framework.Unschedulable, fmt.Sprintf("Insufficient %v", v1.ResourceMemory)),
"node3": framework.NewStatus(framework.Unschedulable, "Unschedulable"),
"node4": framework.NewStatus(framework.Unschedulable, "Unschedulable"),
},
expected: sets.New("node2", "node3", "node4"),
},
{
name: "Node condition errors should be considered unresolvable",
nodesStatuses: framework.NodeToStatusMap{
"node1": framework.NewStatus(framework.UnschedulableAndUnresolvable, nodeunschedulable.ErrReasonUnknownCondition),
"node2": framework.NewStatus(framework.Unschedulable, "Unschedulable"),
"node3": framework.NewStatus(framework.Unschedulable, "Unschedulable"),
"node4": framework.NewStatus(framework.Unschedulable, "Unschedulable"),
},
expected: sets.New("node2", "node3", "node4"),
},
{
name: "ErrVolume... errors should not be tried as it indicates that the pod is unschedulable due to no matching volumes for pod on node",
nodesStatuses: framework.NodeToStatusMap{
"node1": framework.NewStatus(framework.UnschedulableAndUnresolvable, volumezone.ErrReasonConflict),
"node2": framework.NewStatus(framework.UnschedulableAndUnresolvable, string(volumebinding.ErrReasonNodeConflict)),
"node3": framework.NewStatus(framework.UnschedulableAndUnresolvable, string(volumebinding.ErrReasonBindConflict)),
"node4": framework.NewStatus(framework.Unschedulable, "Unschedulable"),
},
expected: sets.New("node4"),
},
{
name: "ErrReasonConstraintsNotMatch should be tried as it indicates that the pod is unschedulable due to topology spread constraints",
nodesStatuses: framework.NodeToStatusMap{
"node1": framework.NewStatus(framework.Unschedulable, podtopologyspread.ErrReasonConstraintsNotMatch),
"node2": framework.NewStatus(framework.UnschedulableAndUnresolvable, nodename.ErrReason),
"node3": framework.NewStatus(framework.Unschedulable, podtopologyspread.ErrReasonConstraintsNotMatch),
"node4": framework.NewStatus(framework.Unschedulable, "Unschedulable"),
},
expected: sets.New("node1", "node3", "node4"),
},
{
name: "UnschedulableAndUnresolvable status should be skipped but Unschedulable should be tried",
nodesStatuses: framework.NodeToStatusMap{
"node1": framework.NewStatus(framework.Unschedulable, ""),
"node2": framework.NewStatus(framework.UnschedulableAndUnresolvable, ""),
"node3": framework.NewStatus(framework.Unschedulable, ""),
"node4": framework.NewStatus(framework.UnschedulableAndUnresolvable, ""),
},
expected: sets.New("node1", "node3"),
},
{
name: "ErrReasonNodeLabelNotMatch should not be tried as it indicates that the pod is unschedulable due to node doesn't have the required label",
nodesStatuses: framework.NodeToStatusMap{
"node1": framework.NewStatus(framework.Unschedulable, ""),
"node2": framework.NewStatus(framework.UnschedulableAndUnresolvable, podtopologyspread.ErrReasonNodeLabelNotMatch),
"node3": framework.NewStatus(framework.Unschedulable, ""),
"node4": framework.NewStatus(framework.UnschedulableAndUnresolvable, ""),
},
expected: sets.New("node1", "node3"),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
var nodeInfos []*framework.NodeInfo
for _, name := range nodeNames {
ni := framework.NewNodeInfo()
ni.SetNode(st.MakeNode().Name(name).Obj())
nodeInfos = append(nodeInfos, ni)
}
nodes, _ := nodesWherePreemptionMightHelp(nodeInfos, tt.nodesStatuses)
if len(tt.expected) != len(nodes) {
t.Errorf("number of nodes is not the same as expected. exptectd: %d, got: %d. Nodes: %v", len(tt.expected), len(nodes), nodes)
}
for _, node := range nodes {
name := node.Node().Name
if _, found := tt.expected[name]; !found {
t.Errorf("node %v is not expected.", name)
}
}
})
}
}
func TestDryRunPreemption(t *testing.T) {
tests := []struct {
name string

View File

@ -905,7 +905,7 @@ func (f *frameworkImpl) runFilterPlugin(ctx context.Context, pl framework.Filter
// RunPostFilterPlugins runs the set of configured PostFilter plugins until the first
// Success, Error or UnschedulableAndUnresolvable is met; otherwise continues to execute all plugins.
func (f *frameworkImpl) RunPostFilterPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, filteredNodeStatusMap framework.NodeToStatusMap) (_ *framework.PostFilterResult, status *framework.Status) {
func (f *frameworkImpl) RunPostFilterPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, filteredNodeStatusMap framework.NodeToStatusReader) (_ *framework.PostFilterResult, status *framework.Status) {
startTime := time.Now()
defer func() {
metrics.FrameworkExtensionPointDuration.WithLabelValues(metrics.PostFilter, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime))
@ -950,7 +950,7 @@ func (f *frameworkImpl) RunPostFilterPlugins(ctx context.Context, state *framewo
return result, framework.NewStatus(framework.Unschedulable, reasons...).WithPlugin(rejectorPlugin)
}
func (f *frameworkImpl) runPostFilterPlugin(ctx context.Context, pl framework.PostFilterPlugin, state *framework.CycleState, pod *v1.Pod, filteredNodeStatusMap framework.NodeToStatusMap) (*framework.PostFilterResult, *framework.Status) {
func (f *frameworkImpl) runPostFilterPlugin(ctx context.Context, pl framework.PostFilterPlugin, state *framework.CycleState, pod *v1.Pod, filteredNodeStatusMap framework.NodeToStatusReader) (*framework.PostFilterResult, *framework.Status) {
if !state.ShouldRecordPluginMetrics() {
return pl.PostFilter(ctx, state, pod, filteredNodeStatusMap)
}

View File

@ -209,7 +209,7 @@ func (pl *TestPlugin) Filter(ctx context.Context, state *framework.CycleState, p
return framework.NewStatus(framework.Code(pl.inj.FilterStatus), injectFilterReason)
}
func (pl *TestPlugin) PostFilter(_ context.Context, _ *framework.CycleState, _ *v1.Pod, _ framework.NodeToStatusMap) (*framework.PostFilterResult, *framework.Status) {
func (pl *TestPlugin) PostFilter(_ context.Context, _ *framework.CycleState, _ *v1.Pod, _ framework.NodeToStatusReader) (*framework.PostFilterResult, *framework.Status) {
return nil, framework.NewStatus(framework.Code(pl.inj.PostFilterStatus), injectReason)
}

View File

@ -336,12 +336,10 @@ const ExtenderName = "Extender"
// Diagnosis records the details to diagnose a scheduling failure.
type Diagnosis struct {
// NodeToStatusMap records the status of each retriable node (status Unschedulable)
// NodeToStatus records the status of nodes and generic status for absent ones.
// if they're rejected in PreFilter (via PreFilterResult) or Filter plugins.
// Nodes that pass PreFilter/Filter plugins are not included in this map.
// While this map may contain UnschedulableAndUnresolvable statuses, the absence of
// a node should be interpreted as UnschedulableAndUnresolvable.
NodeToStatusMap NodeToStatusMap
NodeToStatus *NodeToStatus
// UnschedulablePlugins are plugins that returns Unschedulable or UnschedulableAndUnresolvable.
UnschedulablePlugins sets.Set[string]
// UnschedulablePlugins are plugins that returns Pending.
@ -401,10 +399,16 @@ func (f *FitError) Error() string {
// So, we shouldn't add the message from NodeToStatusMap when the PreFilter failed.
// Otherwise, we will have duplicated reasons in the error message.
reasons := make(map[string]int)
for _, status := range f.Diagnosis.NodeToStatusMap {
f.Diagnosis.NodeToStatus.ForEachExplicitNode(func(_ string, status *Status) {
for _, reason := range status.Reasons() {
reasons[reason]++
}
})
if f.Diagnosis.NodeToStatus.Len() < f.NumAllNodes {
// Adding predefined reasons for nodes that are absent in NodeToStatusMap
for _, reason := range f.Diagnosis.NodeToStatus.AbsentNodesStatus().Reasons() {
reasons[reason] += f.NumAllNodes - f.Diagnosis.NodeToStatus.Len()
}
}
sortReasonsHistogram := func() []string {

View File

@ -1394,13 +1394,13 @@ func TestFitError_Error(t *testing.T) {
numAllNodes: 3,
diagnosis: Diagnosis{
PreFilterMsg: "Node(s) failed PreFilter plugin FalsePreFilter",
NodeToStatusMap: NodeToStatusMap{
NodeToStatus: NewNodeToStatus(map[string]*Status{
// They're inserted by the framework.
// We don't include them in the reason message because they'd be just duplicates.
"node1": NewStatus(Unschedulable, "Node(s) failed PreFilter plugin FalsePreFilter"),
"node2": NewStatus(Unschedulable, "Node(s) failed PreFilter plugin FalsePreFilter"),
"node3": NewStatus(Unschedulable, "Node(s) failed PreFilter plugin FalsePreFilter"),
},
}, NewStatus(UnschedulableAndUnresolvable)),
},
wantReasonMsg: "0/3 nodes are available: Node(s) failed PreFilter plugin FalsePreFilter.",
},
@ -1409,13 +1409,13 @@ func TestFitError_Error(t *testing.T) {
numAllNodes: 3,
diagnosis: Diagnosis{
PreFilterMsg: "Node(s) failed PreFilter plugin FalsePreFilter",
NodeToStatusMap: NodeToStatusMap{
NodeToStatus: NewNodeToStatus(map[string]*Status{
// They're inserted by the framework.
// We don't include them in the reason message because they'd be just duplicates.
"node1": NewStatus(Unschedulable, "Node(s) failed PreFilter plugin FalsePreFilter"),
"node2": NewStatus(Unschedulable, "Node(s) failed PreFilter plugin FalsePreFilter"),
"node3": NewStatus(Unschedulable, "Node(s) failed PreFilter plugin FalsePreFilter"),
},
}, NewStatus(UnschedulableAndUnresolvable)),
// PostFilterMsg will be included.
PostFilterMsg: "Error running PostFilter plugin FailedPostFilter",
},
@ -1426,11 +1426,11 @@ func TestFitError_Error(t *testing.T) {
numAllNodes: 3,
diagnosis: Diagnosis{
PreFilterMsg: "",
NodeToStatusMap: NodeToStatusMap{
NodeToStatus: NewNodeToStatus(map[string]*Status{
"node1": NewStatus(Unschedulable, "Node(s) failed Filter plugin FalseFilter-1"),
"node2": NewStatus(Unschedulable, "Node(s) failed Filter plugin FalseFilter-1"),
"node3": NewStatus(Unschedulable, "Node(s) failed Filter plugin FalseFilter-1"),
},
}, NewStatus(UnschedulableAndUnresolvable)),
},
wantReasonMsg: "0/3 nodes are available: 3 Node(s) failed Filter plugin FalseFilter-1.",
},
@ -1439,11 +1439,11 @@ func TestFitError_Error(t *testing.T) {
numAllNodes: 3,
diagnosis: Diagnosis{
PreFilterMsg: "",
NodeToStatusMap: NodeToStatusMap{
NodeToStatus: NewNodeToStatus(map[string]*Status{
"node1": NewStatus(Unschedulable, "Node(s) failed Filter plugin FalseFilter-1"),
"node2": NewStatus(Unschedulable, "Node(s) failed Filter plugin FalseFilter-1"),
"node3": NewStatus(Unschedulable, "Node(s) failed Filter plugin FalseFilter-1"),
},
}, NewStatus(UnschedulableAndUnresolvable)),
PostFilterMsg: "Error running PostFilter plugin FailedPostFilter",
},
wantReasonMsg: "0/3 nodes are available: 3 Node(s) failed Filter plugin FalseFilter-1. Error running PostFilter plugin FailedPostFilter",
@ -1453,11 +1453,11 @@ func TestFitError_Error(t *testing.T) {
numAllNodes: 3,
diagnosis: Diagnosis{
PreFilterMsg: "",
NodeToStatusMap: NodeToStatusMap{
NodeToStatus: NewNodeToStatus(map[string]*Status{
"node1": NewStatus(Unschedulable, "Node(s) failed Filter plugin FalseFilter-1"),
"node2": NewStatus(Unschedulable, "Node(s) failed Filter plugin FalseFilter-1"),
"node3": NewStatus(Unschedulable, "Node(s) failed Filter plugin FalseFilter-2"),
},
}, NewStatus(UnschedulableAndUnresolvable)),
},
wantReasonMsg: "0/3 nodes are available: 1 Node(s) failed Filter plugin FalseFilter-2, 2 Node(s) failed Filter plugin FalseFilter-1.",
},
@ -1466,11 +1466,11 @@ func TestFitError_Error(t *testing.T) {
numAllNodes: 3,
diagnosis: Diagnosis{
PreFilterMsg: "",
NodeToStatusMap: NodeToStatusMap{
NodeToStatus: NewNodeToStatus(map[string]*Status{
"node1": NewStatus(Unschedulable, "Node(s) failed Filter plugin FalseFilter-1"),
"node2": NewStatus(Unschedulable, "Node(s) failed Filter plugin FalseFilter-1"),
"node3": NewStatus(Unschedulable, "Node(s) failed Filter plugin FalseFilter-2"),
},
}, NewStatus(UnschedulableAndUnresolvable)),
PostFilterMsg: "Error running PostFilter plugin FailedPostFilter",
},
wantReasonMsg: "0/3 nodes are available: 1 Node(s) failed Filter plugin FalseFilter-2, 2 Node(s) failed Filter plugin FalseFilter-1. Error running PostFilter plugin FailedPostFilter",
@ -1479,10 +1479,10 @@ func TestFitError_Error(t *testing.T) {
name: "failed to Permit on node",
numAllNodes: 1,
diagnosis: Diagnosis{
NodeToStatusMap: NodeToStatusMap{
NodeToStatus: NewNodeToStatus(map[string]*Status{
// There should be only one node here.
"node1": NewStatus(Unschedulable, "Node failed Permit plugin Permit-1"),
},
}, NewStatus(UnschedulableAndUnresolvable)),
},
wantReasonMsg: "0/1 nodes are available: 1 Node failed Permit plugin Permit-1.",
},
@ -1490,10 +1490,10 @@ func TestFitError_Error(t *testing.T) {
name: "failed to Reserve on node",
numAllNodes: 1,
diagnosis: Diagnosis{
NodeToStatusMap: NodeToStatusMap{
NodeToStatus: NewNodeToStatus(map[string]*Status{
// There should be only one node here.
"node1": NewStatus(Unschedulable, "Node failed Reserve plugin Reserve-1"),
},
}, NewStatus(UnschedulableAndUnresolvable)),
},
wantReasonMsg: "0/1 nodes are available: 1 Node failed Reserve plugin Reserve-1.",
},

View File

@ -173,7 +173,7 @@ func (sched *Scheduler) schedulingCycle(
}
// Run PostFilter plugins to attempt to make the pod schedulable in a future scheduling cycle.
result, status := fwk.RunPostFilterPlugins(ctx, state, pod, fitError.Diagnosis.NodeToStatusMap)
result, status := fwk.RunPostFilterPlugins(ctx, state, pod, fitError.Diagnosis.NodeToStatus)
msg := status.Message()
fitError.Diagnosis.PostFilterMsg = msg
if status.Code() == framework.Error {
@ -218,9 +218,10 @@ func (sched *Scheduler) schedulingCycle(
NumAllNodes: 1,
Pod: pod,
Diagnosis: framework.Diagnosis{
NodeToStatusMap: framework.NodeToStatusMap{scheduleResult.SuggestedHost: sts},
NodeToStatus: framework.NewDefaultNodeToStatus(),
},
}
fitErr.Diagnosis.NodeToStatus.Set(scheduleResult.SuggestedHost, sts)
fitErr.Diagnosis.AddPluginStatus(sts)
return ScheduleResult{nominatingInfo: clearNominatedNode}, assumedPodInfo, framework.NewStatus(sts.Code()).WithError(fitErr)
}
@ -241,9 +242,10 @@ func (sched *Scheduler) schedulingCycle(
NumAllNodes: 1,
Pod: pod,
Diagnosis: framework.Diagnosis{
NodeToStatusMap: framework.NodeToStatusMap{scheduleResult.SuggestedHost: runPermitStatus},
NodeToStatus: framework.NewDefaultNodeToStatus(),
},
}
fitErr.Diagnosis.NodeToStatus.Set(scheduleResult.SuggestedHost, runPermitStatus)
fitErr.Diagnosis.AddPluginStatus(runPermitStatus)
return ScheduleResult{nominatingInfo: clearNominatedNode}, assumedPodInfo, framework.NewStatus(runPermitStatus.Code()).WithError(fitErr)
}
@ -281,10 +283,11 @@ func (sched *Scheduler) bindingCycle(
NumAllNodes: 1,
Pod: assumedPodInfo.Pod,
Diagnosis: framework.Diagnosis{
NodeToStatusMap: framework.NodeToStatusMap{scheduleResult.SuggestedHost: status},
NodeToStatus: framework.NewDefaultNodeToStatus(),
UnschedulablePlugins: sets.New(status.Plugin()),
},
}
fitErr.Diagnosis.NodeToStatus.Set(scheduleResult.SuggestedHost, status)
return framework.NewStatus(status.Code()).WithError(fitErr)
}
return status
@ -297,10 +300,11 @@ func (sched *Scheduler) bindingCycle(
NumAllNodes: 1,
Pod: assumedPodInfo.Pod,
Diagnosis: framework.Diagnosis{
NodeToStatusMap: framework.NodeToStatusMap{scheduleResult.SuggestedHost: status},
NodeToStatus: framework.NewDefaultNodeToStatus(),
UnschedulablePlugins: sets.New(status.Plugin()),
},
}
fitErr.Diagnosis.NodeToStatus.Set(scheduleResult.SuggestedHost, status)
return framework.NewStatus(status.Code()).WithError(fitErr)
}
return status
@ -428,7 +432,7 @@ func (sched *Scheduler) schedulePod(ctx context.Context, fwk framework.Framework
if len(feasibleNodes) == 1 {
return ScheduleResult{
SuggestedHost: feasibleNodes[0].Node().Name,
EvaluatedNodes: 1 + len(diagnosis.NodeToStatusMap),
EvaluatedNodes: 1 + diagnosis.NodeToStatus.Len(),
FeasibleNodes: 1,
}, nil
}
@ -443,7 +447,7 @@ func (sched *Scheduler) schedulePod(ctx context.Context, fwk framework.Framework
return ScheduleResult{
SuggestedHost: host,
EvaluatedNodes: len(feasibleNodes) + len(diagnosis.NodeToStatusMap),
EvaluatedNodes: len(feasibleNodes) + diagnosis.NodeToStatus.Len(),
FeasibleNodes: len(feasibleNodes),
}, err
}
@ -453,7 +457,7 @@ func (sched *Scheduler) schedulePod(ctx context.Context, fwk framework.Framework
func (sched *Scheduler) findNodesThatFitPod(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) ([]*framework.NodeInfo, framework.Diagnosis, error) {
logger := klog.FromContext(ctx)
diagnosis := framework.Diagnosis{
NodeToStatusMap: make(framework.NodeToStatusMap),
NodeToStatus: framework.NewDefaultNodeToStatus(),
}
allNodes, err := sched.nodeInfoSnapshot.NodeInfos().List()
@ -467,11 +471,8 @@ func (sched *Scheduler) findNodesThatFitPod(ctx context.Context, fwk framework.F
if !s.IsRejected() {
return nil, diagnosis, s.AsError()
}
// All nodes in NodeToStatusMap will have the same status so that they can be handled in the preemption.
// Some non trivial refactoring is needed to avoid this copy.
for _, n := range allNodes {
diagnosis.NodeToStatusMap[n.Node().Name] = s
}
// All nodes in NodeToStatus will have the same status so that they can be handled in the preemption.
diagnosis.NodeToStatus.SetAbsentNodesStatus(s)
// Record the messages from PreFilter in Diagnosis.PreFilterMsg.
msg := s.Message()
@ -504,17 +505,18 @@ func (sched *Scheduler) findNodesThatFitPod(ctx context.Context, fwk framework.F
nodes = append(nodes, nodeInfo)
}
}
diagnosis.NodeToStatus.SetAbsentNodesStatus(framework.NewStatus(framework.UnschedulableAndUnresolvable, fmt.Sprintf("node(s) didn't satisfy plugin(s) %v", sets.List(unscheduledPlugins))))
}
feasibleNodes, err := sched.findNodesThatPassFilters(ctx, fwk, state, pod, &diagnosis, nodes)
// always try to update the sched.nextStartNodeIndex regardless of whether an error has occurred
// this is helpful to make sure that all the nodes have a chance to be searched
processedNodes := len(feasibleNodes) + len(diagnosis.NodeToStatusMap)
processedNodes := len(feasibleNodes) + diagnosis.NodeToStatus.Len()
sched.nextStartNodeIndex = (sched.nextStartNodeIndex + processedNodes) % len(allNodes)
if err != nil {
return nil, diagnosis, err
}
feasibleNodesAfterExtender, err := findNodesThatPassExtenders(ctx, sched.Extenders, pod, feasibleNodes, diagnosis.NodeToStatusMap)
feasibleNodesAfterExtender, err := findNodesThatPassExtenders(ctx, sched.Extenders, pod, feasibleNodes, diagnosis.NodeToStatus)
if err != nil {
return nil, diagnosis, err
}
@ -548,7 +550,7 @@ func (sched *Scheduler) evaluateNominatedNode(ctx context.Context, pod *v1.Pod,
return nil, err
}
feasibleNodes, err = findNodesThatPassExtenders(ctx, sched.Extenders, pod, feasibleNodes, diagnosis.NodeToStatusMap)
feasibleNodes, err = findNodesThatPassExtenders(ctx, sched.Extenders, pod, feasibleNodes, diagnosis.NodeToStatus)
if err != nil {
return nil, err
}
@ -653,7 +655,7 @@ func (sched *Scheduler) findNodesThatPassFilters(
if item == nil {
continue
}
diagnosis.NodeToStatusMap[item.node] = item.status
diagnosis.NodeToStatus.Set(item.node, item.status)
diagnosis.AddPluginStatus(item.status)
}
if err := errCh.ReceiveError(); err != nil {
@ -693,8 +695,9 @@ func (sched *Scheduler) numFeasibleNodesToFind(percentageOfNodesToScore *int32,
return numNodes
}
func findNodesThatPassExtenders(ctx context.Context, extenders []framework.Extender, pod *v1.Pod, feasibleNodes []*framework.NodeInfo, statuses framework.NodeToStatusMap) ([]*framework.NodeInfo, error) {
func findNodesThatPassExtenders(ctx context.Context, extenders []framework.Extender, pod *v1.Pod, feasibleNodes []*framework.NodeInfo, statuses *framework.NodeToStatus) ([]*framework.NodeInfo, error) {
logger := klog.FromContext(ctx)
// Extenders are called sequentially.
// Nodes in original feasibleNodes can be excluded in one extender, and pass on to the next
// extender in a decreasing manner.
@ -706,7 +709,7 @@ func findNodesThatPassExtenders(ctx context.Context, extenders []framework.Exten
continue
}
// Status of failed nodes in failedAndUnresolvableMap will be added or overwritten in <statuses>,
// Status of failed nodes in failedAndUnresolvableMap will be added to <statuses>,
// so that the scheduler framework can respect the UnschedulableAndUnresolvable status for
// particular nodes, and this may eventually improve preemption efficiency.
// Note: users are recommended to configure the extenders that may return UnschedulableAndUnresolvable
@ -721,12 +724,7 @@ func findNodesThatPassExtenders(ctx context.Context, extenders []framework.Exten
}
for failedNodeName, failedMsg := range failedAndUnresolvableMap {
var aggregatedReasons []string
if _, found := statuses[failedNodeName]; found {
aggregatedReasons = statuses[failedNodeName].Reasons()
}
aggregatedReasons = append(aggregatedReasons, failedMsg)
statuses[failedNodeName] = framework.NewStatus(framework.UnschedulableAndUnresolvable, aggregatedReasons...)
statuses.Set(failedNodeName, framework.NewStatus(framework.UnschedulableAndUnresolvable, failedMsg))
}
for failedNodeName, failedMsg := range failedMap {
@ -735,11 +733,7 @@ func findNodesThatPassExtenders(ctx context.Context, extenders []framework.Exten
// note that this only happens if the extender returns the node in both maps
continue
}
if _, found := statuses[failedNodeName]; !found {
statuses[failedNodeName] = framework.NewStatus(framework.Unschedulable, failedMsg)
} else {
statuses[failedNodeName].AppendReason(failedMsg)
}
statuses.Set(failedNodeName, framework.NewStatus(framework.Unschedulable, failedMsg))
}
feasibleNodes = feasibleList

View File

@ -32,6 +32,7 @@ import (
"time"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
v1 "k8s.io/api/core/v1"
eventsv1 "k8s.io/api/events/v1"
"k8s.io/apimachinery/pkg/api/resource"
@ -377,6 +378,13 @@ func (t *TestPlugin) Filter(ctx context.Context, state *framework.CycleState, po
return nil
}
func nodeToStatusDiff(want, got *framework.NodeToStatus) string {
if want == nil || got == nil {
return cmp.Diff(want, got)
}
return cmp.Diff(*want, *got, cmp.AllowUnexported(framework.NodeToStatus{}))
}
func TestSchedulerMultipleProfilesScheduling(t *testing.T) {
nodes := []runtime.Object{
st.MakeNode().Name("node1").UID("node1").Obj(),
@ -933,9 +941,9 @@ func TestSchedulerNoPhantomPodAfterDelete(t *testing.T) {
Pod: secondPod,
NumAllNodes: 1,
Diagnosis: framework.Diagnosis{
NodeToStatusMap: framework.NodeToStatusMap{
NodeToStatus: framework.NewNodeToStatus(map[string]*framework.Status{
node.Name: framework.NewStatus(framework.Unschedulable, nodeports.ErrReason).WithPlugin(nodeports.Name),
},
}, framework.NewStatus(framework.UnschedulableAndUnresolvable)),
UnschedulablePlugins: sets.New(nodeports.Name),
},
}
@ -1017,13 +1025,13 @@ func TestSchedulerFailedSchedulingReasons(t *testing.T) {
}
// Create expected failure reasons for all the nodes. Hopefully they will get rolled up into a non-spammy summary.
failedNodeStatues := framework.NodeToStatusMap{}
failedNodeStatues := framework.NewDefaultNodeToStatus()
for _, node := range nodes {
failedNodeStatues[node.Name] = framework.NewStatus(
failedNodeStatues.Set(node.Name, framework.NewStatus(
framework.Unschedulable,
fmt.Sprintf("Insufficient %v", v1.ResourceCPU),
fmt.Sprintf("Insufficient %v", v1.ResourceMemory),
).WithPlugin(noderesources.Name)
).WithPlugin(noderesources.Name))
}
fns := []tf.RegisterPluginFunc{
tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
@ -1042,7 +1050,7 @@ func TestSchedulerFailedSchedulingReasons(t *testing.T) {
Pod: podWithTooBigResourceRequests,
NumAllNodes: len(nodes),
Diagnosis: framework.Diagnosis{
NodeToStatusMap: failedNodeStatues,
NodeToStatus: failedNodeStatues,
UnschedulablePlugins: sets.New(noderesources.Name),
},
}
@ -1616,14 +1624,16 @@ func Test_SelectHost(t *testing.T) {
}
func TestFindNodesThatPassExtenders(t *testing.T) {
absentStatus := framework.NewStatus(framework.UnschedulableAndUnresolvable, "node(s) didn't satisfy plugin(s) [PreFilter]")
tests := []struct {
name string
extenders []tf.FakeExtender
nodes []*v1.Node
filteredNodesStatuses framework.NodeToStatusMap
filteredNodesStatuses *framework.NodeToStatus
expectsErr bool
expectedNodes []*v1.Node
expectedStatuses framework.NodeToStatusMap
expectedStatuses *framework.NodeToStatus
}{
{
name: "error",
@ -1634,7 +1644,7 @@ func TestFindNodesThatPassExtenders(t *testing.T) {
},
},
nodes: makeNodeList([]string{"a"}),
filteredNodesStatuses: make(framework.NodeToStatusMap),
filteredNodesStatuses: framework.NewNodeToStatus(make(map[string]*framework.Status), absentStatus),
expectsErr: true,
},
{
@ -1646,10 +1656,10 @@ func TestFindNodesThatPassExtenders(t *testing.T) {
},
},
nodes: makeNodeList([]string{"a"}),
filteredNodesStatuses: make(framework.NodeToStatusMap),
filteredNodesStatuses: framework.NewNodeToStatus(make(map[string]*framework.Status), absentStatus),
expectsErr: false,
expectedNodes: makeNodeList([]string{"a"}),
expectedStatuses: make(framework.NodeToStatusMap),
expectedStatuses: framework.NewNodeToStatus(make(map[string]*framework.Status), absentStatus),
},
{
name: "unschedulable",
@ -1665,12 +1675,12 @@ func TestFindNodesThatPassExtenders(t *testing.T) {
},
},
nodes: makeNodeList([]string{"a", "b"}),
filteredNodesStatuses: make(framework.NodeToStatusMap),
filteredNodesStatuses: framework.NewNodeToStatus(make(map[string]*framework.Status), absentStatus),
expectsErr: false,
expectedNodes: makeNodeList([]string{"a"}),
expectedStatuses: framework.NodeToStatusMap{
expectedStatuses: framework.NewNodeToStatus(map[string]*framework.Status{
"b": framework.NewStatus(framework.Unschedulable, fmt.Sprintf("FakeExtender: node %q failed", "b")),
},
}, absentStatus),
},
{
name: "unschedulable and unresolvable",
@ -1689,16 +1699,16 @@ func TestFindNodesThatPassExtenders(t *testing.T) {
},
},
nodes: makeNodeList([]string{"a", "b", "c"}),
filteredNodesStatuses: make(framework.NodeToStatusMap),
filteredNodesStatuses: framework.NewNodeToStatus(make(map[string]*framework.Status), absentStatus),
expectsErr: false,
expectedNodes: makeNodeList([]string{"a"}),
expectedStatuses: framework.NodeToStatusMap{
expectedStatuses: framework.NewNodeToStatus(map[string]*framework.Status{
"b": framework.NewStatus(framework.Unschedulable, fmt.Sprintf("FakeExtender: node %q failed", "b")),
"c": framework.NewStatus(framework.UnschedulableAndUnresolvable, fmt.Sprintf("FakeExtender: node %q failed and unresolvable", "c")),
},
}, absentStatus),
},
{
name: "extender may overwrite the statuses",
name: "extender does not overwrite the previous statuses",
extenders: []tf.FakeExtender{
{
ExtenderName: "FakeExtender1",
@ -1713,16 +1723,16 @@ func TestFindNodesThatPassExtenders(t *testing.T) {
}},
},
},
nodes: makeNodeList([]string{"a", "b", "c"}),
filteredNodesStatuses: framework.NodeToStatusMap{
nodes: makeNodeList([]string{"a", "b"}),
filteredNodesStatuses: framework.NewNodeToStatus(map[string]*framework.Status{
"c": framework.NewStatus(framework.Unschedulable, fmt.Sprintf("FakeFilterPlugin: node %q failed", "c")),
},
}, absentStatus),
expectsErr: false,
expectedNodes: makeNodeList([]string{"a"}),
expectedStatuses: framework.NodeToStatusMap{
expectedStatuses: framework.NewNodeToStatus(map[string]*framework.Status{
"b": framework.NewStatus(framework.Unschedulable, fmt.Sprintf("FakeExtender: node %q failed", "b")),
"c": framework.NewStatus(framework.UnschedulableAndUnresolvable, fmt.Sprintf("FakeFilterPlugin: node %q failed", "c"), fmt.Sprintf("FakeExtender: node %q failed and unresolvable", "c")),
},
"c": framework.NewStatus(framework.Unschedulable, fmt.Sprintf("FakeFilterPlugin: node %q failed", "c")),
}, absentStatus),
},
{
name: "multiple extenders",
@ -1750,22 +1760,16 @@ func TestFindNodesThatPassExtenders(t *testing.T) {
},
},
nodes: makeNodeList([]string{"a", "b", "c"}),
filteredNodesStatuses: make(framework.NodeToStatusMap),
filteredNodesStatuses: framework.NewNodeToStatus(make(map[string]*framework.Status), absentStatus),
expectsErr: false,
expectedNodes: makeNodeList([]string{"a"}),
expectedStatuses: framework.NodeToStatusMap{
expectedStatuses: framework.NewNodeToStatus(map[string]*framework.Status{
"b": framework.NewStatus(framework.Unschedulable, fmt.Sprintf("FakeExtender: node %q failed", "b")),
"c": framework.NewStatus(framework.UnschedulableAndUnresolvable, fmt.Sprintf("FakeExtender: node %q failed and unresolvable", "c")),
},
}, absentStatus),
},
}
cmpOpts := []cmp.Option{
cmp.Comparer(func(s1 framework.Status, s2 framework.Status) bool {
return s1.Code() == s2.Code() && reflect.DeepEqual(s1.Reasons(), s2.Reasons())
}),
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
@ -1791,7 +1795,7 @@ func TestFindNodesThatPassExtenders(t *testing.T) {
if diff := cmp.Diff(tt.expectedNodes, nodes); diff != "" {
t.Errorf("filtered nodes (-want,+got):\n%s", diff)
}
if diff := cmp.Diff(tt.expectedStatuses, tt.filteredNodesStatuses, cmpOpts...); diff != "" {
if diff := nodeToStatusDiff(tt.expectedStatuses, tt.filteredNodesStatuses); diff != "" {
t.Errorf("filtered statuses (-want,+got):\n%s", diff)
}
}
@ -1826,10 +1830,10 @@ func TestSchedulerSchedulePod(t *testing.T) {
Pod: st.MakePod().Name("2").UID("2").Obj(),
NumAllNodes: 2,
Diagnosis: framework.Diagnosis{
NodeToStatusMap: framework.NodeToStatusMap{
NodeToStatus: framework.NewNodeToStatus(map[string]*framework.Status{
"node1": framework.NewStatus(framework.Unschedulable, tf.ErrReasonFake).WithPlugin("FalseFilter"),
"node2": framework.NewStatus(framework.Unschedulable, tf.ErrReasonFake).WithPlugin("FalseFilter"),
},
}, framework.NewStatus(framework.UnschedulableAndUnresolvable)),
UnschedulablePlugins: sets.New("FalseFilter"),
},
},
@ -1915,11 +1919,11 @@ func TestSchedulerSchedulePod(t *testing.T) {
Pod: st.MakePod().Name("2").UID("2").Obj(),
NumAllNodes: 3,
Diagnosis: framework.Diagnosis{
NodeToStatusMap: framework.NodeToStatusMap{
NodeToStatus: framework.NewNodeToStatus(map[string]*framework.Status{
"3": framework.NewStatus(framework.Unschedulable, tf.ErrReasonFake).WithPlugin("FalseFilter"),
"2": framework.NewStatus(framework.Unschedulable, tf.ErrReasonFake).WithPlugin("FalseFilter"),
"1": framework.NewStatus(framework.Unschedulable, tf.ErrReasonFake).WithPlugin("FalseFilter"),
},
}, framework.NewStatus(framework.UnschedulableAndUnresolvable)),
UnschedulablePlugins: sets.New("FalseFilter"),
},
},
@ -1942,10 +1946,10 @@ func TestSchedulerSchedulePod(t *testing.T) {
Pod: st.MakePod().Name("2").UID("2").Obj(),
NumAllNodes: 2,
Diagnosis: framework.Diagnosis{
NodeToStatusMap: framework.NodeToStatusMap{
NodeToStatus: framework.NewNodeToStatus(map[string]*framework.Status{
"1": framework.NewStatus(framework.Unschedulable, tf.ErrReasonFake).WithPlugin("MatchFilter"),
"2": framework.NewStatus(framework.Unschedulable, tf.ErrReasonFake).WithPlugin("NoPodsFilter"),
},
}, framework.NewStatus(framework.UnschedulableAndUnresolvable)),
UnschedulablePlugins: sets.New("MatchFilter", "NoPodsFilter"),
},
},
@ -1986,10 +1990,7 @@ func TestSchedulerSchedulePod(t *testing.T) {
Pod: st.MakePod().Name("ignore").UID("ignore").PVC("unknownPVC").Obj(),
NumAllNodes: 2,
Diagnosis: framework.Diagnosis{
NodeToStatusMap: framework.NodeToStatusMap{
"node1": framework.NewStatus(framework.UnschedulableAndUnresolvable, `persistentvolumeclaim "unknownPVC" not found`).WithPlugin("VolumeBinding"),
"node2": framework.NewStatus(framework.UnschedulableAndUnresolvable, `persistentvolumeclaim "unknownPVC" not found`).WithPlugin("VolumeBinding"),
},
NodeToStatus: framework.NewNodeToStatus(make(map[string]*framework.Status), framework.NewStatus(framework.UnschedulableAndUnresolvable, `persistentvolumeclaim "unknownPVC" not found`).WithPlugin("VolumeBinding")),
PreFilterMsg: `persistentvolumeclaim "unknownPVC" not found`,
UnschedulablePlugins: sets.New(volumebinding.Name),
},
@ -2011,10 +2012,7 @@ func TestSchedulerSchedulePod(t *testing.T) {
Pod: st.MakePod().Name("ignore").UID("ignore").Namespace(v1.NamespaceDefault).PVC("existingPVC").Obj(),
NumAllNodes: 2,
Diagnosis: framework.Diagnosis{
NodeToStatusMap: framework.NodeToStatusMap{
"node1": framework.NewStatus(framework.UnschedulableAndUnresolvable, `persistentvolumeclaim "existingPVC" is being deleted`).WithPlugin("VolumeBinding"),
"node2": framework.NewStatus(framework.UnschedulableAndUnresolvable, `persistentvolumeclaim "existingPVC" is being deleted`).WithPlugin("VolumeBinding"),
},
NodeToStatus: framework.NewNodeToStatus(make(map[string]*framework.Status), framework.NewStatus(framework.UnschedulableAndUnresolvable, `persistentvolumeclaim "existingPVC" is being deleted`).WithPlugin("VolumeBinding")),
PreFilterMsg: `persistentvolumeclaim "existingPVC" is being deleted`,
UnschedulablePlugins: sets.New(volumebinding.Name),
},
@ -2108,9 +2106,9 @@ func TestSchedulerSchedulePod(t *testing.T) {
Pod: st.MakePod().Name("test-filter").UID("test-filter").Obj(),
NumAllNodes: 1,
Diagnosis: framework.Diagnosis{
NodeToStatusMap: framework.NodeToStatusMap{
NodeToStatus: framework.NewNodeToStatus(map[string]*framework.Status{
"3": framework.NewStatus(framework.Unschedulable, "injecting failure for pod test-filter").WithPlugin("FakeFilter"),
},
}, framework.NewStatus(framework.UnschedulableAndUnresolvable)),
UnschedulablePlugins: sets.New("FakeFilter"),
},
},
@ -2139,11 +2137,11 @@ func TestSchedulerSchedulePod(t *testing.T) {
Pod: st.MakePod().Name("test-filter").UID("test-filter").Obj(),
NumAllNodes: 3,
Diagnosis: framework.Diagnosis{
NodeToStatusMap: framework.NodeToStatusMap{
NodeToStatus: framework.NewNodeToStatus(map[string]*framework.Status{
"1": framework.NewStatus(framework.Unschedulable, `FakeExtender: node "1" failed`),
"2": framework.NewStatus(framework.Unschedulable, `FakeExtender: node "2" failed`),
"3": framework.NewStatus(framework.Unschedulable, "injecting failure for pod test-filter").WithPlugin("FakeFilter"),
},
}, framework.NewStatus(framework.UnschedulableAndUnresolvable)),
UnschedulablePlugins: sets.New("FakeFilter", framework.ExtenderName),
},
},
@ -2166,9 +2164,9 @@ func TestSchedulerSchedulePod(t *testing.T) {
Pod: st.MakePod().Name("test-filter").UID("test-filter").Obj(),
NumAllNodes: 1,
Diagnosis: framework.Diagnosis{
NodeToStatusMap: framework.NodeToStatusMap{
NodeToStatus: framework.NewNodeToStatus(map[string]*framework.Status{
"3": framework.NewStatus(framework.UnschedulableAndUnresolvable, "injecting failure for pod test-filter").WithPlugin("FakeFilter"),
},
}, framework.NewStatus(framework.UnschedulableAndUnresolvable)),
UnschedulablePlugins: sets.New("FakeFilter"),
},
},
@ -2206,10 +2204,7 @@ func TestSchedulerSchedulePod(t *testing.T) {
Pod: st.MakePod().Name("test-prefilter").UID("test-prefilter").Obj(),
NumAllNodes: 2,
Diagnosis: framework.Diagnosis{
NodeToStatusMap: framework.NodeToStatusMap{
"1": framework.NewStatus(framework.UnschedulableAndUnresolvable, "injected unschedulable status").WithPlugin("FakePreFilter"),
"2": framework.NewStatus(framework.UnschedulableAndUnresolvable, "injected unschedulable status").WithPlugin("FakePreFilter"),
},
NodeToStatus: framework.NewNodeToStatus(make(map[string]*framework.Status), framework.NewStatus(framework.UnschedulableAndUnresolvable, "injected unschedulable status").WithPlugin("FakePreFilter")),
PreFilterMsg: "injected unschedulable status",
UnschedulablePlugins: sets.New("FakePreFilter"),
},
@ -2278,11 +2273,7 @@ func TestSchedulerSchedulePod(t *testing.T) {
Pod: st.MakePod().Name("test-prefilter").UID("test-prefilter").Obj(),
NumAllNodes: 3,
Diagnosis: framework.Diagnosis{
NodeToStatusMap: framework.NodeToStatusMap{
"node1": framework.NewStatus(framework.UnschedulableAndUnresolvable, "node(s) didn't satisfy plugin(s) [FakePreFilter2 FakePreFilter3] simultaneously"),
"node2": framework.NewStatus(framework.UnschedulableAndUnresolvable, "node(s) didn't satisfy plugin(s) [FakePreFilter2 FakePreFilter3] simultaneously"),
"node3": framework.NewStatus(framework.UnschedulableAndUnresolvable, "node(s) didn't satisfy plugin(s) [FakePreFilter2 FakePreFilter3] simultaneously"),
},
NodeToStatus: framework.NewNodeToStatus(make(map[string]*framework.Status), framework.NewStatus(framework.UnschedulableAndUnresolvable, "node(s) didn't satisfy plugin(s) [FakePreFilter2 FakePreFilter3] simultaneously")),
UnschedulablePlugins: sets.New("FakePreFilter2", "FakePreFilter3"),
PreFilterMsg: "node(s) didn't satisfy plugin(s) [FakePreFilter2 FakePreFilter3] simultaneously",
},
@ -2308,9 +2299,7 @@ func TestSchedulerSchedulePod(t *testing.T) {
Pod: st.MakePod().Name("test-prefilter").UID("test-prefilter").Obj(),
NumAllNodes: 1,
Diagnosis: framework.Diagnosis{
NodeToStatusMap: framework.NodeToStatusMap{
"node1": framework.NewStatus(framework.UnschedulableAndUnresolvable, "node(s) didn't satisfy plugin FakePreFilter2"),
},
NodeToStatus: framework.NewNodeToStatus(make(map[string]*framework.Status), framework.NewStatus(framework.UnschedulableAndUnresolvable, "node(s) didn't satisfy plugin FakePreFilter2")),
UnschedulablePlugins: sets.New("FakePreFilter2"),
PreFilterMsg: "node(s) didn't satisfy plugin FakePreFilter2",
},
@ -2336,9 +2325,9 @@ func TestSchedulerSchedulePod(t *testing.T) {
Pod: st.MakePod().Name("test-prefilter").UID("test-prefilter").Obj(),
NumAllNodes: 2,
Diagnosis: framework.Diagnosis{
NodeToStatusMap: framework.NodeToStatusMap{
NodeToStatus: framework.NewNodeToStatus(map[string]*framework.Status{
"node2": framework.NewStatus(framework.Unschedulable, "injecting failure for pod test-prefilter").WithPlugin("FakeFilter"),
},
}, framework.NewStatus(framework.UnschedulableAndUnresolvable, "node(s) didn't satisfy plugin(s) [FakePreFilter]")),
UnschedulablePlugins: sets.New("FakePreFilter", "FakeFilter"),
PreFilterMsg: "",
},
@ -2444,7 +2433,7 @@ func TestSchedulerSchedulePod(t *testing.T) {
Pod: st.MakePod().Name("test-prefilter").UID("test-prefilter").Obj(),
NumAllNodes: 2,
Diagnosis: framework.Diagnosis{
NodeToStatusMap: framework.NodeToStatusMap{},
NodeToStatus: framework.NewNodeToStatus(make(map[string]*framework.Status), framework.NewStatus(framework.UnschedulableAndUnresolvable, "node(s) didn't satisfy plugin(s) [FakePreFilter]")),
UnschedulablePlugins: sets.New("FakePreFilter"),
},
},
@ -2511,8 +2500,11 @@ func TestSchedulerSchedulePod(t *testing.T) {
if gotOK != wantOK {
t.Errorf("Expected err to be FitError: %v, but got %v (error: %v)", wantOK, gotOK, err)
} else if gotOK {
if diff := cmp.Diff(wantFitErr, gotFitErr); diff != "" {
t.Errorf("Unexpected fitErr: (-want, +got): %s", diff)
if diff := cmp.Diff(wantFitErr, gotFitErr, cmpopts.IgnoreFields(framework.Diagnosis{}, "NodeToStatus")); diff != "" {
t.Errorf("Unexpected fitErr for map: (-want, +got): %s", diff)
}
if diff := nodeToStatusDiff(wantFitErr.Diagnosis.NodeToStatus, gotFitErr.Diagnosis.NodeToStatus); diff != "" {
t.Errorf("Unexpected nodeToStatus within fitErr for map: (-want, +got): %s", diff)
}
}
}
@ -2558,16 +2550,19 @@ func TestFindFitAllError(t *testing.T) {
}
expected := framework.Diagnosis{
NodeToStatusMap: framework.NodeToStatusMap{
NodeToStatus: framework.NewNodeToStatus(map[string]*framework.Status{
"1": framework.NewStatus(framework.Unschedulable, tf.ErrReasonFake).WithPlugin("MatchFilter"),
"2": framework.NewStatus(framework.Unschedulable, tf.ErrReasonFake).WithPlugin("MatchFilter"),
"3": framework.NewStatus(framework.Unschedulable, tf.ErrReasonFake).WithPlugin("MatchFilter"),
},
}, framework.NewStatus(framework.UnschedulableAndUnresolvable)),
UnschedulablePlugins: sets.New("MatchFilter"),
}
if diff := cmp.Diff(diagnosis, expected); diff != "" {
if diff := cmp.Diff(diagnosis, expected, cmpopts.IgnoreFields(framework.Diagnosis{}, "NodeToStatus")); diff != "" {
t.Errorf("Unexpected diagnosis: (-want, +got): %s", diff)
}
if diff := nodeToStatusDiff(diagnosis.NodeToStatus, expected.NodeToStatus); diff != "" {
t.Errorf("Unexpected nodeToStatus within diagnosis: (-want, +got): %s", diff)
}
}
func TestFindFitSomeError(t *testing.T) {
@ -2598,8 +2593,8 @@ func TestFindFitSomeError(t *testing.T) {
t.Errorf("unexpected error: %v", err)
}
if len(diagnosis.NodeToStatusMap) != len(nodes)-1 {
t.Errorf("unexpected failed status map: %v", diagnosis.NodeToStatusMap)
if diagnosis.NodeToStatus.Len() != len(nodes)-1 {
t.Errorf("unexpected failed status map: %v", diagnosis.NodeToStatus)
}
if diff := cmp.Diff(sets.New("MatchFilter"), diagnosis.UnschedulablePlugins); diff != "" {
@ -2611,10 +2606,7 @@ func TestFindFitSomeError(t *testing.T) {
continue
}
t.Run(node.Name, func(t *testing.T) {
status, found := diagnosis.NodeToStatusMap[node.Name]
if !found {
t.Errorf("failed to find node %v in %v", node.Name, diagnosis.NodeToStatusMap)
}
status := diagnosis.NodeToStatus.Get(node.Name)
reasons := status.Reasons()
if len(reasons) != 1 || reasons[0] != tf.ErrReasonFake {
t.Errorf("unexpected failures: %v", reasons)

View File

@ -537,7 +537,7 @@ func (pp *PostFilterPlugin) Name() string {
return pp.name
}
func (pp *PostFilterPlugin) PostFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, _ framework.NodeToStatusMap) (*framework.PostFilterResult, *framework.Status) {
func (pp *PostFilterPlugin) PostFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, _ framework.NodeToStatusReader) (*framework.PostFilterResult, *framework.Status) {
pp.numPostFilterCalled++
nodeInfos, err := pp.fh.SnapshotSharedLister().NodeInfos().List()
if err != nil {