Merge pull request #92866 from cofyc/fix91281

scheduler: Extend ExtenderFilterResult to include UnschedulableAndUnresolvable nodes
This commit is contained in:
Kubernetes Prow Robot 2021-02-01 23:22:27 -08:00 committed by GitHub
commit 8096513aca
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 286 additions and 67 deletions

View File

@ -269,11 +269,13 @@ func convertToNodeNameToMetaVictims(
// Filter based on extender implemented predicate functions. The filtered list is
// expected to be a subset of the supplied list; otherwise the function returns an error.
// failedNodesMap optionally contains the list of failed nodes and failure reasons.
// The failedNodes and failedAndUnresolvableNodes optionally contains the list
// of failed nodes and failure reasons, except nodes in the latter are
// unresolvable.
func (h *HTTPExtender) Filter(
pod *v1.Pod,
nodes []*v1.Node,
) ([]*v1.Node, extenderv1.FailedNodesMap, error) {
) (filteredList []*v1.Node, failedNodes, failedAndUnresolvableNodes extenderv1.FailedNodesMap, err error) {
var (
result extenderv1.ExtenderFilterResult
nodeList *v1.NodeList
@ -287,7 +289,7 @@ func (h *HTTPExtender) Filter(
}
if h.filterVerb == "" {
return nodes, extenderv1.FailedNodesMap{}, nil
return nodes, extenderv1.FailedNodesMap{}, extenderv1.FailedNodesMap{}, nil
}
if h.nodeCacheCapable {
@ -310,10 +312,10 @@ func (h *HTTPExtender) Filter(
}
if err := h.send(h.filterVerb, args, &result); err != nil {
return nil, nil, err
return nil, nil, nil, err
}
if result.Error != "" {
return nil, nil, fmt.Errorf(result.Error)
return nil, nil, nil, fmt.Errorf(result.Error)
}
if h.nodeCacheCapable && result.NodeNames != nil {
@ -322,7 +324,7 @@ func (h *HTTPExtender) Filter(
if n, ok := fromNodeName[nodeName]; ok {
nodeResult[i] = n
} else {
return nil, nil, fmt.Errorf(
return nil, nil, nil, fmt.Errorf(
"extender %q claims a filtered node %q which is not found in the input node list",
h.extenderURL, nodeName)
}
@ -334,7 +336,7 @@ func (h *HTTPExtender) Filter(
}
}
return nodeResult, result.FailedNodes, nil
return nodeResult, result.FailedNodes, result.FailedAndUnresolvableNodes, nil
}
// Prioritize based on extender implemented priority functions. Weight*priority is added

View File

@ -345,6 +345,9 @@ func (g *genericScheduler) findNodesThatPassFilters(
}
func (g *genericScheduler) findNodesThatPassExtenders(pod *v1.Pod, feasibleNodes []*v1.Node, statuses framework.NodeToStatusMap) ([]*v1.Node, error) {
// Extenders are called sequentially.
// Nodes in original feasibleNodes can be excluded in one extender, and pass on to the next
// extender in a decreasing manner.
for _, extender := range g.extenders {
if len(feasibleNodes) == 0 {
break
@ -352,7 +355,13 @@ func (g *genericScheduler) findNodesThatPassExtenders(pod *v1.Pod, feasibleNodes
if !extender.IsInterested(pod) {
continue
}
feasibleList, failedMap, err := extender.Filter(pod, feasibleNodes)
// Status of failed nodes in failedAndUnresolvableMap will be added or overwritten in <statuses>,
// so that the scheduler framework can respect the UnschedulableAndUnresolvable status for
// particular nodes, and this may eventually improve preemption efficiency.
// Note: users are recommended to configure the extenders that may return UnschedulableAndUnresolvable
// status ahead of others.
feasibleList, failedMap, failedAndUnresolvableMap, err := extender.Filter(pod, feasibleNodes)
if err != nil {
if extender.IsIgnorable() {
klog.InfoS("Skipping extender as it returned error and has ignorable flag set", "extender", extender, "err", err)
@ -361,13 +370,28 @@ func (g *genericScheduler) findNodesThatPassExtenders(pod *v1.Pod, feasibleNodes
return nil, err
}
for failedNodeName, failedMsg := range failedAndUnresolvableMap {
var aggregatedReasons []string
if _, found := statuses[failedNodeName]; found {
aggregatedReasons = statuses[failedNodeName].Reasons()
}
aggregatedReasons = append(aggregatedReasons, failedMsg)
statuses[failedNodeName] = framework.NewStatus(framework.UnschedulableAndUnresolvable, aggregatedReasons...)
}
for failedNodeName, failedMsg := range failedMap {
if _, found := failedAndUnresolvableMap[failedNodeName]; found {
// failedAndUnresolvableMap takes precedence over failedMap
// note that this only happens if the extender returns the node in both maps
continue
}
if _, found := statuses[failedNodeName]; !found {
statuses[failedNodeName] = framework.NewStatus(framework.Unschedulable, failedMsg)
} else {
statuses[failedNodeName].AppendReason(failedMsg)
}
}
feasibleNodes = feasibleList
}
return feasibleNodes, nil

View File

@ -273,6 +273,180 @@ func TestSelectHost(t *testing.T) {
}
}
func TestFindNodesThatPassExtenders(t *testing.T) {
tests := []struct {
name string
extenders []st.FakeExtender
nodes []*v1.Node
filteredNodesStatuses framework.NodeToStatusMap
expectsErr bool
expectedNodes []*v1.Node
expectedStatuses framework.NodeToStatusMap
}{
{
name: "error",
extenders: []st.FakeExtender{
{
Predicates: []st.FitPredicate{st.ErrorPredicateExtender},
},
},
nodes: makeNodeList([]string{"a"}),
filteredNodesStatuses: make(framework.NodeToStatusMap),
expectsErr: true,
},
{
name: "success",
extenders: []st.FakeExtender{
{
Predicates: []st.FitPredicate{st.TruePredicateExtender},
},
},
nodes: makeNodeList([]string{"a"}),
filteredNodesStatuses: make(framework.NodeToStatusMap),
expectsErr: false,
expectedNodes: makeNodeList([]string{"a"}),
expectedStatuses: make(framework.NodeToStatusMap),
},
{
name: "unschedulable",
extenders: []st.FakeExtender{
{
Predicates: []st.FitPredicate{func(pod *v1.Pod, node *v1.Node) *framework.Status {
if node.Name == "a" {
return framework.NewStatus(framework.Success)
}
return framework.NewStatus(framework.Unschedulable, fmt.Sprintf("node %q is not allowed", node.Name))
}},
},
},
nodes: makeNodeList([]string{"a", "b"}),
filteredNodesStatuses: make(framework.NodeToStatusMap),
expectsErr: false,
expectedNodes: makeNodeList([]string{"a"}),
expectedStatuses: framework.NodeToStatusMap{
"b": framework.NewStatus(framework.Unschedulable, fmt.Sprintf("FakeExtender: node %q failed", "b")),
},
},
{
name: "unschedulable and unresolvable",
extenders: []st.FakeExtender{
{
Predicates: []st.FitPredicate{func(pod *v1.Pod, node *v1.Node) *framework.Status {
if node.Name == "a" {
return framework.NewStatus(framework.Success)
}
if node.Name == "b" {
return framework.NewStatus(framework.Unschedulable, fmt.Sprintf("node %q is not allowed", node.Name))
}
return framework.NewStatus(framework.UnschedulableAndUnresolvable, fmt.Sprintf("node %q is not allowed", node.Name))
}},
},
},
nodes: makeNodeList([]string{"a", "b", "c"}),
filteredNodesStatuses: make(framework.NodeToStatusMap),
expectsErr: false,
expectedNodes: makeNodeList([]string{"a"}),
expectedStatuses: framework.NodeToStatusMap{
"b": framework.NewStatus(framework.Unschedulable, fmt.Sprintf("FakeExtender: node %q failed", "b")),
"c": framework.NewStatus(framework.UnschedulableAndUnresolvable, fmt.Sprintf("FakeExtender: node %q failed and unresolvable", "c")),
},
},
{
name: "extender may overwrite the statuses",
extenders: []st.FakeExtender{
{
Predicates: []st.FitPredicate{func(pod *v1.Pod, node *v1.Node) *framework.Status {
if node.Name == "a" {
return framework.NewStatus(framework.Success)
}
if node.Name == "b" {
return framework.NewStatus(framework.Unschedulable, fmt.Sprintf("node %q is not allowed", node.Name))
}
return framework.NewStatus(framework.UnschedulableAndUnresolvable, fmt.Sprintf("node %q is not allowed", node.Name))
}},
},
},
nodes: makeNodeList([]string{"a", "b", "c"}),
filteredNodesStatuses: framework.NodeToStatusMap{
"c": framework.NewStatus(framework.Unschedulable, fmt.Sprintf("FakeFilterPlugin: node %q failed", "c")),
},
expectsErr: false,
expectedNodes: makeNodeList([]string{"a"}),
expectedStatuses: framework.NodeToStatusMap{
"b": framework.NewStatus(framework.Unschedulable, fmt.Sprintf("FakeExtender: node %q failed", "b")),
"c": framework.NewStatus(framework.UnschedulableAndUnresolvable, fmt.Sprintf("FakeFilterPlugin: node %q failed", "c"), fmt.Sprintf("FakeExtender: node %q failed and unresolvable", "c")),
},
},
{
name: "multiple extenders",
extenders: []st.FakeExtender{
{
Predicates: []st.FitPredicate{func(pod *v1.Pod, node *v1.Node) *framework.Status {
if node.Name == "a" {
return framework.NewStatus(framework.Success)
}
if node.Name == "b" {
return framework.NewStatus(framework.Unschedulable, fmt.Sprintf("node %q is not allowed", node.Name))
}
return framework.NewStatus(framework.UnschedulableAndUnresolvable, fmt.Sprintf("node %q is not allowed", node.Name))
}},
},
{
Predicates: []st.FitPredicate{func(pod *v1.Pod, node *v1.Node) *framework.Status {
if node.Name == "a" {
return framework.NewStatus(framework.Success)
}
return framework.NewStatus(framework.Unschedulable, fmt.Sprintf("node %q is not allowed", node.Name))
}},
},
},
nodes: makeNodeList([]string{"a", "b", "c"}),
filteredNodesStatuses: make(framework.NodeToStatusMap),
expectsErr: false,
expectedNodes: makeNodeList([]string{"a"}),
expectedStatuses: framework.NodeToStatusMap{
"b": framework.NewStatus(framework.Unschedulable, fmt.Sprintf("FakeExtender: node %q failed", "b")),
"c": framework.NewStatus(framework.UnschedulableAndUnresolvable, fmt.Sprintf("FakeExtender: node %q failed and unresolvable", "c")),
},
},
}
cmpOpts := []cmp.Option{
cmp.Comparer(func(s1 framework.Status, s2 framework.Status) bool {
return s1.Code() == s2.Code() && reflect.DeepEqual(s1.Reasons(), s2.Reasons())
}),
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
var extenders []framework.Extender
for ii := range tt.extenders {
extenders = append(extenders, &tt.extenders[ii])
}
scheduler := &genericScheduler{
extenders: extenders,
}
pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "1", UID: types.UID("1")}}
got, err := scheduler.findNodesThatPassExtenders(pod, tt.nodes, tt.filteredNodesStatuses)
if tt.expectsErr {
if err == nil {
t.Error("Unexpected non-error")
}
} else {
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
if diff := cmp.Diff(tt.expectedNodes, got); diff != "" {
t.Errorf("filtered nodes (-want,+got):\n%s", diff)
}
if diff := cmp.Diff(tt.expectedStatuses, tt.filteredNodesStatuses, cmpOpts...); diff != "" {
t.Errorf("filtered statuses (-want,+got):\n%s", diff)
}
}
})
}
}
func TestGenericScheduler(t *testing.T) {
tests := []struct {
name string

View File

@ -674,8 +674,8 @@ func (f *fakeExtender) SupportsPreemption() bool {
return false
}
func (f *fakeExtender) Filter(pod *v1.Pod, nodes []*v1.Node) (filteredNodes []*v1.Node, failedNodesMap extenderv1.FailedNodesMap, err error) {
return nil, nil, nil
func (f *fakeExtender) Filter(pod *v1.Pod, nodes []*v1.Node) ([]*v1.Node, extenderv1.FailedNodesMap, extenderv1.FailedNodesMap, error) {
return nil, nil, nil, nil
}
func (f *fakeExtender) Prioritize(

View File

@ -29,9 +29,11 @@ type Extender interface {
Name() string
// Filter based on extender-implemented predicate functions. The filtered list is
// expected to be a subset of the supplied list. failedNodesMap optionally contains
// the list of failed nodes and failure reasons.
Filter(pod *v1.Pod, nodes []*v1.Node) (filteredNodes []*v1.Node, failedNodesMap extenderv1.FailedNodesMap, err error)
// expected to be a subset of the supplied list.
// The failedNodes and failedAndUnresolvableNodes optionally contains the list
// of failed nodes and failure reasons, except nodes in the latter are
// unresolvable.
Filter(pod *v1.Pod, nodes []*v1.Node) (filteredNodes []*v1.Node, failedNodesMap extenderv1.FailedNodesMap, failedAndUnresolvable extenderv1.FailedNodesMap, err error)
// Prioritize based on extender-implemented priority functions. The returned scores & weight
// are used to compute the weighted score for an extender. The weighted scores are added to

View File

@ -31,7 +31,7 @@ import (
)
// FitPredicate is a function type which is used in fake extender.
type FitPredicate func(pod *v1.Pod, node *v1.Node) (bool, error)
type FitPredicate func(pod *v1.Pod, node *v1.Node) *framework.Status
// PriorityFunc is a function type which is used in fake extender.
type PriorityFunc func(pod *v1.Pod, nodes []*v1.Node) (*framework.NodeScoreList, error)
@ -42,37 +42,42 @@ type PriorityConfig struct {
Weight int64
}
// ErrorPredicateExtender implements FitPredicate function to always return error.
func ErrorPredicateExtender(pod *v1.Pod, node *v1.Node) (bool, error) {
return false, fmt.Errorf("some error")
// ErrorPredicateExtender implements FitPredicate function to always return error status.
func ErrorPredicateExtender(pod *v1.Pod, node *v1.Node) *framework.Status {
return framework.NewStatus(framework.Error, "some error")
}
// FalsePredicateExtender implements FitPredicate function to always return false.
func FalsePredicateExtender(pod *v1.Pod, node *v1.Node) (bool, error) {
return false, nil
// FalsePredicateExtender implements FitPredicate function to always return unschedulable status.
func FalsePredicateExtender(pod *v1.Pod, node *v1.Node) *framework.Status {
return framework.NewStatus(framework.Unschedulable, fmt.Sprintf("pod is unschedulable on the node %q", node.Name))
}
// TruePredicateExtender implements FitPredicate function to always return true.
func TruePredicateExtender(pod *v1.Pod, node *v1.Node) (bool, error) {
return true, nil
// FalseAndUnresolvePredicateExtender implements fitPredicate to always return unschedulable and unresolvable status.
func FalseAndUnresolvePredicateExtender(pod *v1.Pod, node *v1.Node) *framework.Status {
return framework.NewStatus(framework.UnschedulableAndUnresolvable, fmt.Sprintf("pod is unschedulable and unresolvable on the node %q", node.Name))
}
// TruePredicateExtender implements FitPredicate function to always return success status.
func TruePredicateExtender(pod *v1.Pod, node *v1.Node) *framework.Status {
return framework.NewStatus(framework.Success)
}
// Node1PredicateExtender implements FitPredicate function to return true
// when the given node's name is "node1"; otherwise return false.
func Node1PredicateExtender(pod *v1.Pod, node *v1.Node) (bool, error) {
func Node1PredicateExtender(pod *v1.Pod, node *v1.Node) *framework.Status {
if node.Name == "node1" {
return true, nil
return framework.NewStatus(framework.Success)
}
return false, nil
return framework.NewStatus(framework.Unschedulable, fmt.Sprintf("node %q is not allowed", node.Name))
}
// Node2PredicateExtender implements FitPredicate function to return true
// when the given node's name is "node2"; otherwise return false.
func Node2PredicateExtender(pod *v1.Pod, node *v1.Node) (bool, error) {
func Node2PredicateExtender(pod *v1.Pod, node *v1.Node) *framework.Status {
if node.Name == "node2" {
return true, nil
return framework.NewStatus(framework.Success)
}
return false, nil
return framework.NewStatus(framework.Unschedulable, fmt.Sprintf("node %q is not allowed", node.Name))
}
// ErrorPrioritizerExtender implements PriorityFunc function to always return error.
@ -211,14 +216,14 @@ func (f *FakeExtender) selectVictimsOnNodeByExtender(pod *v1.Pod, node *v1.Node)
// If a extender support preemption but have no cached node info, let's run filter to make sure
// default scheduler's decision still stand with given pod and node.
if !f.NodeCacheCapable {
fits, err := f.runPredicate(pod, node)
if err != nil {
return nil, 0, false, err
}
if !fits {
err := f.runPredicate(pod, node)
if err.IsSuccess() {
return []*v1.Pod{}, 0, true, nil
} else if err.IsUnschedulable() {
return nil, 0, false, nil
} else {
return nil, 0, false, err.AsError()
}
return []*v1.Pod{}, 0, true, nil
}
// Otherwise, as a extender support preemption and have cached node info, we will assume cachedNodeNameToInfo is available
@ -246,12 +251,15 @@ func (f *FakeExtender) selectVictimsOnNodeByExtender(pod *v1.Pod, node *v1.Node)
// If the new pod does not fit after removing all the lower priority pods,
// we are almost done and this node is not suitable for preemption.
fits, err := f.runPredicate(pod, nodeInfoCopy.Node())
if err != nil {
return nil, 0, false, err
}
if !fits {
status := f.runPredicate(pod, nodeInfoCopy.Node())
if status.IsSuccess() {
// pass
} else if status.IsUnschedulable() {
// does not fit
return nil, 0, false, nil
} else {
// internal errors
return nil, 0, false, status.AsError()
}
var victims []*v1.Pod
@ -261,12 +269,12 @@ func (f *FakeExtender) selectVictimsOnNodeByExtender(pod *v1.Pod, node *v1.Node)
reprievePod := func(p *v1.Pod) bool {
addPod(p)
fits, _ := f.runPredicate(pod, nodeInfoCopy.Node())
if !fits {
status := f.runPredicate(pod, nodeInfoCopy.Node())
if !status.IsSuccess() {
removePod(p)
victims = append(victims, p)
}
return fits
return status.IsSuccess()
}
// For now, assume all potential victims to be non-violating.
@ -280,42 +288,39 @@ func (f *FakeExtender) selectVictimsOnNodeByExtender(pod *v1.Pod, node *v1.Node)
// runPredicate run predicates of extender one by one for given pod and node.
// Returns: fits or not.
func (f *FakeExtender) runPredicate(pod *v1.Pod, node *v1.Node) (bool, error) {
fits := true
var err error
func (f *FakeExtender) runPredicate(pod *v1.Pod, node *v1.Node) *framework.Status {
for _, predicate := range f.Predicates {
fits, err = predicate(pod, node)
if err != nil {
return false, err
}
if !fits {
break
status := predicate(pod, node)
if !status.IsSuccess() {
return status
}
}
return fits, nil
return framework.NewStatus(framework.Success)
}
// Filter implements the extender Filter function.
func (f *FakeExtender) Filter(pod *v1.Pod, nodes []*v1.Node) ([]*v1.Node, extenderv1.FailedNodesMap, error) {
func (f *FakeExtender) Filter(pod *v1.Pod, nodes []*v1.Node) ([]*v1.Node, extenderv1.FailedNodesMap, extenderv1.FailedNodesMap, error) {
var filtered []*v1.Node
failedNodesMap := extenderv1.FailedNodesMap{}
failedAndUnresolvableMap := extenderv1.FailedNodesMap{}
for _, node := range nodes {
fits, err := f.runPredicate(pod, node)
if err != nil {
return []*v1.Node{}, extenderv1.FailedNodesMap{}, err
}
if fits {
status := f.runPredicate(pod, node)
if status.IsSuccess() {
filtered = append(filtered, node)
} else if status.Code() == framework.Unschedulable {
failedNodesMap[node.Name] = fmt.Sprintf("FakeExtender: node %q failed", node.Name)
} else if status.Code() == framework.UnschedulableAndUnresolvable {
failedAndUnresolvableMap[node.Name] = fmt.Sprintf("FakeExtender: node %q failed and unresolvable", node.Name)
} else {
failedNodesMap[node.Name] = "FakeExtender failed"
return nil, nil, nil, status.AsError()
}
}
f.FilteredNodes = filtered
if f.NodeCacheCapable {
return filtered, failedNodesMap, nil
return filtered, failedNodesMap, failedAndUnresolvableMap, nil
}
return filtered, failedNodesMap, nil
return filtered, failedNodesMap, failedAndUnresolvableMap, nil
}
// Prioritize implements the extender Prioritize function.

View File

@ -92,6 +92,10 @@ type ExtenderFilterResult struct {
NodeNames *[]string
// Filtered out nodes where the pod can't be scheduled and the failure messages
FailedNodes FailedNodesMap
// Filtered out nodes where the pod can't be scheduled and preemption would
// not change anything. The value is the failure message same as FailedNodes.
// Nodes specified here takes precedence over FailedNodes.
FailedAndUnresolvableNodes FailedNodesMap
// Error message indicating failure
Error string
}

View File

@ -67,12 +67,13 @@ func TestCompatibility(t *testing.T) {
{
emptyObj: &ExtenderFilterResult{},
obj: &ExtenderFilterResult{
Nodes: &corev1.NodeList{Items: []corev1.Node{{ObjectMeta: metav1.ObjectMeta{Name: "nodename"}}}},
NodeNames: &[]string{"node1"},
FailedNodes: FailedNodesMap{"foo": "bar"},
Error: "myerror",
Nodes: &corev1.NodeList{Items: []corev1.Node{{ObjectMeta: metav1.ObjectMeta{Name: "nodename"}}}},
NodeNames: &[]string{"node1"},
FailedNodes: FailedNodesMap{"foo": "bar"},
FailedAndUnresolvableNodes: FailedNodesMap{"baz": "qux"},
Error: "myerror",
},
expectJSON: `{"Nodes":{"metadata":{},"items":[{"metadata":{"name":"nodename","creationTimestamp":null},"spec":{},"status":{"daemonEndpoints":{"kubeletEndpoint":{"Port":0}},"nodeInfo":{"machineID":"","systemUUID":"","bootID":"","kernelVersion":"","osImage":"","containerRuntimeVersion":"","kubeletVersion":"","kubeProxyVersion":"","operatingSystem":"","architecture":""}}}]},"NodeNames":["node1"],"FailedNodes":{"foo":"bar"},"Error":"myerror"}`,
expectJSON: `{"Nodes":{"metadata":{},"items":[{"metadata":{"name":"nodename","creationTimestamp":null},"spec":{},"status":{"daemonEndpoints":{"kubeletEndpoint":{"Port":0}},"nodeInfo":{"machineID":"","systemUUID":"","bootID":"","kernelVersion":"","osImage":"","containerRuntimeVersion":"","kubeletVersion":"","kubeProxyVersion":"","operatingSystem":"","architecture":""}}}]},"NodeNames":["node1"],"FailedNodes":{"foo":"bar"},"FailedAndUnresolvableNodes":{"baz":"qux"},"Error":"myerror"}`,
},
{
emptyObj: &ExtenderBindingArgs{},

View File

@ -115,6 +115,13 @@ func (in *ExtenderFilterResult) DeepCopyInto(out *ExtenderFilterResult) {
(*out)[key] = val
}
}
if in.FailedAndUnresolvableNodes != nil {
in, out := &in.FailedAndUnresolvableNodes, &out.FailedAndUnresolvableNodes
*out = make(FailedNodesMap, len(*in))
for key, val := range *in {
(*out)[key] = val
}
}
return
}