feature(scheduler): won't run Score if PreScore returned a Skip status (#115652)

* allow preScore to return skip status to skip running the corresponding score extension

* add test case for all skipped

* add test case for select host

* update plugin status

* skip score when all plugins are skipped

* update
This commit is contained in:
kidddddddddddddddddddddd 2023-02-14 06:53:29 +08:00 committed by GitHub
parent 436ca94642
commit f5a69ffda9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 355 additions and 29 deletions

View File

@ -52,6 +52,8 @@ type CycleState struct {
recordPluginMetrics bool
// SkipFilterPlugins are plugins that will be skipped in the Filter extension point.
SkipFilterPlugins sets.Set[string]
// SkipScorePlugins are plugins that will be skipped in the Score extension point.
SkipScorePlugins sets.Set[string]
}
// NewCycleState initializes a new CycleState and returns its pointer.
@ -88,6 +90,7 @@ func (c *CycleState) Clone() *CycleState {
})
copy.recordPluginMetrics = c.recordPluginMetrics
copy.SkipFilterPlugins = c.SkipFilterPlugins
copy.SkipScorePlugins = c.SkipScorePlugins
return copy
}

View File

@ -94,6 +94,7 @@ const (
// Skip is used in the following scenarios:
// - when a Bind plugin chooses to skip binding.
// - when a PreFilter plugin returns Skip so that coupled Filter plugin/PreFilterExtensions() will be skipped.
// - when a PreScore plugin returns Skip so that coupled Score plugin will be skipped.
Skip
)
@ -411,6 +412,8 @@ type PreScorePlugin interface {
// PreScore is called by the scheduling framework after a list of nodes
// passed the filtering phase. All prescore plugins must return success or
// the pod will be rejected
// When it returns Skip status, other fields in status are just ignored,
// and coupled Score plugin will be skipped in this scheduling cycle.
PreScore(ctx context.Context, state *CycleState, pod *v1.Pod, nodes []*v1.Node) *Status
}

View File

@ -887,7 +887,9 @@ func addNominatedPods(ctx context.Context, fh framework.Handle, pod *v1.Pod, sta
}
// RunPreScorePlugins runs the set of configured pre-score plugins. If any
// of these plugins returns any status other than "Success", the given pod is rejected.
// of these plugins returns any status other than Success/Skip, the given pod is rejected.
// When it returns Skip status, other fields in status are just ignored,
// and coupled Score plugin will be skipped in this scheduling cycle.
func (f *frameworkImpl) RunPreScorePlugins(
ctx context.Context,
state *framework.CycleState,
@ -898,13 +900,18 @@ func (f *frameworkImpl) RunPreScorePlugins(
defer func() {
metrics.FrameworkExtensionPointDuration.WithLabelValues(preScore, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime))
}()
skipPlugins := sets.New[string]()
for _, pl := range f.preScorePlugins {
status = f.runPreScorePlugin(ctx, pl, state, pod, nodes)
if status.IsSkip() {
skipPlugins.Insert(pl.Name())
continue
}
if !status.IsSuccess() {
return framework.AsStatus(fmt.Errorf("running PreScore plugin %q: %w", pl.Name(), status.AsError()))
}
}
state.SkipScorePlugins = skipPlugins
return nil
}
@ -928,37 +935,45 @@ func (f *frameworkImpl) RunScorePlugins(ctx context.Context, state *framework.Cy
metrics.FrameworkExtensionPointDuration.WithLabelValues(score, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime))
}()
allNodePluginScores := make([]framework.NodePluginScores, len(nodes))
pluginToNodeScores := make(map[string]framework.NodeScoreList, len(f.scorePlugins))
numPlugins := len(f.scorePlugins) - state.SkipScorePlugins.Len()
plugins := make([]framework.ScorePlugin, 0, numPlugins)
pluginToNodeScores := make(map[string]framework.NodeScoreList, numPlugins)
for _, pl := range f.scorePlugins {
if state.SkipScorePlugins.Has(pl.Name()) {
continue
}
plugins = append(plugins, pl)
pluginToNodeScores[pl.Name()] = make(framework.NodeScoreList, len(nodes))
}
ctx, cancel := context.WithCancel(ctx)
defer cancel()
errCh := parallelize.NewErrorChannel()
// Run Score method for each node in parallel.
f.Parallelizer().Until(ctx, len(nodes), func(index int) {
for _, pl := range f.scorePlugins {
nodeName := nodes[index].Name
s, status := f.runScorePlugin(ctx, pl, state, pod, nodeName)
if !status.IsSuccess() {
err := fmt.Errorf("plugin %q failed with: %w", pl.Name(), status.AsError())
errCh.SendErrorWithCancel(err, cancel)
return
}
pluginToNodeScores[pl.Name()][index] = framework.NodeScore{
Name: nodeName,
Score: s,
if len(plugins) > 0 {
// Run Score method for each node in parallel.
f.Parallelizer().Until(ctx, len(nodes), func(index int) {
for _, pl := range plugins {
nodeName := nodes[index].Name
s, status := f.runScorePlugin(ctx, pl, state, pod, nodeName)
if !status.IsSuccess() {
err := fmt.Errorf("plugin %q failed with: %w", pl.Name(), status.AsError())
errCh.SendErrorWithCancel(err, cancel)
return
}
pluginToNodeScores[pl.Name()][index] = framework.NodeScore{
Name: nodeName,
Score: s,
}
}
}, score)
if err := errCh.ReceiveError(); err != nil {
return nil, framework.AsStatus(fmt.Errorf("running Score plugins: %w", err))
}
}, score)
if err := errCh.ReceiveError(); err != nil {
return nil, framework.AsStatus(fmt.Errorf("running Score plugins: %w", err))
}
// Run NormalizeScore method for each ScorePlugin in parallel.
f.Parallelizer().Until(ctx, len(f.scorePlugins), func(index int) {
pl := f.scorePlugins[index]
f.Parallelizer().Until(ctx, len(plugins), func(index int) {
pl := plugins[index]
if pl.ScoreExtensions() == nil {
return
}
@ -979,10 +994,10 @@ func (f *frameworkImpl) RunScorePlugins(ctx context.Context, state *framework.Cy
f.Parallelizer().Until(ctx, len(nodes), func(index int) {
nodePluginScores := framework.NodePluginScores{
Name: nodes[index].Name,
Scores: make([]framework.PluginScore, len(f.scorePlugins)),
Scores: make([]framework.PluginScore, len(plugins)),
}
for i, pl := range f.scorePlugins {
for i, pl := range plugins {
weight := f.scorePluginWeight[pl.Name()]
nodeScoreList := pluginToNodeScores[pl.Name()]
score := nodeScoreList[index].Score

View File

@ -1061,13 +1061,139 @@ func TestPreEnqueuePlugins(t *testing.T) {
}
}
func TestRunPreScorePlugins(t *testing.T) {
tests := []struct {
name string
plugins []*TestPlugin
wantSkippedPlugins sets.Set[string]
wantStatusCode framework.Code
}{
{
name: "all PreScorePlugins returned success",
plugins: []*TestPlugin{
{
name: "success1",
},
{
name: "success2",
},
},
wantStatusCode: framework.Success,
},
{
name: "one PreScore plugin returned success, but another PreScore plugin returned non-success",
plugins: []*TestPlugin{
{
name: "success",
},
{
name: "error",
inj: injectedResult{PreScoreStatus: int(framework.Error)},
},
},
wantStatusCode: framework.Error,
},
{
name: "one PreScore plugin returned skip, but another PreScore plugin returned non-success",
plugins: []*TestPlugin{
{
name: "skip",
inj: injectedResult{PreScoreStatus: int(framework.Skip)},
},
{
name: "error",
inj: injectedResult{PreScoreStatus: int(framework.Error)},
},
},
wantStatusCode: framework.Error,
},
{
name: "all PreScore plugins returned skip",
plugins: []*TestPlugin{
{
name: "skip1",
inj: injectedResult{PreScoreStatus: int(framework.Skip)},
},
{
name: "skip2",
inj: injectedResult{PreScoreStatus: int(framework.Skip)},
},
{
name: "skip3",
inj: injectedResult{PreScoreStatus: int(framework.Skip)},
},
},
wantSkippedPlugins: sets.New("skip1", "skip2", "skip3"),
wantStatusCode: framework.Success,
},
{
name: "some PreScore plugins returned skip",
plugins: []*TestPlugin{
{
name: "skip1",
inj: injectedResult{PreScoreStatus: int(framework.Skip)},
},
{
name: "success1",
},
{
name: "skip2",
inj: injectedResult{PreScoreStatus: int(framework.Skip)},
},
{
name: "success2",
},
},
wantSkippedPlugins: sets.New("skip1", "skip2"),
wantStatusCode: framework.Success,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
r := make(Registry)
enabled := make([]config.Plugin, len(tt.plugins))
for i, p := range tt.plugins {
p := p
enabled[i].Name = p.name
r.Register(p.name, func(_ runtime.Object, fh framework.Handle) (framework.Plugin, error) {
return p, nil
})
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
f, err := newFrameworkWithQueueSortAndBind(
r,
config.KubeSchedulerProfile{Plugins: &config.Plugins{PreScore: config.PluginSet{Enabled: enabled}}},
ctx.Done(),
)
if err != nil {
t.Fatalf("Failed to create framework for testing: %v", err)
}
state := framework.NewCycleState()
status := f.RunPreScorePlugins(ctx, state, nil, nil)
if status.Code() != tt.wantStatusCode {
t.Errorf("wrong status code. got: %v, want: %v", status, tt.wantStatusCode)
}
skipped := state.SkipScorePlugins
if d := cmp.Diff(skipped, tt.wantSkippedPlugins); d != "" {
t.Errorf("wrong skip score plugins. got: %v, want: %v, diff: %s", skipped, tt.wantSkippedPlugins, d)
}
})
}
}
func TestRunScorePlugins(t *testing.T) {
tests := []struct {
name string
registry Registry
plugins *config.Plugins
pluginConfigs []config.PluginConfig
want []framework.NodePluginScores
name string
registry Registry
plugins *config.Plugins
pluginConfigs []config.PluginConfig
want []framework.NodePluginScores
skippedPlugins sets.Set[string]
// If err is true, we expect RunScorePlugin to fail.
err bool
}{
@ -1345,6 +1471,70 @@ func TestRunScorePlugins(t *testing.T) {
},
},
},
{
name: "one success plugin, one skip plugin",
plugins: buildScoreConfigDefaultWeights(scorePlugin1, scoreWithNormalizePlugin1),
pluginConfigs: []config.PluginConfig{
{
Name: scorePlugin1,
Args: &runtime.Unknown{
Raw: []byte(`{ "scoreRes": 1 }`),
},
},
{
Name: scoreWithNormalizePlugin1,
Args: &runtime.Unknown{
Raw: []byte(`{ "scoreStatus": 1 }`), // To make sure this plugin isn't called, set error as an injected result.
},
},
},
skippedPlugins: sets.New(scoreWithNormalizePlugin1),
want: []framework.NodePluginScores{
{
Name: "node1",
Scores: []framework.PluginScore{
{
Name: scorePlugin1,
Score: 1,
},
},
TotalScore: 1,
},
{
Name: "node2",
Scores: []framework.PluginScore{
{
Name: scorePlugin1,
Score: 1,
},
},
TotalScore: 1,
},
},
},
{
name: "all plugins are skipped in prescore",
plugins: buildScoreConfigDefaultWeights(scorePlugin1),
pluginConfigs: []config.PluginConfig{
{
Name: scorePlugin1,
Args: &runtime.Unknown{
Raw: []byte(`{ "scoreStatus": 1 }`), // To make sure this plugin isn't called, set error as an injected result.
},
},
},
skippedPlugins: sets.New(scorePlugin1),
want: []framework.NodePluginScores{
{
Name: "node1",
Scores: []framework.PluginScore{},
},
{
Name: "node2",
Scores: []framework.PluginScore{},
},
},
},
}
for _, tt := range tests {
@ -1361,6 +1551,8 @@ func TestRunScorePlugins(t *testing.T) {
t.Fatalf("Failed to create framework for testing: %v", err)
}
state := framework.NewCycleState()
state.SkipScorePlugins = tt.skippedPlugins
res, status := f.RunScorePlugins(ctx, state, pod, nodes)
if tt.err {

View File

@ -2016,6 +2016,21 @@ func TestSchedulerSchedulePod(t *testing.T) {
wantNodes: sets.NewString("node2", "node3"),
wantEvaluatedNodes: pointer.Int32(3),
},
{
name: "test all prescore plugins return skip",
registerPlugins: []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterFilterPlugin("TrueFilter", st.NewTrueFilterPlugin),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
st.RegisterPluginAsExtensions("FakePreScoreAndScorePlugin", st.NewFakePreScoreAndScorePlugin("FakePreScoreAndScorePlugin", 0,
framework.NewStatus(framework.Skip, "fake skip"),
framework.NewStatus(framework.Error, "this score function shouldn't be executed because this plugin returned Skip in the PreScore"),
), "PreScore", "Score"),
},
nodes: []string{"node1", "node2"},
pod: st.MakePod().Name("ignore").UID("ignore").Obj(),
wantNodes: sets.NewString("node1", "node2"),
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
@ -2514,6 +2529,70 @@ func Test_prioritizeNodes(t *testing.T) {
},
},
},
{
name: "plugin which returned skip in preScore shouldn't be executed in the score phase",
pod: &v1.Pod{},
nodes: []*v1.Node{makeNode("node1", 1000, schedutil.DefaultMemoryRequest*10), makeNode("node2", 1000, schedutil.DefaultMemoryRequest*10)},
pluginRegistrations: []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterScorePlugin(noderesources.BalancedAllocationName, frameworkruntime.FactoryAdapter(feature.Features{}, noderesources.NewBalancedAllocation), 1),
st.RegisterScorePlugin("Node2Prioritizer", st.NewNode2PrioritizerPlugin(), 1),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
st.RegisterPluginAsExtensions("FakePreScoreAndScorePlugin", st.NewFakePreScoreAndScorePlugin("FakePreScoreAndScorePlugin", 0,
framework.NewStatus(framework.Skip, "fake skip"),
framework.NewStatus(framework.Error, "this score function shouldn't be executed because this plugin returned Skip in the PreScore"),
), "PreScore", "Score"),
},
extenders: nil,
want: []framework.NodePluginScores{
{
Name: "node1",
Scores: []framework.PluginScore{
{
Name: "Node2Prioritizer",
Score: 10,
},
{
Name: "NodeResourcesBalancedAllocation",
Score: 100,
},
},
TotalScore: 110,
},
{
Name: "node2",
Scores: []framework.PluginScore{
{
Name: "Node2Prioritizer",
Score: 100,
},
{
Name: "NodeResourcesBalancedAllocation",
Score: 100,
},
},
TotalScore: 200,
},
},
},
{
name: "all score plugins are skipped",
pod: &v1.Pod{},
nodes: []*v1.Node{makeNode("node1", 1000, schedutil.DefaultMemoryRequest*10), makeNode("node2", 1000, schedutil.DefaultMemoryRequest*10)},
pluginRegistrations: []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
st.RegisterPluginAsExtensions("FakePreScoreAndScorePlugin", st.NewFakePreScoreAndScorePlugin("FakePreScoreAndScorePlugin", 0,
framework.NewStatus(framework.Skip, "fake skip"),
framework.NewStatus(framework.Error, "this score function shouldn't be executed because this plugin returned Skip in the PreScore"),
), "PreScore", "Score"),
},
extenders: nil,
want: []framework.NodePluginScores{
{Name: "node1", Scores: []framework.PluginScore{}},
{Name: "node2", Scores: []framework.PluginScore{}},
},
},
}
for _, test := range tests {
@ -2537,7 +2616,6 @@ func Test_prioritizeNodes(t *testing.T) {
}
state := framework.NewCycleState()
fwk.RunPreScorePlugins(ctx, state, test.pod, test.nodes)
var extenders []framework.Extender
for ii := range test.extenders {
extenders = append(extenders, &test.extenders[ii])

View File

@ -245,3 +245,38 @@ func NewFakePermitPlugin(status *framework.Status, timeout time.Duration) framew
}, nil
}
}
type FakePreScoreAndScorePlugin struct {
name string
score int64
preScoreStatus *framework.Status
scoreStatus *framework.Status
}
// Name returns name of the plugin.
func (pl *FakePreScoreAndScorePlugin) Name() string {
return pl.name
}
func (pl *FakePreScoreAndScorePlugin) Score(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) (int64, *framework.Status) {
return pl.score, pl.scoreStatus
}
func (pl *FakePreScoreAndScorePlugin) ScoreExtensions() framework.ScoreExtensions {
return nil
}
func (pl *FakePreScoreAndScorePlugin) PreScore(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodes []*v1.Node) *framework.Status {
return pl.preScoreStatus
}
func NewFakePreScoreAndScorePlugin(name string, score int64, preScoreStatus, scoreStatus *framework.Status) frameworkruntime.PluginFactory {
return func(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) {
return &FakePreScoreAndScorePlugin{
name: name,
score: score,
preScoreStatus: preScoreStatus,
scoreStatus: scoreStatus,
}, nil
}
}