mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-30 15:05:27 +00:00
Merge pull request #80383 from liu-cong/normalizescore
Add NormalizeScore extension point for scheduler framework.
This commit is contained in:
commit
fb46ec8455
@ -780,6 +780,18 @@ func PrioritizeNodes(
|
||||
return schedulerapi.HostPriorityList{}, scoreStatus.AsError()
|
||||
}
|
||||
|
||||
// Run the Normalize Score plugins.
|
||||
status := framework.RunNormalizeScorePlugins(pluginContext, pod, scoresMap)
|
||||
if !status.IsSuccess() {
|
||||
return schedulerapi.HostPriorityList{}, status.AsError()
|
||||
}
|
||||
|
||||
// Apply weights for scores.
|
||||
status = framework.ApplyScoreWeights(pluginContext, pod, scoresMap)
|
||||
if !status.IsSuccess() {
|
||||
return schedulerapi.HostPriorityList{}, status.AsError()
|
||||
}
|
||||
|
||||
// Summarize all scores.
|
||||
result := make(schedulerapi.HostPriorityList, 0, len(nodes))
|
||||
|
||||
|
@ -39,6 +39,14 @@ filegroup(
|
||||
|
||||
go_test(
|
||||
name = "go_default_test",
|
||||
srcs = ["interface_test.go"],
|
||||
srcs = [
|
||||
"framework_test.go",
|
||||
"interface_test.go",
|
||||
],
|
||||
embed = [":go_default_library"],
|
||||
deps = [
|
||||
"//pkg/scheduler/apis/config:go_default_library",
|
||||
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
||||
],
|
||||
)
|
||||
|
@ -34,20 +34,21 @@ import (
|
||||
// framework is the component responsible for initializing and running scheduler
|
||||
// plugins.
|
||||
type framework struct {
|
||||
registry Registry
|
||||
nodeInfoSnapshot *cache.NodeInfoSnapshot
|
||||
waitingPods *waitingPodsMap
|
||||
pluginNameToWeightMap map[string]int
|
||||
queueSortPlugins []QueueSortPlugin
|
||||
prefilterPlugins []PrefilterPlugin
|
||||
filterPlugins []FilterPlugin
|
||||
scorePlugins []ScorePlugin
|
||||
reservePlugins []ReservePlugin
|
||||
prebindPlugins []PrebindPlugin
|
||||
bindPlugins []BindPlugin
|
||||
postbindPlugins []PostbindPlugin
|
||||
unreservePlugins []UnreservePlugin
|
||||
permitPlugins []PermitPlugin
|
||||
registry Registry
|
||||
nodeInfoSnapshot *cache.NodeInfoSnapshot
|
||||
waitingPods *waitingPodsMap
|
||||
pluginNameToWeightMap map[string]int
|
||||
queueSortPlugins []QueueSortPlugin
|
||||
prefilterPlugins []PrefilterPlugin
|
||||
filterPlugins []FilterPlugin
|
||||
scorePlugins []ScorePlugin
|
||||
scoreWithNormalizePlugins []ScoreWithNormalizePlugin
|
||||
reservePlugins []ReservePlugin
|
||||
prebindPlugins []PrebindPlugin
|
||||
bindPlugins []BindPlugin
|
||||
postbindPlugins []PostbindPlugin
|
||||
unreservePlugins []UnreservePlugin
|
||||
permitPlugins []PermitPlugin
|
||||
}
|
||||
|
||||
const (
|
||||
@ -131,6 +132,7 @@ func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfi
|
||||
if plugins.Score != nil {
|
||||
for _, sc := range plugins.Score.Enabled {
|
||||
if pg, ok := pluginsMap[sc.Name]; ok {
|
||||
// First, make sure the plugin implements ScorePlugin interface.
|
||||
p, ok := pg.(ScorePlugin)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("plugin %v does not extend score plugin", sc.Name)
|
||||
@ -139,6 +141,13 @@ func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfi
|
||||
return nil, fmt.Errorf("score plugin %v is not configured with weight", p.Name())
|
||||
}
|
||||
f.scorePlugins = append(f.scorePlugins, p)
|
||||
|
||||
// Next, if the plugin also implements ScoreWithNormalizePlugin interface,
|
||||
// add it to the normalizeScore plugin list.
|
||||
np, ok := pg.(ScoreWithNormalizePlugin)
|
||||
if ok {
|
||||
f.scoreWithNormalizePlugins = append(f.scoreWithNormalizePlugins, np)
|
||||
}
|
||||
} else {
|
||||
return nil, fmt.Errorf("score plugin %v does not exist", sc.Name)
|
||||
}
|
||||
@ -317,14 +326,12 @@ func (f *framework) RunScorePlugins(pc *PluginContext, pod *v1.Pod, nodes []*v1.
|
||||
errCh := schedutil.NewErrorChannel()
|
||||
workqueue.ParallelizeUntil(ctx, 16, len(nodes), func(index int) {
|
||||
for _, pl := range f.scorePlugins {
|
||||
// Score plugins' weight has been checked when they are initialized.
|
||||
weight := f.pluginNameToWeightMap[pl.Name()]
|
||||
score, status := pl.Score(pc, pod, nodes[index].Name)
|
||||
if !status.IsSuccess() {
|
||||
errCh.SendErrorWithCancel(fmt.Errorf(status.Message()), cancel)
|
||||
return
|
||||
}
|
||||
pluginToNodeScoreMap[pl.Name()][index] = score * weight
|
||||
pluginToNodeScoreMap[pl.Name()][index] = score
|
||||
}
|
||||
})
|
||||
|
||||
@ -337,6 +344,64 @@ func (f *framework) RunScorePlugins(pc *PluginContext, pod *v1.Pod, nodes []*v1.
|
||||
return pluginToNodeScoreMap, nil
|
||||
}
|
||||
|
||||
// RunNormalizeScorePlugins runs the NormalizeScore function of Score plugins.
|
||||
// It should be called after RunScorePlugins with the PluginToNodeScoreMap result.
|
||||
// It then modifies the map with normalized scores. It returns a non-success Status
|
||||
// if any of the NormalizeScore functions returns a non-success status.
|
||||
func (f *framework) RunNormalizeScorePlugins(pc *PluginContext, pod *v1.Pod, scores PluginToNodeScoreMap) *Status {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
errCh := schedutil.NewErrorChannel()
|
||||
workqueue.ParallelizeUntil(ctx, 16, len(f.scoreWithNormalizePlugins), func(index int) {
|
||||
pl := f.scoreWithNormalizePlugins[index]
|
||||
nodeScoreList, ok := scores[pl.Name()]
|
||||
if !ok {
|
||||
err := fmt.Errorf("normalize score plugin %v has no corresponding scores in the PluginToNodeScoreMap", pl.Name())
|
||||
errCh.SendErrorWithCancel(err, cancel)
|
||||
}
|
||||
status := pl.NormalizeScore(pc, pod, nodeScoreList)
|
||||
if !status.IsSuccess() {
|
||||
err := fmt.Errorf("normalize score plugin %v failed with error %v", pl.Name(), status.Message())
|
||||
errCh.SendErrorWithCancel(err, cancel)
|
||||
}
|
||||
})
|
||||
|
||||
if err := errCh.ReceiveError(); err != nil {
|
||||
msg := fmt.Sprintf("error while running normalize score plugin for pod %v: %v", pod.Name, err)
|
||||
klog.Error(msg)
|
||||
return NewStatus(Error, msg)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// ApplyScoreWeights applies weights to the score results. It should be called after
|
||||
// RunNormalizeScorePlugins.
|
||||
func (f *framework) ApplyScoreWeights(pc *PluginContext, pod *v1.Pod, scores PluginToNodeScoreMap) *Status {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
errCh := schedutil.NewErrorChannel()
|
||||
workqueue.ParallelizeUntil(ctx, 16, len(f.scorePlugins), func(index int) {
|
||||
pl := f.scorePlugins[index]
|
||||
// Score plugins' weight has been checked when they are initialized.
|
||||
weight := f.pluginNameToWeightMap[pl.Name()]
|
||||
nodeScoreList, ok := scores[pl.Name()]
|
||||
if !ok {
|
||||
err := fmt.Errorf("score plugin %v has no corresponding scores in the PluginToNodeScoreMap", pl.Name())
|
||||
errCh.SendErrorWithCancel(err, cancel)
|
||||
}
|
||||
for i := range nodeScoreList {
|
||||
nodeScoreList[i] = nodeScoreList[i] * weight
|
||||
}
|
||||
})
|
||||
|
||||
if err := errCh.ReceiveError(); err != nil {
|
||||
msg := fmt.Sprintf("error while applying score weights for pod %v: %v", pod.Name, err)
|
||||
klog.Error(msg)
|
||||
return NewStatus(Error, msg)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// RunPrebindPlugins runs the set of configured prebind plugins. It returns a
|
||||
// failure (bool) if any of the plugins returns an error. It also returns an
|
||||
// error containing the rejection message or the error occurred in the plugin.
|
||||
@ -520,7 +585,6 @@ func pluginsNeeded(plugins *config.Plugins) map[string]config.Plugin {
|
||||
find(plugins.Filter)
|
||||
find(plugins.PostFilter)
|
||||
find(plugins.Score)
|
||||
find(plugins.NormalizeScore)
|
||||
find(plugins.Reserve)
|
||||
find(plugins.Permit)
|
||||
find(plugins.PreBind)
|
||||
|
434
pkg/scheduler/framework/v1alpha1/framework_test.go
Normal file
434
pkg/scheduler/framework/v1alpha1/framework_test.go
Normal file
@ -0,0 +1,434 @@
|
||||
/*
|
||||
Copyright 2019 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package v1alpha1
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/kubernetes/pkg/scheduler/apis/config"
|
||||
)
|
||||
|
||||
const (
|
||||
scorePlugin1 = "score-plugin-1"
|
||||
scorePlugin2 = "score-plugin-2"
|
||||
scorePlugin3 = "score-plugin-3"
|
||||
pluginNotImplementingScore = "plugin-not-implementing-score"
|
||||
weight1 = 2
|
||||
weight2 = 3
|
||||
weight3 = 4
|
||||
)
|
||||
|
||||
// TestScorePlugin1 and 2 implements ScoreWithNormalizePlugin interface,
|
||||
// TestScorePlugin3 only implements ScorePlugin interface.
|
||||
var _ = ScoreWithNormalizePlugin(&TestScorePlugin1{})
|
||||
var _ = ScoreWithNormalizePlugin(&TestScorePlugin2{})
|
||||
var _ = ScorePlugin(&TestScorePlugin3{})
|
||||
|
||||
type TestScorePlugin1 struct {
|
||||
// If fail is true, NormalizeScore will return error status.
|
||||
fail bool
|
||||
}
|
||||
|
||||
// NewScorePlugin1 is the factory for NormalizeScore plugin 1.
|
||||
func NewScorePlugin1(_ *runtime.Unknown, _ FrameworkHandle) (Plugin, error) {
|
||||
return &TestScorePlugin1{}, nil
|
||||
}
|
||||
|
||||
// NewScorePlugin1InjectFailure creates a new TestScorePlugin1 which will
|
||||
// return an error status for NormalizeScore.
|
||||
func NewScorePlugin1InjectFailure(_ *runtime.Unknown, _ FrameworkHandle) (Plugin, error) {
|
||||
return &TestScorePlugin1{fail: true}, nil
|
||||
}
|
||||
|
||||
func (pl *TestScorePlugin1) Name() string {
|
||||
return scorePlugin1
|
||||
}
|
||||
|
||||
func (pl *TestScorePlugin1) NormalizeScore(pc *PluginContext, pod *v1.Pod, scores NodeScoreList) *Status {
|
||||
if pl.fail {
|
||||
return NewStatus(Error, "injecting failure.")
|
||||
}
|
||||
// Simply decrease each node score by 1.
|
||||
for i := range scores {
|
||||
scores[i] = scores[i] - 1
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (pl *TestScorePlugin1) Score(pc *PluginContext, p *v1.Pod, nodeName string) (int, *Status) {
|
||||
// Score is currently not used in the tests so just return some dummy value.
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
type TestScorePlugin2 struct{}
|
||||
|
||||
// NewScorePlugin2 is the factory for NormalizeScore plugin 2.
|
||||
func NewScorePlugin2(_ *runtime.Unknown, _ FrameworkHandle) (Plugin, error) {
|
||||
return &TestScorePlugin2{}, nil
|
||||
}
|
||||
|
||||
func (pl *TestScorePlugin2) Name() string {
|
||||
return scorePlugin2
|
||||
}
|
||||
|
||||
func (pl *TestScorePlugin2) NormalizeScore(pc *PluginContext, pod *v1.Pod, scores NodeScoreList) *Status {
|
||||
// Simply force each node score to 5.
|
||||
for i := range scores {
|
||||
scores[i] = 5
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (pl *TestScorePlugin2) Score(pc *PluginContext, p *v1.Pod, nodeName string) (int, *Status) {
|
||||
// Score is currently not used in the tests so just return some dummy value.
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
// TestScorePlugin3 only implements ScorePlugin interface.
|
||||
type TestScorePlugin3 struct{}
|
||||
|
||||
// NewScorePlugin3 is the factory for Score plugin 3.
|
||||
func NewScorePlugin3(_ *runtime.Unknown, _ FrameworkHandle) (Plugin, error) {
|
||||
return &TestScorePlugin3{}, nil
|
||||
}
|
||||
|
||||
func (pl *TestScorePlugin3) Name() string {
|
||||
return scorePlugin3
|
||||
}
|
||||
|
||||
func (pl *TestScorePlugin3) Score(pc *PluginContext, p *v1.Pod, nodeName string) (int, *Status) {
|
||||
// Score is currently not used in the tests so just return some dummy value.
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
// PluginNotImplementingScore doesn't implement the ScorePlugin interface.
|
||||
type PluginNotImplementingScore struct{}
|
||||
|
||||
// NewPluginNotImplementingScore is the factory for PluginNotImplementingScore.
|
||||
func NewPluginNotImplementingScore(_ *runtime.Unknown, _ FrameworkHandle) (Plugin, error) {
|
||||
return &PluginNotImplementingScore{}, nil
|
||||
}
|
||||
|
||||
func (pl *PluginNotImplementingScore) Name() string {
|
||||
return pluginNotImplementingScore
|
||||
}
|
||||
|
||||
var registry = Registry{
|
||||
scorePlugin1: NewScorePlugin1,
|
||||
scorePlugin2: NewScorePlugin2,
|
||||
scorePlugin3: NewScorePlugin3,
|
||||
pluginNotImplementingScore: NewPluginNotImplementingScore,
|
||||
}
|
||||
|
||||
var plugin1 = &config.Plugins{
|
||||
Score: &config.PluginSet{
|
||||
Enabled: []config.Plugin{
|
||||
{Name: scorePlugin1, Weight: weight1},
|
||||
},
|
||||
},
|
||||
}
|
||||
var plugin3 = &config.Plugins{
|
||||
Score: &config.PluginSet{
|
||||
Enabled: []config.Plugin{
|
||||
{Name: scorePlugin3, Weight: weight3},
|
||||
},
|
||||
},
|
||||
}
|
||||
var plugin1And2 = &config.Plugins{
|
||||
Score: &config.PluginSet{
|
||||
Enabled: []config.Plugin{
|
||||
{Name: scorePlugin1, Weight: weight1},
|
||||
{Name: scorePlugin2, Weight: weight2},
|
||||
},
|
||||
},
|
||||
}
|
||||
var plugin1And3 = &config.Plugins{
|
||||
Score: &config.PluginSet{
|
||||
Enabled: []config.Plugin{
|
||||
{Name: scorePlugin1, Weight: weight1},
|
||||
{Name: scorePlugin3, Weight: weight3},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
// No specific config required.
|
||||
var args = []config.PluginConfig{}
|
||||
var pc = &PluginContext{}
|
||||
|
||||
// Pod is only used for logging errors.
|
||||
var pod = &v1.Pod{}
|
||||
|
||||
func TestInitFrameworkWithScorePlugins(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
plugins *config.Plugins
|
||||
// If initErr is true, we expect framework initialization to fail.
|
||||
initErr bool
|
||||
}{
|
||||
{
|
||||
name: "enabled Score plugin doesn't exist in registry",
|
||||
plugins: &config.Plugins{
|
||||
Score: &config.PluginSet{
|
||||
Enabled: []config.Plugin{
|
||||
{Name: "notExist"},
|
||||
},
|
||||
},
|
||||
},
|
||||
initErr: true,
|
||||
},
|
||||
{
|
||||
name: "enabled Score plugin doesn't extend the ScorePlugin interface",
|
||||
plugins: &config.Plugins{
|
||||
Score: &config.PluginSet{
|
||||
Enabled: []config.Plugin{
|
||||
{Name: pluginNotImplementingScore},
|
||||
},
|
||||
},
|
||||
},
|
||||
initErr: true,
|
||||
},
|
||||
{
|
||||
name: "Score plugins are nil",
|
||||
plugins: &config.Plugins{Score: nil},
|
||||
},
|
||||
{
|
||||
name: "enabled Score plugin list is empty",
|
||||
plugins: &config.Plugins{
|
||||
Score: &config.PluginSet{
|
||||
Enabled: []config.Plugin{},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "enabled plugin only implements ScorePlugin interface",
|
||||
plugins: &config.Plugins{
|
||||
Score: &config.PluginSet{
|
||||
Enabled: []config.Plugin{
|
||||
{Name: scorePlugin3},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "enabled plugin implements ScoreWithNormalizePlugin interface",
|
||||
plugins: &config.Plugins{
|
||||
Score: &config.PluginSet{
|
||||
Enabled: []config.Plugin{
|
||||
{Name: scorePlugin1},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
_, err := NewFramework(registry, tt.plugins, args)
|
||||
if tt.initErr && err == nil {
|
||||
t.Fatal("Framework initialization should fail")
|
||||
}
|
||||
if !tt.initErr && err != nil {
|
||||
t.Fatalf("Failed to create framework for testing: %v", err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestRunNormalizeScorePlugins(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
registry Registry
|
||||
plugins *config.Plugins
|
||||
input PluginToNodeScoreMap
|
||||
want PluginToNodeScoreMap
|
||||
// If err is true, we expect RunNormalizeScorePlugin to fail.
|
||||
err bool
|
||||
}{
|
||||
{
|
||||
name: "no NormalizeScore plugins",
|
||||
plugins: plugin3,
|
||||
registry: registry,
|
||||
input: PluginToNodeScoreMap{
|
||||
scorePlugin1: {2, 3},
|
||||
},
|
||||
// No NormalizeScore plugin, map should be untouched.
|
||||
want: PluginToNodeScoreMap{
|
||||
scorePlugin1: {2, 3},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "single Score plugin, single NormalizeScore plugin",
|
||||
registry: registry,
|
||||
plugins: plugin1,
|
||||
input: PluginToNodeScoreMap{
|
||||
scorePlugin1: {2, 3},
|
||||
},
|
||||
want: PluginToNodeScoreMap{
|
||||
// For plugin1, want=input-1.
|
||||
scorePlugin1: {1, 2},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "2 Score plugins, 2 NormalizeScore plugins",
|
||||
registry: registry,
|
||||
plugins: plugin1And2,
|
||||
input: PluginToNodeScoreMap{
|
||||
scorePlugin1: {2, 3},
|
||||
scorePlugin2: {2, 4},
|
||||
},
|
||||
want: PluginToNodeScoreMap{
|
||||
// For plugin1, want=input-1.
|
||||
scorePlugin1: {1, 2},
|
||||
// For plugin2, want=5.
|
||||
scorePlugin2: {5, 5},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "2 Score plugins, 1 NormalizeScore plugin",
|
||||
registry: registry,
|
||||
plugins: plugin1And3,
|
||||
input: PluginToNodeScoreMap{
|
||||
scorePlugin1: {2, 3},
|
||||
scorePlugin3: {2, 4},
|
||||
},
|
||||
want: PluginToNodeScoreMap{
|
||||
// For plugin1, want=input-1.
|
||||
scorePlugin1: {1, 2},
|
||||
// No NormalizeScore for plugin 3. The node scores are untouched.
|
||||
scorePlugin3: {2, 4},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "score map contains both test plugin 1 and 2 but plugin 1 fails",
|
||||
registry: Registry{
|
||||
scorePlugin1: NewScorePlugin1InjectFailure,
|
||||
scorePlugin2: NewScorePlugin2,
|
||||
},
|
||||
plugins: plugin1And2,
|
||||
input: PluginToNodeScoreMap{
|
||||
scorePlugin1: {2, 3},
|
||||
scorePlugin2: {2, 4},
|
||||
},
|
||||
err: true,
|
||||
},
|
||||
{
|
||||
name: "2 plugins but score map only contains plugin1",
|
||||
registry: registry,
|
||||
plugins: plugin1And2,
|
||||
input: PluginToNodeScoreMap{
|
||||
scorePlugin1: {2, 3},
|
||||
},
|
||||
err: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
f, err := NewFramework(tt.registry, tt.plugins, args)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create framework for testing: %v", err)
|
||||
}
|
||||
|
||||
status := f.RunNormalizeScorePlugins(pc, pod, tt.input)
|
||||
|
||||
if tt.err {
|
||||
if status.IsSuccess() {
|
||||
t.Errorf("Expected status to be non-success.")
|
||||
}
|
||||
} else {
|
||||
if !status.IsSuccess() {
|
||||
t.Errorf("Expected status to be success.")
|
||||
}
|
||||
if !reflect.DeepEqual(tt.input, tt.want) {
|
||||
t.Errorf("Score map after RunNormalizeScorePlugin: %+v, want: %+v.", tt.input, tt.want)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestApplyScoreWeights(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
plugins *config.Plugins
|
||||
input PluginToNodeScoreMap
|
||||
want PluginToNodeScoreMap
|
||||
// If err is true, we expect ApplyScoreWeights to fail.
|
||||
err bool
|
||||
}{
|
||||
{
|
||||
name: "single Score plugin, single nodeScoreList",
|
||||
plugins: plugin1,
|
||||
input: PluginToNodeScoreMap{
|
||||
scorePlugin1: {2, 3},
|
||||
},
|
||||
want: PluginToNodeScoreMap{
|
||||
// For plugin1, want=input*weight1.
|
||||
scorePlugin1: {4, 6},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "2 Score plugins, 2 nodeScoreLists in scoreMap",
|
||||
plugins: plugin1And2,
|
||||
input: PluginToNodeScoreMap{
|
||||
scorePlugin1: {2, 3},
|
||||
scorePlugin2: {2, 4},
|
||||
},
|
||||
want: PluginToNodeScoreMap{
|
||||
// For plugin1, want=input*weight1.
|
||||
scorePlugin1: {4, 6},
|
||||
// For plugin2, want=input*weight2.
|
||||
scorePlugin2: {6, 12},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "2 Score plugins, 1 without corresponding nodeScoreList in the score map",
|
||||
plugins: plugin1And2,
|
||||
input: PluginToNodeScoreMap{
|
||||
scorePlugin1: {2, 3},
|
||||
},
|
||||
err: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
f, err := NewFramework(registry, tt.plugins, args)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create framework for testing: %v", err)
|
||||
}
|
||||
|
||||
status := f.ApplyScoreWeights(pc, pod, tt.input)
|
||||
|
||||
if tt.err {
|
||||
if status.IsSuccess() {
|
||||
t.Errorf("Expected status to be non-success.")
|
||||
}
|
||||
} else {
|
||||
if !status.IsSuccess() {
|
||||
t.Errorf("Expected status to be success.")
|
||||
}
|
||||
if !reflect.DeepEqual(tt.input, tt.want) {
|
||||
t.Errorf("Score map after RunNormalizeScorePlugin: %+v, want: %+v.", tt.input, tt.want)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
@ -170,6 +170,17 @@ type ScorePlugin interface {
|
||||
Score(pc *PluginContext, p *v1.Pod, nodeName string) (int, *Status)
|
||||
}
|
||||
|
||||
// ScoreWithNormalizePlugin is an interface that must be implemented by "score"
|
||||
// plugins that also need to normalize the node scoring results produced by the same
|
||||
// plugin's "Score" method.
|
||||
type ScoreWithNormalizePlugin interface {
|
||||
ScorePlugin
|
||||
// NormalizeScore is called for all node scores produced by the same plugin's "Score"
|
||||
// method. A successful run of NormalizeScore will update the scores list and return
|
||||
// a success status.
|
||||
NormalizeScore(pc *PluginContext, p *v1.Pod, scores NodeScoreList) *Status
|
||||
}
|
||||
|
||||
// ReservePlugin is an interface for Reserve plugins. These plugins are called
|
||||
// at the reservation point. These are meant to update the state of the plugin.
|
||||
// This concept used to be called 'assume' in the original scheduler.
|
||||
@ -264,6 +275,16 @@ type Framework interface {
|
||||
// a non-success status.
|
||||
RunScorePlugins(pc *PluginContext, pod *v1.Pod, nodes []*v1.Node) (PluginToNodeScoreMap, *Status)
|
||||
|
||||
// RunNormalizeScorePlugins runs the normalize score plugins. It should be called after
|
||||
// RunScorePlugins with the PluginToNodeScoreMap result. It then modifies the map with
|
||||
// normalized scores. It returns a non-success Status if any of the normalize score plugins
|
||||
// returns a non-success status.
|
||||
RunNormalizeScorePlugins(pc *PluginContext, pod *v1.Pod, scores PluginToNodeScoreMap) *Status
|
||||
|
||||
// ApplyScoreWeights applies weights to the score results. It should be called after
|
||||
// RunNormalizeScorePlugins.
|
||||
ApplyScoreWeights(pc *PluginContext, pod *v1.Pod, scores PluginToNodeScoreMap) *Status
|
||||
|
||||
// RunPrebindPlugins runs the set of configured prebind plugins. It returns
|
||||
// *Status and its code is set to non-success if any of the plugins returns
|
||||
// anything but Success. If the Status code is "Unschedulable", it is
|
||||
|
@ -179,6 +179,14 @@ func (*fakeFramework) RunScorePlugins(pc *framework.PluginContext, pod *v1.Pod,
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (*fakeFramework) RunNormalizeScorePlugins(pc *framework.PluginContext, pod *v1.Pod, scores framework.PluginToNodeScoreMap) *framework.Status {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (*fakeFramework) ApplyScoreWeights(pc *framework.PluginContext, pod *v1.Pod, scores framework.PluginToNodeScoreMap) *framework.Status {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (*fakeFramework) RunPrebindPlugins(pc *framework.PluginContext, pod *v1.Pod, nodeName string) *framework.Status {
|
||||
return nil
|
||||
}
|
||||
|
@ -38,9 +38,14 @@ type PrefilterPlugin struct {
|
||||
}
|
||||
|
||||
type ScorePlugin struct {
|
||||
failScore bool
|
||||
numCalled int
|
||||
highScoreNode string
|
||||
failScore bool
|
||||
numScoreCalled int
|
||||
highScoreNode string
|
||||
}
|
||||
|
||||
type ScoreWithNormalizePlugin struct {
|
||||
numScoreCalled int
|
||||
numNormalizeScoreCalled int
|
||||
}
|
||||
|
||||
type FilterPlugin struct {
|
||||
@ -91,19 +96,22 @@ type PermitPlugin struct {
|
||||
}
|
||||
|
||||
const (
|
||||
prefilterPluginName = "prefilter-plugin"
|
||||
scorePluginName = "score-plugin"
|
||||
filterPluginName = "filter-plugin"
|
||||
reservePluginName = "reserve-plugin"
|
||||
prebindPluginName = "prebind-plugin"
|
||||
unreservePluginName = "unreserve-plugin"
|
||||
postbindPluginName = "postbind-plugin"
|
||||
permitPluginName = "permit-plugin"
|
||||
prefilterPluginName = "prefilter-plugin"
|
||||
scorePluginName = "score-plugin"
|
||||
scoreWithNormalizePluginName = "score-with-normalize-plugin"
|
||||
filterPluginName = "filter-plugin"
|
||||
reservePluginName = "reserve-plugin"
|
||||
prebindPluginName = "prebind-plugin"
|
||||
unreservePluginName = "unreserve-plugin"
|
||||
postbindPluginName = "postbind-plugin"
|
||||
permitPluginName = "permit-plugin"
|
||||
)
|
||||
|
||||
var _ = framework.PrefilterPlugin(&PrefilterPlugin{})
|
||||
var _ = framework.ScorePlugin(&ScorePlugin{})
|
||||
var _ = framework.FilterPlugin(&FilterPlugin{})
|
||||
var _ = framework.ScorePlugin(&ScorePlugin{})
|
||||
var _ = framework.ScoreWithNormalizePlugin(&ScoreWithNormalizePlugin{})
|
||||
var _ = framework.ReservePlugin(&ReservePlugin{})
|
||||
var _ = framework.PrebindPlugin(&PrebindPlugin{})
|
||||
var _ = framework.BindPlugin(&BindPlugin{})
|
||||
@ -111,6 +119,13 @@ var _ = framework.PostbindPlugin(&PostbindPlugin{})
|
||||
var _ = framework.UnreservePlugin(&UnreservePlugin{})
|
||||
var _ = framework.PermitPlugin(&PermitPlugin{})
|
||||
|
||||
var scPlugin = &ScorePlugin{}
|
||||
|
||||
// NewScorePlugin is the factory for score plugin.
|
||||
func NewScorePlugin(_ *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) {
|
||||
return scPlugin, nil
|
||||
}
|
||||
|
||||
// Name returns name of the score plugin.
|
||||
func (sp *ScorePlugin) Name() string {
|
||||
return scorePluginName
|
||||
@ -119,21 +134,19 @@ func (sp *ScorePlugin) Name() string {
|
||||
// reset returns name of the score plugin.
|
||||
func (sp *ScorePlugin) reset() {
|
||||
sp.failScore = false
|
||||
sp.numCalled = 0
|
||||
sp.numScoreCalled = 0
|
||||
sp.highScoreNode = ""
|
||||
}
|
||||
|
||||
var scPlugin = &ScorePlugin{}
|
||||
|
||||
// Score returns the score of scheduling a pod on a specific node.
|
||||
func (sp *ScorePlugin) Score(pc *framework.PluginContext, p *v1.Pod, nodeName string) (int, *framework.Status) {
|
||||
sp.numCalled++
|
||||
sp.numScoreCalled++
|
||||
if sp.failScore {
|
||||
return 0, framework.NewStatus(framework.Error, fmt.Sprintf("injecting failure for pod %v", p.Name))
|
||||
}
|
||||
|
||||
score := 10
|
||||
if sp.numCalled == 1 {
|
||||
if sp.numScoreCalled == 1 {
|
||||
// The first node is scored the highest, the rest is scored lower.
|
||||
sp.highScoreNode = nodeName
|
||||
score = 100
|
||||
@ -141,9 +154,34 @@ func (sp *ScorePlugin) Score(pc *framework.PluginContext, p *v1.Pod, nodeName st
|
||||
return score, nil
|
||||
}
|
||||
|
||||
// NewScorePlugin is the factory for score plugin.
|
||||
func NewScorePlugin(_ *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) {
|
||||
return scPlugin, nil
|
||||
var scoreWithNormalizePlguin = &ScoreWithNormalizePlugin{}
|
||||
|
||||
// Name returns name of the score plugin.
|
||||
func (sp *ScoreWithNormalizePlugin) Name() string {
|
||||
return scoreWithNormalizePluginName
|
||||
}
|
||||
|
||||
// reset returns name of the score plugin.
|
||||
func (sp *ScoreWithNormalizePlugin) reset() {
|
||||
sp.numScoreCalled = 0
|
||||
sp.numNormalizeScoreCalled = 0
|
||||
}
|
||||
|
||||
// Score returns the score of scheduling a pod on a specific node.
|
||||
func (sp *ScoreWithNormalizePlugin) Score(pc *framework.PluginContext, p *v1.Pod, nodeName string) (int, *framework.Status) {
|
||||
sp.numScoreCalled++
|
||||
score := 10
|
||||
return score, nil
|
||||
}
|
||||
|
||||
func (sp *ScoreWithNormalizePlugin) NormalizeScore(pc *framework.PluginContext, pod *v1.Pod, scores framework.NodeScoreList) *framework.Status {
|
||||
sp.numNormalizeScoreCalled++
|
||||
return nil
|
||||
}
|
||||
|
||||
// NewScoreWithNormalizePlugin is the factory for score with normalize plugin.
|
||||
func NewScoreWithNormalizePlugin(_ *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) {
|
||||
return scoreWithNormalizePlguin, nil
|
||||
}
|
||||
|
||||
var filterPlugin = &FilterPlugin{}
|
||||
@ -490,9 +528,6 @@ func TestPrefilterPlugin(t *testing.T) {
|
||||
|
||||
// TestScorePlugin tests invocation of score plugins.
|
||||
func TestScorePlugin(t *testing.T) {
|
||||
// Create a plugin registry for testing. Register only a score plugin.
|
||||
registry := framework.Registry{scorePluginName: NewScorePlugin}
|
||||
|
||||
// Setup initial score plugin for testing.
|
||||
plugins := &schedulerconfig.Plugins{
|
||||
Score: &schedulerconfig.PluginSet{
|
||||
@ -503,22 +538,9 @@ func TestScorePlugin(t *testing.T) {
|
||||
},
|
||||
},
|
||||
}
|
||||
// Set empty plugin config for testing
|
||||
emptyPluginConfig := []schedulerconfig.PluginConfig{}
|
||||
|
||||
// Create the master and the scheduler with the test plugin set.
|
||||
context := initTestSchedulerWithOptions(t,
|
||||
initTestMaster(t, "score-plugin", nil),
|
||||
false, nil, registry, plugins, emptyPluginConfig, false, time.Second)
|
||||
context, cs := initTestContextForScorePlugin(t, plugins)
|
||||
defer cleanupTest(t, context)
|
||||
|
||||
cs := context.clientSet
|
||||
// Add multiple nodes, one of them will be scored much higher than the others.
|
||||
_, err := createNodes(cs, "test-node", nil, 10)
|
||||
if err != nil {
|
||||
t.Fatalf("Cannot create nodes: %v", err)
|
||||
}
|
||||
|
||||
for i, fail := range []bool{false, true} {
|
||||
scPlugin.failScore = fail
|
||||
// Create a best effort pod.
|
||||
@ -545,8 +567,8 @@ func TestScorePlugin(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
if scPlugin.numCalled == 0 {
|
||||
t.Errorf("Expected the reserve plugin to be called.")
|
||||
if scPlugin.numScoreCalled == 0 {
|
||||
t.Errorf("Expected the score plugin to be called.")
|
||||
}
|
||||
|
||||
scPlugin.reset()
|
||||
@ -554,6 +576,42 @@ func TestScorePlugin(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestNormalizeScorePlugin tests invocation of normalize score plugins.
|
||||
func TestNormalizeScorePlugin(t *testing.T) {
|
||||
// Setup initial score plugin for testing.
|
||||
plugins := &schedulerconfig.Plugins{
|
||||
Score: &schedulerconfig.PluginSet{
|
||||
Enabled: []schedulerconfig.Plugin{
|
||||
{
|
||||
Name: scoreWithNormalizePluginName,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
context, cs := initTestContextForScorePlugin(t, plugins)
|
||||
defer cleanupTest(t, context)
|
||||
|
||||
// Create a best effort pod.
|
||||
pod, err := createPausePod(cs,
|
||||
initPausePod(cs, &pausePodConfig{Name: "test-pod", Namespace: context.ns.Name}))
|
||||
if err != nil {
|
||||
t.Fatalf("Error while creating a test pod: %v", err)
|
||||
}
|
||||
|
||||
if err = waitForPodToSchedule(cs, pod); err != nil {
|
||||
t.Errorf("Expected the pod to be scheduled. error: %v", err)
|
||||
}
|
||||
|
||||
if scoreWithNormalizePlguin.numScoreCalled == 0 {
|
||||
t.Errorf("Expected the score plugin to be called.")
|
||||
}
|
||||
if scoreWithNormalizePlguin.numNormalizeScoreCalled == 0 {
|
||||
t.Error("Expected the normalize score plugin to be called")
|
||||
}
|
||||
|
||||
scoreWithNormalizePlguin.reset()
|
||||
}
|
||||
|
||||
// TestReservePlugin tests invocation of reserve plugins.
|
||||
func TestReservePlugin(t *testing.T) {
|
||||
// Create a plugin registry for testing. Register only a reserve plugin.
|
||||
@ -1477,3 +1535,26 @@ func TestPreemptWithPermitPlugin(t *testing.T) {
|
||||
perPlugin.reset()
|
||||
cleanupPods(cs, t, []*v1.Pod{waitingPod, preemptorPod})
|
||||
}
|
||||
|
||||
func initTestContextForScorePlugin(t *testing.T, plugins *schedulerconfig.Plugins) (*testContext, *clientset.Clientset) {
|
||||
// Create a plugin registry for testing. Register only a score plugin.
|
||||
registry := framework.Registry{
|
||||
scorePluginName: NewScorePlugin,
|
||||
scoreWithNormalizePluginName: NewScoreWithNormalizePlugin,
|
||||
}
|
||||
|
||||
// Set empty plugin config for testing
|
||||
emptyPluginConfig := []schedulerconfig.PluginConfig{}
|
||||
|
||||
// Create the master and the scheduler with the test plugin set.
|
||||
context := initTestSchedulerWithOptions(t,
|
||||
initTestMaster(t, "score-plugin", nil),
|
||||
false, nil, registry, plugins, emptyPluginConfig, false, time.Second)
|
||||
|
||||
cs := context.clientSet
|
||||
_, err := createNodes(cs, "test-node", nil, 10)
|
||||
if err != nil {
|
||||
t.Fatalf("Cannot create nodes: %v", err)
|
||||
}
|
||||
return context, cs
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user