Return a FitError when PreFilter fails with unschedulable status

This commit is contained in:
Abdullah Gharaibeh
2020-07-04 11:43:12 -04:00
parent a472138c1f
commit c98dee4945
5 changed files with 118 additions and 35 deletions

View File

@@ -151,13 +151,13 @@ func (g *genericScheduler) Schedule(ctx context.Context, prof *profile.Profile,
}
startPredicateEvalTime := time.Now()
filteredNodes, filteredNodesStatuses, err := g.findNodesThatFitPod(ctx, prof, state, pod)
feasibleNodes, filteredNodesStatuses, err := g.findNodesThatFitPod(ctx, prof, state, pod)
if err != nil {
return result, err
}
trace.Step("Computing predicates done")
if len(filteredNodes) == 0 {
if len(feasibleNodes) == 0 {
return result, &FitError{
Pod: pod,
NumAllNodes: g.nodeInfoSnapshot.NumNodes(),
@@ -170,16 +170,16 @@ func (g *genericScheduler) Schedule(ctx context.Context, prof *profile.Profile,
startPriorityEvalTime := time.Now()
// When only one node after predicate, just use it.
if len(filteredNodes) == 1 {
if len(feasibleNodes) == 1 {
metrics.DeprecatedSchedulingAlgorithmPriorityEvaluationSecondsDuration.Observe(metrics.SinceInSeconds(startPriorityEvalTime))
return ScheduleResult{
SuggestedHost: filteredNodes[0].Name,
SuggestedHost: feasibleNodes[0].Name,
EvaluatedNodes: 1 + len(filteredNodesStatuses),
FeasibleNodes: 1,
}, nil
}
priorityList, err := g.prioritizeNodes(ctx, prof, state, pod, filteredNodes)
priorityList, err := g.prioritizeNodes(ctx, prof, state, pod, feasibleNodes)
if err != nil {
return result, err
}
@@ -192,8 +192,8 @@ func (g *genericScheduler) Schedule(ctx context.Context, prof *profile.Profile,
return ScheduleResult{
SuggestedHost: host,
EvaluatedNodes: len(filteredNodes) + len(filteredNodesStatuses),
FeasibleNodes: len(filteredNodes),
EvaluatedNodes: len(feasibleNodes) + len(filteredNodesStatuses),
FeasibleNodes: len(feasibleNodes),
}, err
}
@@ -253,23 +253,37 @@ func (g *genericScheduler) numFeasibleNodesToFind(numAllNodes int32) (numNodes i
// Filters the nodes to find the ones that fit the pod based on the framework
// filter plugins and filter extenders.
func (g *genericScheduler) findNodesThatFitPod(ctx context.Context, prof *profile.Profile, state *framework.CycleState, pod *v1.Pod) ([]*v1.Node, framework.NodeToStatusMap, error) {
filteredNodesStatuses := make(framework.NodeToStatusMap)
// Run "prefilter" plugins.
s := prof.RunPreFilterPlugins(ctx, state, pod)
if !s.IsSuccess() {
return nil, nil, s.AsError()
if !s.IsUnschedulable() {
return nil, nil, s.AsError()
}
// All nodes will have the same status. Some non trivial refactoring is
// needed to avoid this copy.
allNodes, err := g.nodeInfoSnapshot.NodeInfos().List()
if err != nil {
return nil, nil, err
}
for _, n := range allNodes {
filteredNodesStatuses[n.Node().Name] = s
}
return nil, filteredNodesStatuses, nil
}
filteredNodesStatuses := make(framework.NodeToStatusMap)
filtered, err := g.findNodesThatPassFilters(ctx, prof, state, pod, filteredNodesStatuses)
feasibleNodes, err := g.findNodesThatPassFilters(ctx, prof, state, pod, filteredNodesStatuses)
if err != nil {
return nil, nil, err
}
filtered, err = g.findNodesThatPassExtenders(pod, filtered, filteredNodesStatuses)
feasibleNodes, err = g.findNodesThatPassExtenders(pod, feasibleNodes, filteredNodesStatuses)
if err != nil {
return nil, nil, err
}
return filtered, filteredNodesStatuses, nil
return feasibleNodes, filteredNodesStatuses, nil
}
// findNodesThatPassFilters finds the nodes that fit the filter plugins.
@@ -281,22 +295,22 @@ func (g *genericScheduler) findNodesThatPassFilters(ctx context.Context, prof *p
numNodesToFind := g.numFeasibleNodesToFind(int32(len(allNodes)))
// Create filtered list with enough space to avoid growing it
// Create feasible list with enough space to avoid growing it
// and allow assigning.
filtered := make([]*v1.Node, numNodesToFind)
feasibleNodes := make([]*v1.Node, numNodesToFind)
if !prof.HasFilterPlugins() {
length := len(allNodes)
for i := range filtered {
filtered[i] = allNodes[(g.nextStartNodeIndex+i)%length].Node()
for i := range feasibleNodes {
feasibleNodes[i] = allNodes[(g.nextStartNodeIndex+i)%length].Node()
}
g.nextStartNodeIndex = (g.nextStartNodeIndex + len(filtered)) % length
return filtered, nil
g.nextStartNodeIndex = (g.nextStartNodeIndex + len(feasibleNodes)) % length
return feasibleNodes, nil
}
errCh := parallelize.NewErrorChannel()
var statusesLock sync.Mutex
var filteredLen int32
var feasibleNodesLen int32
ctx, cancel := context.WithCancel(ctx)
checkNode := func(i int) {
// We check the nodes starting from where we left off in the previous scheduling cycle,
@@ -308,12 +322,12 @@ func (g *genericScheduler) findNodesThatPassFilters(ctx context.Context, prof *p
return
}
if fits {
length := atomic.AddInt32(&filteredLen, 1)
length := atomic.AddInt32(&feasibleNodesLen, 1)
if length > numNodesToFind {
cancel()
atomic.AddInt32(&filteredLen, -1)
atomic.AddInt32(&feasibleNodesLen, -1)
} else {
filtered[length-1] = nodeInfo.Node()
feasibleNodes[length-1] = nodeInfo.Node()
}
} else {
statusesLock.Lock()
@@ -336,26 +350,26 @@ func (g *genericScheduler) findNodesThatPassFilters(ctx context.Context, prof *p
// Stops searching for more nodes once the configured number of feasible nodes
// are found.
parallelize.Until(ctx, len(allNodes), checkNode)
processedNodes := int(filteredLen) + len(statuses)
processedNodes := int(feasibleNodesLen) + len(statuses)
g.nextStartNodeIndex = (g.nextStartNodeIndex + processedNodes) % len(allNodes)
filtered = filtered[:filteredLen]
feasibleNodes = feasibleNodes[:feasibleNodesLen]
if err := errCh.ReceiveError(); err != nil {
statusCode = framework.Error
return nil, err
}
return filtered, nil
return feasibleNodes, nil
}
func (g *genericScheduler) findNodesThatPassExtenders(pod *v1.Pod, filtered []*v1.Node, statuses framework.NodeToStatusMap) ([]*v1.Node, error) {
func (g *genericScheduler) findNodesThatPassExtenders(pod *v1.Pod, feasibleNodes []*v1.Node, statuses framework.NodeToStatusMap) ([]*v1.Node, error) {
for _, extender := range g.extenders {
if len(filtered) == 0 {
if len(feasibleNodes) == 0 {
break
}
if !extender.IsInterested(pod) {
continue
}
filteredList, failedMap, err := extender.Filter(pod, filtered)
feasibleList, failedMap, err := extender.Filter(pod, feasibleNodes)
if err != nil {
if extender.IsIgnorable() {
klog.Warningf("Skipping extender %v as it returned error %v and has ignorable flag set",
@@ -372,9 +386,9 @@ func (g *genericScheduler) findNodesThatPassExtenders(pod *v1.Pod, filtered []*v
statuses[failedNodeName].AppendReason(failedMsg)
}
}
filtered = filteredList
feasibleNodes = feasibleList
}
return filtered, nil
return feasibleNodes, nil
}
// addNominatedPods adds pods with equal or greater priority which are nominated