Merge pull request #92797 from ahg-g/ahg-prefilter

Return a FitError when PreFilter fails with unschedulable status
This commit is contained in:
Kubernetes Prow Robot 2020-07-10 15:42:31 -07:00 committed by GitHub
commit fbc9cf0894
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 118 additions and 35 deletions

View File

@ -154,13 +154,13 @@ func (g *genericScheduler) Schedule(ctx context.Context, prof *profile.Profile,
}
startPredicateEvalTime := time.Now()
filteredNodes, filteredNodesStatuses, err := g.findNodesThatFitPod(ctx, prof, state, pod)
feasibleNodes, filteredNodesStatuses, err := g.findNodesThatFitPod(ctx, prof, state, pod)
if err != nil {
return result, err
}
trace.Step("Computing predicates done")
if len(filteredNodes) == 0 {
if len(feasibleNodes) == 0 {
return result, &FitError{
Pod: pod,
NumAllNodes: g.nodeInfoSnapshot.NumNodes(),
@ -173,16 +173,16 @@ func (g *genericScheduler) Schedule(ctx context.Context, prof *profile.Profile,
startPriorityEvalTime := time.Now()
// When only one node after predicate, just use it.
if len(filteredNodes) == 1 {
if len(feasibleNodes) == 1 {
metrics.DeprecatedSchedulingAlgorithmPriorityEvaluationSecondsDuration.Observe(metrics.SinceInSeconds(startPriorityEvalTime))
return ScheduleResult{
SuggestedHost: filteredNodes[0].Name,
SuggestedHost: feasibleNodes[0].Name,
EvaluatedNodes: 1 + len(filteredNodesStatuses),
FeasibleNodes: 1,
}, nil
}
priorityList, err := g.prioritizeNodes(ctx, prof, state, pod, filteredNodes)
priorityList, err := g.prioritizeNodes(ctx, prof, state, pod, feasibleNodes)
if err != nil {
return result, err
}
@ -195,8 +195,8 @@ func (g *genericScheduler) Schedule(ctx context.Context, prof *profile.Profile,
return ScheduleResult{
SuggestedHost: host,
EvaluatedNodes: len(filteredNodes) + len(filteredNodesStatuses),
FeasibleNodes: len(filteredNodes),
EvaluatedNodes: len(feasibleNodes) + len(filteredNodesStatuses),
FeasibleNodes: len(feasibleNodes),
}, err
}
@ -256,23 +256,37 @@ func (g *genericScheduler) numFeasibleNodesToFind(numAllNodes int32) (numNodes i
// Filters the nodes to find the ones that fit the pod based on the framework
// filter plugins and filter extenders.
func (g *genericScheduler) findNodesThatFitPod(ctx context.Context, prof *profile.Profile, state *framework.CycleState, pod *v1.Pod) ([]*v1.Node, framework.NodeToStatusMap, error) {
filteredNodesStatuses := make(framework.NodeToStatusMap)
// Run "prefilter" plugins.
s := prof.RunPreFilterPlugins(ctx, state, pod)
if !s.IsSuccess() {
return nil, nil, s.AsError()
if !s.IsUnschedulable() {
return nil, nil, s.AsError()
}
// All nodes will have the same status. Some non trivial refactoring is
// needed to avoid this copy.
allNodes, err := g.nodeInfoSnapshot.NodeInfos().List()
if err != nil {
return nil, nil, err
}
for _, n := range allNodes {
filteredNodesStatuses[n.Node().Name] = s
}
return nil, filteredNodesStatuses, nil
}
filteredNodesStatuses := make(framework.NodeToStatusMap)
filtered, err := g.findNodesThatPassFilters(ctx, prof, state, pod, filteredNodesStatuses)
feasibleNodes, err := g.findNodesThatPassFilters(ctx, prof, state, pod, filteredNodesStatuses)
if err != nil {
return nil, nil, err
}
filtered, err = g.findNodesThatPassExtenders(pod, filtered, filteredNodesStatuses)
feasibleNodes, err = g.findNodesThatPassExtenders(pod, feasibleNodes, filteredNodesStatuses)
if err != nil {
return nil, nil, err
}
return filtered, filteredNodesStatuses, nil
return feasibleNodes, filteredNodesStatuses, nil
}
// findNodesThatPassFilters finds the nodes that fit the filter plugins.
@ -284,22 +298,22 @@ func (g *genericScheduler) findNodesThatPassFilters(ctx context.Context, prof *p
numNodesToFind := g.numFeasibleNodesToFind(int32(len(allNodes)))
// Create filtered list with enough space to avoid growing it
// Create feasible list with enough space to avoid growing it
// and allow assigning.
filtered := make([]*v1.Node, numNodesToFind)
feasibleNodes := make([]*v1.Node, numNodesToFind)
if !prof.HasFilterPlugins() {
length := len(allNodes)
for i := range filtered {
filtered[i] = allNodes[(g.nextStartNodeIndex+i)%length].Node()
for i := range feasibleNodes {
feasibleNodes[i] = allNodes[(g.nextStartNodeIndex+i)%length].Node()
}
g.nextStartNodeIndex = (g.nextStartNodeIndex + len(filtered)) % length
return filtered, nil
g.nextStartNodeIndex = (g.nextStartNodeIndex + len(feasibleNodes)) % length
return feasibleNodes, nil
}
errCh := parallelize.NewErrorChannel()
var statusesLock sync.Mutex
var filteredLen int32
var feasibleNodesLen int32
ctx, cancel := context.WithCancel(ctx)
checkNode := func(i int) {
// We check the nodes starting from where we left off in the previous scheduling cycle,
@ -311,12 +325,12 @@ func (g *genericScheduler) findNodesThatPassFilters(ctx context.Context, prof *p
return
}
if fits {
length := atomic.AddInt32(&filteredLen, 1)
length := atomic.AddInt32(&feasibleNodesLen, 1)
if length > numNodesToFind {
cancel()
atomic.AddInt32(&filteredLen, -1)
atomic.AddInt32(&feasibleNodesLen, -1)
} else {
filtered[length-1] = nodeInfo.Node()
feasibleNodes[length-1] = nodeInfo.Node()
}
} else {
statusesLock.Lock()
@ -339,26 +353,26 @@ func (g *genericScheduler) findNodesThatPassFilters(ctx context.Context, prof *p
// Stops searching for more nodes once the configured number of feasible nodes
// are found.
parallelize.Until(ctx, len(allNodes), checkNode)
processedNodes := int(filteredLen) + len(statuses)
processedNodes := int(feasibleNodesLen) + len(statuses)
g.nextStartNodeIndex = (g.nextStartNodeIndex + processedNodes) % len(allNodes)
filtered = filtered[:filteredLen]
feasibleNodes = feasibleNodes[:feasibleNodesLen]
if err := errCh.ReceiveError(); err != nil {
statusCode = framework.Error
return nil, err
}
return filtered, nil
return feasibleNodes, nil
}
func (g *genericScheduler) findNodesThatPassExtenders(pod *v1.Pod, filtered []*v1.Node, statuses framework.NodeToStatusMap) ([]*v1.Node, error) {
func (g *genericScheduler) findNodesThatPassExtenders(pod *v1.Pod, feasibleNodes []*v1.Node, statuses framework.NodeToStatusMap) ([]*v1.Node, error) {
for _, extender := range g.extenders {
if len(filtered) == 0 {
if len(feasibleNodes) == 0 {
break
}
if !extender.IsInterested(pod) {
continue
}
filteredList, failedMap, err := extender.Filter(pod, filtered)
feasibleList, failedMap, err := extender.Filter(pod, feasibleNodes)
if err != nil {
if extender.IsIgnorable() {
klog.Warningf("Skipping extender %v as it returned error %v and has ignorable flag set",
@ -375,9 +389,9 @@ func (g *genericScheduler) findNodesThatPassExtenders(pod *v1.Pod, filtered []*v
statuses[failedNodeName].AppendReason(failedMsg)
}
}
filtered = filteredList
feasibleNodes = feasibleList
}
return filtered, nil
return feasibleNodes, nil
}
// addNominatedPods adds pods with equal or greater priority which are nominated

View File

@ -677,6 +677,43 @@ func TestGenericScheduler(t *testing.T) {
expectedHosts: nil,
wErr: nil,
},
{
name: "test prefilter plugin returning Unschedulable status",
registerPlugins: []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterPreFilterPlugin(
"FakePreFilter",
st.NewFakePreFilterPlugin(framework.NewStatus(framework.UnschedulableAndUnresolvable, "injected unschedulable status")),
),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
},
nodes: []string{"1", "2"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-prefilter", UID: types.UID("test-prefilter")}},
expectedHosts: nil,
wErr: &FitError{
Pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-prefilter", UID: types.UID("test-prefilter")}},
NumAllNodes: 2,
FilteredNodesStatuses: framework.NodeToStatusMap{
"1": framework.NewStatus(framework.UnschedulableAndUnresolvable, "injected unschedulable status"),
"2": framework.NewStatus(framework.UnschedulableAndUnresolvable, "injected unschedulable status"),
},
},
},
{
name: "test prefilter plugin returning error status",
registerPlugins: []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterPreFilterPlugin(
"FakePreFilter",
st.NewFakePreFilterPlugin(framework.NewStatus(framework.Error, "injected error status")),
),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
},
nodes: []string{"1", "2"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-prefilter", UID: types.UID("test-prefilter")}},
expectedHosts: nil,
wErr: fmt.Errorf(`prefilter plugin "FakePreFilter" failed for pod "test-prefilter": injected error status`),
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
@ -717,7 +754,7 @@ func TestGenericScheduler(t *testing.T) {
schedulerapi.DefaultPercentageOfNodesToScore)
result, err := scheduler.Schedule(context.Background(), prof, framework.NewCycleState(), test.pod)
if !reflect.DeepEqual(err, test.wErr) {
t.Errorf("Unexpected error: %v, expected: %v", err.Error(), test.wErr)
t.Errorf("want: %v, got: %v", test.wErr, err)
}
if test.expectedHosts != nil && !test.expectedHosts.Has(result.SuggestedHost) {
t.Errorf("Expected: %s, got: %s", test.expectedHosts, result.SuggestedHost)

View File

@ -395,11 +395,9 @@ func (f *frameworkImpl) RunPreFilterPlugins(ctx context.Context, state *framewor
status = f.runPreFilterPlugin(ctx, pl, state, pod)
if !status.IsSuccess() {
if status.IsUnschedulable() {
msg := fmt.Sprintf("rejected by %q at prefilter: %v", pl.Name(), status.Message())
klog.V(4).Infof(msg)
return framework.NewStatus(status.Code(), msg)
return status
}
msg := fmt.Sprintf("error while running %q prefilter plugin for pod %q: %v", pl.Name(), pod.Name, status.Message())
msg := fmt.Sprintf("prefilter plugin %q failed for pod %q: %v", pl.Name(), pod.Name, status.Message())
klog.Error(msg)
return framework.NewStatus(framework.Error, msg)
}

View File

@ -123,3 +123,32 @@ func (pl *MatchFilterPlugin) Filter(_ context.Context, _ *framework.CycleState,
func NewMatchFilterPlugin(_ runtime.Object, _ framework.FrameworkHandle) (framework.Plugin, error) {
return &MatchFilterPlugin{}, nil
}
// FakePreFilterPlugin is a test filter plugin.
type FakePreFilterPlugin struct {
Status *framework.Status
}
// Name returns name of the plugin.
func (pl *FakePreFilterPlugin) Name() string {
return "FakePreFilter"
}
// PreFilter invoked at the PreFilter extension point.
func (pl *FakePreFilterPlugin) PreFilter(_ context.Context, _ *framework.CycleState, pod *v1.Pod) *framework.Status {
return pl.Status
}
// PreFilterExtensions no extensions implemented by this plugin.
func (pl *FakePreFilterPlugin) PreFilterExtensions() framework.PreFilterExtensions {
return nil
}
// NewFakePreFilterPlugin initializes a fakePreFilterPlugin and returns it.
func NewFakePreFilterPlugin(status *framework.Status) frameworkruntime.PluginFactory {
return func(_ runtime.Object, _ framework.FrameworkHandle) (framework.Plugin, error) {
return &FakePreFilterPlugin{
Status: status,
}, nil
}
}

View File

@ -42,6 +42,11 @@ func RegisterQueueSortPlugin(pluginName string, pluginNewFunc runtime.PluginFact
return RegisterPluginAsExtensions(pluginName, pluginNewFunc, "QueueSort")
}
// RegisterPreFilterPlugin returns a function to register a PreFilter Plugin to a given registry.
func RegisterPreFilterPlugin(pluginName string, pluginNewFunc runtime.PluginFactory) RegisterPluginFunc {
return RegisterPluginAsExtensions(pluginName, pluginNewFunc, "PreFilter")
}
// RegisterFilterPlugin returns a function to register a Filter Plugin to a given registry.
func RegisterFilterPlugin(pluginName string, pluginNewFunc runtime.PluginFactory) RegisterPluginFunc {
return RegisterPluginAsExtensions(pluginName, pluginNewFunc, "Filter")