Merge pull request #114125 from sanposhiho/skip-reimplementation

feature(scheduler): won't run Filter if PreFilter returned a Skip status
This commit is contained in:
Kubernetes Prow Robot 2023-01-06 02:25:59 -08:00 committed by GitHub
commit c549b59983
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 406 additions and 47 deletions

View File

@ -19,6 +19,8 @@ package framework
import (
"errors"
"sync"
"k8s.io/apimachinery/pkg/util/sets"
)
var (
@ -48,6 +50,8 @@ type CycleState struct {
storage sync.Map
// if recordPluginMetrics is true, PluginExecutionDuration will be recorded for this cycle.
recordPluginMetrics bool
// SkipFilterPlugins are plugins that will be skipped in the Filter extension point.
SkipFilterPlugins sets.Set[string]
}
// NewCycleState initializes a new CycleState and returns its pointer.
@ -83,6 +87,7 @@ func (c *CycleState) Clone() *CycleState {
return true
})
copy.recordPluginMetrics = c.recordPluginMetrics
copy.SkipFilterPlugins = c.SkipFilterPlugins
return copy
}

View File

@ -91,7 +91,9 @@ const (
UnschedulableAndUnresolvable
// Wait is used when a Permit plugin finds a pod scheduling should wait.
Wait
// Skip is used when a Bind plugin chooses to skip binding.
// Skip is used in the following scenarios:
// - when a Bind plugin chooses to skip binding.
// - when a PreFilter plugin returns Skip so that coupled Filter plugin/PreFilterExtensions() will be skipped.
Skip
)
@ -348,6 +350,8 @@ type PreFilterPlugin interface {
// plugins must return success or the pod will be rejected. PreFilter could optionally
// return a PreFilterResult to influence which nodes to evaluate downstream. This is useful
// for cases where it is possible to determine the subset of nodes to process in O(1) time.
// When it returns Skip status, returned PreFilterResult and other fields in status are just ignored,
// and coupled Filter plugin/PreFilterExtensions() will be skipped in this scheduling cycle.
PreFilter(ctx context.Context, state *CycleState, p *v1.Pod) (*PreFilterResult, *Status)
// PreFilterExtensions returns a PreFilterExtensions interface if the plugin implements one,
// or nil if it does not. A Pre-filter plugin can provide extensions to incrementally

View File

@ -20,7 +20,7 @@ import (
"context"
"fmt"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
@ -89,13 +89,19 @@ func (pl *NodeAffinity) EventsToRegister() []framework.ClusterEvent {
// PreFilter builds and writes cycle state used by Filter.
func (pl *NodeAffinity) PreFilter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
affinity := pod.Spec.Affinity
noNodeAffinity := (affinity == nil ||
affinity.NodeAffinity == nil ||
affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution == nil)
if noNodeAffinity && pl.addedNodeSelector == nil && pod.Spec.NodeSelector == nil {
// NodeAffinity Filter has nothing to do with the Pod.
return nil, framework.NewStatus(framework.Skip)
}
state := &preFilterState{requiredNodeSelectorAndAffinity: nodeaffinity.GetRequiredNodeAffinity(pod)}
cycleState.Write(preFilterStateKey, state)
affinity := pod.Spec.Affinity
if affinity == nil ||
affinity.NodeAffinity == nil ||
affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution == nil ||
len(affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms) == 0 {
if noNodeAffinity || len(affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms) == 0 {
return nil, nil
}

View File

@ -44,10 +44,6 @@ func TestNodeAffinity(t *testing.T) {
args config.NodeAffinityArgs
disablePreFilter bool
}{
{
name: "no selector",
pod: &v1.Pod{},
},
{
name: "missing labels",
pod: st.MakePod().NodeSelector(map[string]string{
@ -285,6 +281,7 @@ func TestNodeAffinity(t *testing.T) {
labels: map[string]string{
"foo": "bar",
},
wantPreFilterStatus: framework.NewStatus(framework.Skip),
},
{
name: "Pod with Affinity but nil NodeSelector will schedule onto a node",
@ -300,6 +297,7 @@ func TestNodeAffinity(t *testing.T) {
labels: map[string]string{
"foo": "bar",
},
wantPreFilterStatus: framework.NewStatus(framework.Skip),
},
{
name: "Pod with multiple matchExpressions ANDed that matches the existing node",

View File

@ -599,8 +599,10 @@ func (f *frameworkImpl) QueueSortFunc() framework.LessFunc {
// RunPreFilterPlugins runs the set of configured PreFilter plugins. It returns
// *Status and its code is set to non-success if any of the plugins returns
// anything but Success. If a non-success status is returned, then the scheduling
// cycle is aborted.
// anything but Success/Skip.
// When it returns Skip status, returned PreFilterResult and other fields in status are just ignored,
// and coupled Filter plugin/PreFilterExtensions() will be skipped in this scheduling cycle.
// If a non-success status is returned, then the scheduling cycle is aborted.
func (f *frameworkImpl) RunPreFilterPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (_ *framework.PreFilterResult, status *framework.Status) {
startTime := time.Now()
defer func() {
@ -608,8 +610,13 @@ func (f *frameworkImpl) RunPreFilterPlugins(ctx context.Context, state *framewor
}()
var result *framework.PreFilterResult
var pluginsWithNodes []string
skipPlugins := sets.New[string]()
for _, pl := range f.preFilterPlugins {
r, s := f.runPreFilterPlugin(ctx, pl, state, pod)
if s.IsSkip() {
skipPlugins.Insert(pl.Name())
continue
}
if !s.IsSuccess() {
s.SetFailedPlugin(pl.Name())
if s.IsUnschedulable() {
@ -628,8 +635,8 @@ func (f *frameworkImpl) RunPreFilterPlugins(ctx context.Context, state *framewor
}
return nil, framework.NewStatus(framework.Unschedulable, msg)
}
}
state.SkipFilterPlugins = skipPlugins
return result, nil
}
@ -654,7 +661,7 @@ func (f *frameworkImpl) RunPreFilterExtensionAddPod(
nodeInfo *framework.NodeInfo,
) (status *framework.Status) {
for _, pl := range f.preFilterPlugins {
if pl.PreFilterExtensions() == nil {
if pl.PreFilterExtensions() == nil || state.SkipFilterPlugins.Has(pl.Name()) {
continue
}
status = f.runPreFilterExtensionAddPod(ctx, pl, state, podToSchedule, podInfoToAdd, nodeInfo)
@ -689,7 +696,7 @@ func (f *frameworkImpl) RunPreFilterExtensionRemovePod(
nodeInfo *framework.NodeInfo,
) (status *framework.Status) {
for _, pl := range f.preFilterPlugins {
if pl.PreFilterExtensions() == nil {
if pl.PreFilterExtensions() == nil || state.SkipFilterPlugins.Has(pl.Name()) {
continue
}
status = f.runPreFilterExtensionRemovePod(ctx, pl, state, podToSchedule, podInfoToRemove, nodeInfo)
@ -726,6 +733,9 @@ func (f *frameworkImpl) RunFilterPlugins(
var status *framework.Status
for _, pl := range f.filterPlugins {
if state.SkipFilterPlugins.Has(pl.Name()) {
continue
}
status = f.runFilterPlugin(ctx, pl, state, pod, nodeInfo)
if !status.IsSuccess() {
if !status.IsUnschedulable() {

View File

@ -1402,9 +1402,11 @@ func TestPreFilterPlugins(t *testing.T) {
if err != nil {
t.Fatalf("Failed to create framework for testing: %v", err)
}
f.RunPreFilterPlugins(ctx, nil, nil)
f.RunPreFilterExtensionAddPod(ctx, nil, nil, nil, nil)
f.RunPreFilterExtensionRemovePod(ctx, nil, nil, nil, nil)
state := framework.NewCycleState()
f.RunPreFilterPlugins(ctx, state, nil)
f.RunPreFilterExtensionAddPod(ctx, state, nil, nil, nil)
f.RunPreFilterExtensionRemovePod(ctx, state, nil, nil, nil)
if preFilter1.PreFilterCalled != 1 {
t.Errorf("preFilter1 called %v, expected: 1", preFilter1.PreFilterCalled)
@ -1421,39 +1423,309 @@ func TestPreFilterPlugins(t *testing.T) {
})
}
func TestRunPreFilterPluginsStatus(t *testing.T) {
preFilter := &TestPlugin{
name: preFilterPluginName,
inj: injectedResult{PreFilterStatus: int(framework.Error)},
func TestRunPreFilterPlugins(t *testing.T) {
tests := []struct {
name string
plugins []*TestPlugin
wantPreFilterResult *framework.PreFilterResult
wantSkippedPlugins sets.Set[string]
wantStatusCode framework.Code
}{
{
name: "all PreFilter returned success",
plugins: []*TestPlugin{
{
name: "success1",
},
{
name: "success2",
},
},
wantPreFilterResult: nil,
wantStatusCode: framework.Success,
},
{
name: "one PreFilter plugin returned success, but another PreFilter plugin returned non-success",
plugins: []*TestPlugin{
{
name: "success",
},
{
name: "error",
inj: injectedResult{PreFilterStatus: int(framework.Error)},
},
},
wantPreFilterResult: nil,
wantStatusCode: framework.Error,
},
{
name: "one PreFilter plugin returned skip, but another PreFilter plugin returned non-success",
plugins: []*TestPlugin{
{
name: "skip",
inj: injectedResult{PreFilterStatus: int(framework.Skip)},
},
{
name: "error",
inj: injectedResult{PreFilterStatus: int(framework.Error)},
},
},
wantPreFilterResult: nil,
wantStatusCode: framework.Error,
},
{
name: "all PreFilter plugins returned skip",
plugins: []*TestPlugin{
{
name: "skip1",
inj: injectedResult{PreFilterStatus: int(framework.Skip)},
},
{
name: "skip2",
inj: injectedResult{PreFilterStatus: int(framework.Skip)},
},
{
name: "skip3",
inj: injectedResult{PreFilterStatus: int(framework.Skip)},
},
},
wantPreFilterResult: nil,
wantSkippedPlugins: sets.New("skip1", "skip2", "skip3"),
wantStatusCode: framework.Success,
},
{
name: "some PreFilter plugins returned skip",
plugins: []*TestPlugin{
{
name: "skip1",
inj: injectedResult{PreFilterStatus: int(framework.Skip)},
},
{
name: "success1",
},
{
name: "skip2",
inj: injectedResult{PreFilterStatus: int(framework.Skip)},
},
{
name: "success2",
},
},
wantPreFilterResult: nil,
wantSkippedPlugins: sets.New("skip1", "skip2"),
wantStatusCode: framework.Success,
},
}
r := make(Registry)
r.Register(preFilterPluginName,
func(_ runtime.Object, fh framework.Handle) (framework.Plugin, error) {
return preFilter, nil
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
r := make(Registry)
enabled := make([]config.Plugin, len(tt.plugins))
for i, p := range tt.plugins {
p := p
enabled[i].Name = p.name
r.Register(p.name, func(_ runtime.Object, fh framework.Handle) (framework.Plugin, error) {
return p, nil
})
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
f, err := newFrameworkWithQueueSortAndBind(
r,
config.KubeSchedulerProfile{Plugins: &config.Plugins{PreFilter: config.PluginSet{Enabled: enabled}}},
ctx.Done(),
)
if err != nil {
t.Fatalf("Failed to create framework for testing: %v", err)
}
state := framework.NewCycleState()
result, status := f.RunPreFilterPlugins(ctx, state, nil)
if d := cmp.Diff(result, tt.wantPreFilterResult); d != "" {
t.Errorf("wrong status. got: %v, want: %v, diff: %s", result, tt.wantPreFilterResult, d)
}
if status.Code() != tt.wantStatusCode {
t.Errorf("wrong status code. got: %v, want: %v", status, tt.wantStatusCode)
}
skipped := state.SkipFilterPlugins
if d := cmp.Diff(skipped, tt.wantSkippedPlugins); d != "" {
t.Errorf("wrong skip filter plugins. got: %v, want: %v, diff: %s", skipped, tt.wantSkippedPlugins, d)
}
})
plugins := &config.Plugins{PreFilter: config.PluginSet{Enabled: []config.Plugin{{Name: preFilterPluginName}}}}
profile := config.KubeSchedulerProfile{Plugins: plugins}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
f, err := newFrameworkWithQueueSortAndBind(r, profile, ctx.Done())
if err != nil {
t.Fatalf("Failed to create framework for testing: %v", err)
}
_, status := f.RunPreFilterPlugins(ctx, nil, nil)
wantStatus := framework.AsStatus(fmt.Errorf("running PreFilter plugin %q: %w", preFilter.Name(), errInjectedStatus)).WithFailedPlugin(preFilter.Name())
if !reflect.DeepEqual(status, wantStatus) {
t.Errorf("wrong status. got: %v, want:%v", status, wantStatus)
}
func TestRunPreFilterExtensionRemovePod(t *testing.T) {
tests := []struct {
name string
plugins []*TestPlugin
skippedPluginNames sets.Set[string]
wantStatusCode framework.Code
}{
{
name: "no plugins are skipped and all RemovePod() returned success",
plugins: []*TestPlugin{
{
name: "success1",
},
{
name: "success2",
},
},
wantStatusCode: framework.Success,
},
{
name: "one RemovePod() returned error",
plugins: []*TestPlugin{
{
name: "success1",
},
{
name: "error1",
inj: injectedResult{PreFilterRemovePodStatus: int(framework.Error)},
},
},
wantStatusCode: framework.Error,
},
{
name: "one RemovePod() is skipped",
plugins: []*TestPlugin{
{
name: "success1",
},
{
name: "skipped",
// To confirm it's skipped, return error so that this test case will fail when it isn't skipped.
inj: injectedResult{PreFilterRemovePodStatus: int(framework.Error)},
},
},
skippedPluginNames: sets.New("skipped"),
wantStatusCode: framework.Success,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
r := make(Registry)
enabled := make([]config.Plugin, len(tt.plugins))
for i, p := range tt.plugins {
p := p
enabled[i].Name = p.name
r.Register(p.name, func(_ runtime.Object, fh framework.Handle) (framework.Plugin, error) {
return p, nil
})
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
f, err := newFrameworkWithQueueSortAndBind(
r,
config.KubeSchedulerProfile{Plugins: &config.Plugins{PreFilter: config.PluginSet{Enabled: enabled}}},
ctx.Done(),
)
if err != nil {
t.Fatalf("Failed to create framework for testing: %v", err)
}
state := framework.NewCycleState()
state.SkipFilterPlugins = tt.skippedPluginNames
status := f.RunPreFilterExtensionRemovePod(ctx, state, nil, nil, nil)
if status.Code() != tt.wantStatusCode {
t.Errorf("wrong status code. got: %v, want: %v", status, tt.wantStatusCode)
}
})
}
}
func TestRunPreFilterExtensionAddPod(t *testing.T) {
tests := []struct {
name string
plugins []*TestPlugin
skippedPluginNames sets.Set[string]
wantStatusCode framework.Code
}{
{
name: "no plugins are skipped and all AddPod() returned success",
plugins: []*TestPlugin{
{
name: "success1",
},
{
name: "success2",
},
},
wantStatusCode: framework.Success,
},
{
name: "one AddPod() returned error",
plugins: []*TestPlugin{
{
name: "success1",
},
{
name: "error1",
inj: injectedResult{PreFilterAddPodStatus: int(framework.Error)},
},
},
wantStatusCode: framework.Error,
},
{
name: "one AddPod() is skipped",
plugins: []*TestPlugin{
{
name: "success1",
},
{
name: "skipped",
// To confirm it's skipped, return error so that this test case will fail when it isn't skipped.
inj: injectedResult{PreFilterAddPodStatus: int(framework.Error)},
},
},
skippedPluginNames: sets.New("skipped"),
wantStatusCode: framework.Success,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
r := make(Registry)
enabled := make([]config.Plugin, len(tt.plugins))
for i, p := range tt.plugins {
p := p
enabled[i].Name = p.name
r.Register(p.name, func(_ runtime.Object, fh framework.Handle) (framework.Plugin, error) {
return p, nil
})
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
f, err := newFrameworkWithQueueSortAndBind(
r,
config.KubeSchedulerProfile{Plugins: &config.Plugins{PreFilter: config.PluginSet{Enabled: enabled}}},
ctx.Done(),
)
if err != nil {
t.Fatalf("Failed to create framework for testing: %v", err)
}
state := framework.NewCycleState()
state.SkipFilterPlugins = tt.skippedPluginNames
status := f.RunPreFilterExtensionAddPod(ctx, state, nil, nil, nil)
if status.Code() != tt.wantStatusCode {
t.Errorf("wrong status code. got: %v, want: %v", status, tt.wantStatusCode)
}
})
}
}
func TestFilterPlugins(t *testing.T) {
tests := []struct {
name string
plugins []*TestPlugin
wantStatus *framework.Status
name string
plugins []*TestPlugin
skippedPlugins sets.Set[string]
wantStatus *framework.Status
}{
{
name: "SuccessFilter",
@ -1553,6 +1825,22 @@ func TestFilterPlugins(t *testing.T) {
},
wantStatus: nil,
},
{
name: "SuccessAndSkipFilters",
plugins: []*TestPlugin{
{
name: "TestPlugin1",
inj: injectedResult{FilterStatus: int(framework.Success)},
},
{
name: "TestPlugin2",
inj: injectedResult{FilterStatus: int(framework.Error)}, // To make sure this plugins isn't called, set error as an injected result.
},
},
wantStatus: nil,
skippedPlugins: sets.New("TestPlugin2"),
},
{
name: "ErrorAndSuccessFilters",
plugins: []*TestPlugin{
@ -1624,7 +1912,9 @@ func TestFilterPlugins(t *testing.T) {
if err != nil {
t.Fatalf("fail to create framework: %s", err)
}
gotStatus := f.RunFilterPlugins(ctx, nil, pod, nil)
state := framework.NewCycleState()
state.SkipFilterPlugins = tt.skippedPlugins
gotStatus := f.RunFilterPlugins(ctx, state, pod, nil)
if diff := cmp.Diff(gotStatus, tt.wantStatus, cmpOpts...); diff != "" {
t.Errorf("Unexpected status: (-got, +want):\n%s", diff)
}
@ -1896,7 +2186,7 @@ func TestFilterPluginsWithNominatedPods(t *testing.T) {
t.Fatalf("fail to create framework: %s", err)
}
tt.nodeInfo.SetNode(tt.node)
gotStatus := f.RunFilterPluginsWithNominatedPods(ctx, nil, tt.pod, tt.nodeInfo)
gotStatus := f.RunFilterPluginsWithNominatedPods(ctx, framework.NewCycleState(), tt.pod, tt.nodeInfo)
if diff := cmp.Diff(gotStatus, tt.wantStatus, cmpOpts...); diff != "" {
t.Errorf("Unexpected status: (-got, +want):\n%s", diff)
}

View File

@ -1980,6 +1980,42 @@ func TestSchedulerSchedulePod(t *testing.T) {
},
},
},
{
name: "test prefilter plugin returning skip",
registerPlugins: []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterPreFilterPlugin(
"FakePreFilter1",
st.NewFakePreFilterPlugin("FakeFilter1", nil, nil),
),
st.RegisterFilterPlugin(
"FakeFilter1",
st.NewFakeFilterPlugin(map[string]framework.Code{
"node1": framework.Unschedulable,
}),
),
st.RegisterPluginAsExtensions("FakeFilter2", func(configuration runtime.Object, f framework.Handle) (framework.Plugin, error) {
return st.FakePreFilterAndFilterPlugin{
FakePreFilterPlugin: &st.FakePreFilterPlugin{
Result: nil,
Status: framework.NewStatus(framework.Skip),
},
FakeFilterPlugin: &st.FakeFilterPlugin{
// This Filter plugin shouldn't be executed in the Filter extension point due to skip.
// To confirm that, return the status code Error to all Nodes.
FailedNodeReturnCodeMap: map[string]framework.Code{
"node1": framework.Error, "node2": framework.Error, "node3": framework.Error,
},
},
}, nil
}, "PreFilter", "Filter"),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
},
nodes: []string{"node1", "node2", "node3"},
pod: st.MakePod().Name("test-prefilter").UID("test-prefilter").Obj(),
wantNodes: sets.NewString("node2", "node3"),
wantEvaluatedNodes: pointer.Int32(3),
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {

View File

@ -22,7 +22,7 @@ import (
"sync/atomic"
"time"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/kubernetes/pkg/scheduler/framework"
frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
@ -67,6 +67,16 @@ func NewTrueFilterPlugin(_ runtime.Object, _ framework.Handle) (framework.Plugin
return &TrueFilterPlugin{}, nil
}
type FakePreFilterAndFilterPlugin struct {
*FakePreFilterPlugin
*FakeFilterPlugin
}
// Name returns name of the plugin.
func (pl FakePreFilterAndFilterPlugin) Name() string {
return "FakePreFilterAndFilterPlugin"
}
// FakeFilterPlugin is a test filter plugin to record how many times its Filter() function have
// been called, and it returns different 'Code' depending on its internal 'failedNodeReturnCodeMap'.
type FakeFilterPlugin struct {