diff --git a/pkg/scheduler/core/generic_scheduler.go b/pkg/scheduler/core/generic_scheduler.go index 24780e05b56..f32bc69b4b5 100644 --- a/pkg/scheduler/core/generic_scheduler.go +++ b/pkg/scheduler/core/generic_scheduler.go @@ -780,6 +780,18 @@ func PrioritizeNodes( return schedulerapi.HostPriorityList{}, scoreStatus.AsError() } + // Run the Normalize Score plugins. + status := framework.RunNormalizeScorePlugins(pluginContext, pod, scoresMap) + if !status.IsSuccess() { + return schedulerapi.HostPriorityList{}, status.AsError() + } + + // Apply weights for scores. + status = framework.ApplyScoreWeights(pluginContext, pod, scoresMap) + if !status.IsSuccess() { + return schedulerapi.HostPriorityList{}, status.AsError() + } + // Summarize all scores. result := make(schedulerapi.HostPriorityList, 0, len(nodes)) diff --git a/pkg/scheduler/framework/v1alpha1/BUILD b/pkg/scheduler/framework/v1alpha1/BUILD index 9f8fdf87a5a..3962dbade6f 100644 --- a/pkg/scheduler/framework/v1alpha1/BUILD +++ b/pkg/scheduler/framework/v1alpha1/BUILD @@ -39,6 +39,14 @@ filegroup( go_test( name = "go_default_test", - srcs = ["interface_test.go"], + srcs = [ + "framework_test.go", + "interface_test.go", + ], embed = [":go_default_library"], + deps = [ + "//pkg/scheduler/apis/config:go_default_library", + "//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", + ], ) diff --git a/pkg/scheduler/framework/v1alpha1/framework.go b/pkg/scheduler/framework/v1alpha1/framework.go index 547514234ad..64af9d52870 100644 --- a/pkg/scheduler/framework/v1alpha1/framework.go +++ b/pkg/scheduler/framework/v1alpha1/framework.go @@ -34,20 +34,21 @@ import ( // framework is the component responsible for initializing and running scheduler // plugins. type framework struct { - registry Registry - nodeInfoSnapshot *cache.NodeInfoSnapshot - waitingPods *waitingPodsMap - pluginNameToWeightMap map[string]int - queueSortPlugins []QueueSortPlugin - prefilterPlugins []PrefilterPlugin - filterPlugins []FilterPlugin - scorePlugins []ScorePlugin - reservePlugins []ReservePlugin - prebindPlugins []PrebindPlugin - bindPlugins []BindPlugin - postbindPlugins []PostbindPlugin - unreservePlugins []UnreservePlugin - permitPlugins []PermitPlugin + registry Registry + nodeInfoSnapshot *cache.NodeInfoSnapshot + waitingPods *waitingPodsMap + pluginNameToWeightMap map[string]int + queueSortPlugins []QueueSortPlugin + prefilterPlugins []PrefilterPlugin + filterPlugins []FilterPlugin + scorePlugins []ScorePlugin + scoreWithNormalizePlugins []ScoreWithNormalizePlugin + reservePlugins []ReservePlugin + prebindPlugins []PrebindPlugin + bindPlugins []BindPlugin + postbindPlugins []PostbindPlugin + unreservePlugins []UnreservePlugin + permitPlugins []PermitPlugin } const ( @@ -131,6 +132,7 @@ func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfi if plugins.Score != nil { for _, sc := range plugins.Score.Enabled { if pg, ok := pluginsMap[sc.Name]; ok { + // First, make sure the plugin implements ScorePlugin interface. p, ok := pg.(ScorePlugin) if !ok { return nil, fmt.Errorf("plugin %v does not extend score plugin", sc.Name) @@ -139,6 +141,13 @@ func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfi return nil, fmt.Errorf("score plugin %v is not configured with weight", p.Name()) } f.scorePlugins = append(f.scorePlugins, p) + + // Next, if the plugin also implements ScoreWithNormalizePlugin interface, + // add it to the normalizeScore plugin list. + np, ok := pg.(ScoreWithNormalizePlugin) + if ok { + f.scoreWithNormalizePlugins = append(f.scoreWithNormalizePlugins, np) + } } else { return nil, fmt.Errorf("score plugin %v does not exist", sc.Name) } @@ -317,14 +326,12 @@ func (f *framework) RunScorePlugins(pc *PluginContext, pod *v1.Pod, nodes []*v1. errCh := schedutil.NewErrorChannel() workqueue.ParallelizeUntil(ctx, 16, len(nodes), func(index int) { for _, pl := range f.scorePlugins { - // Score plugins' weight has been checked when they are initialized. - weight := f.pluginNameToWeightMap[pl.Name()] score, status := pl.Score(pc, pod, nodes[index].Name) if !status.IsSuccess() { errCh.SendErrorWithCancel(fmt.Errorf(status.Message()), cancel) return } - pluginToNodeScoreMap[pl.Name()][index] = score * weight + pluginToNodeScoreMap[pl.Name()][index] = score } }) @@ -337,6 +344,64 @@ func (f *framework) RunScorePlugins(pc *PluginContext, pod *v1.Pod, nodes []*v1. return pluginToNodeScoreMap, nil } +// RunNormalizeScorePlugins runs the NormalizeScore function of Score plugins. +// It should be called after RunScorePlugins with the PluginToNodeScoreMap result. +// It then modifies the map with normalized scores. It returns a non-success Status +// if any of the NormalizeScore functions returns a non-success status. +func (f *framework) RunNormalizeScorePlugins(pc *PluginContext, pod *v1.Pod, scores PluginToNodeScoreMap) *Status { + ctx, cancel := context.WithCancel(context.Background()) + errCh := schedutil.NewErrorChannel() + workqueue.ParallelizeUntil(ctx, 16, len(f.scoreWithNormalizePlugins), func(index int) { + pl := f.scoreWithNormalizePlugins[index] + nodeScoreList, ok := scores[pl.Name()] + if !ok { + err := fmt.Errorf("normalize score plugin %v has no corresponding scores in the PluginToNodeScoreMap", pl.Name()) + errCh.SendErrorWithCancel(err, cancel) + } + status := pl.NormalizeScore(pc, pod, nodeScoreList) + if !status.IsSuccess() { + err := fmt.Errorf("normalize score plugin %v failed with error %v", pl.Name(), status.Message()) + errCh.SendErrorWithCancel(err, cancel) + } + }) + + if err := errCh.ReceiveError(); err != nil { + msg := fmt.Sprintf("error while running normalize score plugin for pod %v: %v", pod.Name, err) + klog.Error(msg) + return NewStatus(Error, msg) + } + + return nil +} + +// ApplyScoreWeights applies weights to the score results. It should be called after +// RunNormalizeScorePlugins. +func (f *framework) ApplyScoreWeights(pc *PluginContext, pod *v1.Pod, scores PluginToNodeScoreMap) *Status { + ctx, cancel := context.WithCancel(context.Background()) + errCh := schedutil.NewErrorChannel() + workqueue.ParallelizeUntil(ctx, 16, len(f.scorePlugins), func(index int) { + pl := f.scorePlugins[index] + // Score plugins' weight has been checked when they are initialized. + weight := f.pluginNameToWeightMap[pl.Name()] + nodeScoreList, ok := scores[pl.Name()] + if !ok { + err := fmt.Errorf("score plugin %v has no corresponding scores in the PluginToNodeScoreMap", pl.Name()) + errCh.SendErrorWithCancel(err, cancel) + } + for i := range nodeScoreList { + nodeScoreList[i] = nodeScoreList[i] * weight + } + }) + + if err := errCh.ReceiveError(); err != nil { + msg := fmt.Sprintf("error while applying score weights for pod %v: %v", pod.Name, err) + klog.Error(msg) + return NewStatus(Error, msg) + } + + return nil +} + // RunPrebindPlugins runs the set of configured prebind plugins. It returns a // failure (bool) if any of the plugins returns an error. It also returns an // error containing the rejection message or the error occurred in the plugin. @@ -520,7 +585,6 @@ func pluginsNeeded(plugins *config.Plugins) map[string]config.Plugin { find(plugins.Filter) find(plugins.PostFilter) find(plugins.Score) - find(plugins.NormalizeScore) find(plugins.Reserve) find(plugins.Permit) find(plugins.PreBind) diff --git a/pkg/scheduler/framework/v1alpha1/framework_test.go b/pkg/scheduler/framework/v1alpha1/framework_test.go new file mode 100644 index 00000000000..052f37b093b --- /dev/null +++ b/pkg/scheduler/framework/v1alpha1/framework_test.go @@ -0,0 +1,434 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +import ( + "reflect" + "testing" + + "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/kubernetes/pkg/scheduler/apis/config" +) + +const ( + scorePlugin1 = "score-plugin-1" + scorePlugin2 = "score-plugin-2" + scorePlugin3 = "score-plugin-3" + pluginNotImplementingScore = "plugin-not-implementing-score" + weight1 = 2 + weight2 = 3 + weight3 = 4 +) + +// TestScorePlugin1 and 2 implements ScoreWithNormalizePlugin interface, +// TestScorePlugin3 only implements ScorePlugin interface. +var _ = ScoreWithNormalizePlugin(&TestScorePlugin1{}) +var _ = ScoreWithNormalizePlugin(&TestScorePlugin2{}) +var _ = ScorePlugin(&TestScorePlugin3{}) + +type TestScorePlugin1 struct { + // If fail is true, NormalizeScore will return error status. + fail bool +} + +// NewScorePlugin1 is the factory for NormalizeScore plugin 1. +func NewScorePlugin1(_ *runtime.Unknown, _ FrameworkHandle) (Plugin, error) { + return &TestScorePlugin1{}, nil +} + +// NewScorePlugin1InjectFailure creates a new TestScorePlugin1 which will +// return an error status for NormalizeScore. +func NewScorePlugin1InjectFailure(_ *runtime.Unknown, _ FrameworkHandle) (Plugin, error) { + return &TestScorePlugin1{fail: true}, nil +} + +func (pl *TestScorePlugin1) Name() string { + return scorePlugin1 +} + +func (pl *TestScorePlugin1) NormalizeScore(pc *PluginContext, pod *v1.Pod, scores NodeScoreList) *Status { + if pl.fail { + return NewStatus(Error, "injecting failure.") + } + // Simply decrease each node score by 1. + for i := range scores { + scores[i] = scores[i] - 1 + } + return nil +} + +func (pl *TestScorePlugin1) Score(pc *PluginContext, p *v1.Pod, nodeName string) (int, *Status) { + // Score is currently not used in the tests so just return some dummy value. + return 0, nil +} + +type TestScorePlugin2 struct{} + +// NewScorePlugin2 is the factory for NormalizeScore plugin 2. +func NewScorePlugin2(_ *runtime.Unknown, _ FrameworkHandle) (Plugin, error) { + return &TestScorePlugin2{}, nil +} + +func (pl *TestScorePlugin2) Name() string { + return scorePlugin2 +} + +func (pl *TestScorePlugin2) NormalizeScore(pc *PluginContext, pod *v1.Pod, scores NodeScoreList) *Status { + // Simply force each node score to 5. + for i := range scores { + scores[i] = 5 + } + return nil +} + +func (pl *TestScorePlugin2) Score(pc *PluginContext, p *v1.Pod, nodeName string) (int, *Status) { + // Score is currently not used in the tests so just return some dummy value. + return 0, nil +} + +// TestScorePlugin3 only implements ScorePlugin interface. +type TestScorePlugin3 struct{} + +// NewScorePlugin3 is the factory for Score plugin 3. +func NewScorePlugin3(_ *runtime.Unknown, _ FrameworkHandle) (Plugin, error) { + return &TestScorePlugin3{}, nil +} + +func (pl *TestScorePlugin3) Name() string { + return scorePlugin3 +} + +func (pl *TestScorePlugin3) Score(pc *PluginContext, p *v1.Pod, nodeName string) (int, *Status) { + // Score is currently not used in the tests so just return some dummy value. + return 0, nil +} + +// PluginNotImplementingScore doesn't implement the ScorePlugin interface. +type PluginNotImplementingScore struct{} + +// NewPluginNotImplementingScore is the factory for PluginNotImplementingScore. +func NewPluginNotImplementingScore(_ *runtime.Unknown, _ FrameworkHandle) (Plugin, error) { + return &PluginNotImplementingScore{}, nil +} + +func (pl *PluginNotImplementingScore) Name() string { + return pluginNotImplementingScore +} + +var registry = Registry{ + scorePlugin1: NewScorePlugin1, + scorePlugin2: NewScorePlugin2, + scorePlugin3: NewScorePlugin3, + pluginNotImplementingScore: NewPluginNotImplementingScore, +} + +var plugin1 = &config.Plugins{ + Score: &config.PluginSet{ + Enabled: []config.Plugin{ + {Name: scorePlugin1, Weight: weight1}, + }, + }, +} +var plugin3 = &config.Plugins{ + Score: &config.PluginSet{ + Enabled: []config.Plugin{ + {Name: scorePlugin3, Weight: weight3}, + }, + }, +} +var plugin1And2 = &config.Plugins{ + Score: &config.PluginSet{ + Enabled: []config.Plugin{ + {Name: scorePlugin1, Weight: weight1}, + {Name: scorePlugin2, Weight: weight2}, + }, + }, +} +var plugin1And3 = &config.Plugins{ + Score: &config.PluginSet{ + Enabled: []config.Plugin{ + {Name: scorePlugin1, Weight: weight1}, + {Name: scorePlugin3, Weight: weight3}, + }, + }, +} + +// No specific config required. +var args = []config.PluginConfig{} +var pc = &PluginContext{} + +// Pod is only used for logging errors. +var pod = &v1.Pod{} + +func TestInitFrameworkWithScorePlugins(t *testing.T) { + tests := []struct { + name string + plugins *config.Plugins + // If initErr is true, we expect framework initialization to fail. + initErr bool + }{ + { + name: "enabled Score plugin doesn't exist in registry", + plugins: &config.Plugins{ + Score: &config.PluginSet{ + Enabled: []config.Plugin{ + {Name: "notExist"}, + }, + }, + }, + initErr: true, + }, + { + name: "enabled Score plugin doesn't extend the ScorePlugin interface", + plugins: &config.Plugins{ + Score: &config.PluginSet{ + Enabled: []config.Plugin{ + {Name: pluginNotImplementingScore}, + }, + }, + }, + initErr: true, + }, + { + name: "Score plugins are nil", + plugins: &config.Plugins{Score: nil}, + }, + { + name: "enabled Score plugin list is empty", + plugins: &config.Plugins{ + Score: &config.PluginSet{ + Enabled: []config.Plugin{}, + }, + }, + }, + { + name: "enabled plugin only implements ScorePlugin interface", + plugins: &config.Plugins{ + Score: &config.PluginSet{ + Enabled: []config.Plugin{ + {Name: scorePlugin3}, + }, + }, + }, + }, + { + name: "enabled plugin implements ScoreWithNormalizePlugin interface", + plugins: &config.Plugins{ + Score: &config.PluginSet{ + Enabled: []config.Plugin{ + {Name: scorePlugin1}, + }, + }, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + _, err := NewFramework(registry, tt.plugins, args) + if tt.initErr && err == nil { + t.Fatal("Framework initialization should fail") + } + if !tt.initErr && err != nil { + t.Fatalf("Failed to create framework for testing: %v", err) + } + }) + } +} + +func TestRunNormalizeScorePlugins(t *testing.T) { + tests := []struct { + name string + registry Registry + plugins *config.Plugins + input PluginToNodeScoreMap + want PluginToNodeScoreMap + // If err is true, we expect RunNormalizeScorePlugin to fail. + err bool + }{ + { + name: "no NormalizeScore plugins", + plugins: plugin3, + registry: registry, + input: PluginToNodeScoreMap{ + scorePlugin1: {2, 3}, + }, + // No NormalizeScore plugin, map should be untouched. + want: PluginToNodeScoreMap{ + scorePlugin1: {2, 3}, + }, + }, + { + name: "single Score plugin, single NormalizeScore plugin", + registry: registry, + plugins: plugin1, + input: PluginToNodeScoreMap{ + scorePlugin1: {2, 3}, + }, + want: PluginToNodeScoreMap{ + // For plugin1, want=input-1. + scorePlugin1: {1, 2}, + }, + }, + { + name: "2 Score plugins, 2 NormalizeScore plugins", + registry: registry, + plugins: plugin1And2, + input: PluginToNodeScoreMap{ + scorePlugin1: {2, 3}, + scorePlugin2: {2, 4}, + }, + want: PluginToNodeScoreMap{ + // For plugin1, want=input-1. + scorePlugin1: {1, 2}, + // For plugin2, want=5. + scorePlugin2: {5, 5}, + }, + }, + { + name: "2 Score plugins, 1 NormalizeScore plugin", + registry: registry, + plugins: plugin1And3, + input: PluginToNodeScoreMap{ + scorePlugin1: {2, 3}, + scorePlugin3: {2, 4}, + }, + want: PluginToNodeScoreMap{ + // For plugin1, want=input-1. + scorePlugin1: {1, 2}, + // No NormalizeScore for plugin 3. The node scores are untouched. + scorePlugin3: {2, 4}, + }, + }, + { + name: "score map contains both test plugin 1 and 2 but plugin 1 fails", + registry: Registry{ + scorePlugin1: NewScorePlugin1InjectFailure, + scorePlugin2: NewScorePlugin2, + }, + plugins: plugin1And2, + input: PluginToNodeScoreMap{ + scorePlugin1: {2, 3}, + scorePlugin2: {2, 4}, + }, + err: true, + }, + { + name: "2 plugins but score map only contains plugin1", + registry: registry, + plugins: plugin1And2, + input: PluginToNodeScoreMap{ + scorePlugin1: {2, 3}, + }, + err: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + f, err := NewFramework(tt.registry, tt.plugins, args) + if err != nil { + t.Fatalf("Failed to create framework for testing: %v", err) + } + + status := f.RunNormalizeScorePlugins(pc, pod, tt.input) + + if tt.err { + if status.IsSuccess() { + t.Errorf("Expected status to be non-success.") + } + } else { + if !status.IsSuccess() { + t.Errorf("Expected status to be success.") + } + if !reflect.DeepEqual(tt.input, tt.want) { + t.Errorf("Score map after RunNormalizeScorePlugin: %+v, want: %+v.", tt.input, tt.want) + } + } + }) + } +} + +func TestApplyScoreWeights(t *testing.T) { + tests := []struct { + name string + plugins *config.Plugins + input PluginToNodeScoreMap + want PluginToNodeScoreMap + // If err is true, we expect ApplyScoreWeights to fail. + err bool + }{ + { + name: "single Score plugin, single nodeScoreList", + plugins: plugin1, + input: PluginToNodeScoreMap{ + scorePlugin1: {2, 3}, + }, + want: PluginToNodeScoreMap{ + // For plugin1, want=input*weight1. + scorePlugin1: {4, 6}, + }, + }, + { + name: "2 Score plugins, 2 nodeScoreLists in scoreMap", + plugins: plugin1And2, + input: PluginToNodeScoreMap{ + scorePlugin1: {2, 3}, + scorePlugin2: {2, 4}, + }, + want: PluginToNodeScoreMap{ + // For plugin1, want=input*weight1. + scorePlugin1: {4, 6}, + // For plugin2, want=input*weight2. + scorePlugin2: {6, 12}, + }, + }, + { + name: "2 Score plugins, 1 without corresponding nodeScoreList in the score map", + plugins: plugin1And2, + input: PluginToNodeScoreMap{ + scorePlugin1: {2, 3}, + }, + err: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + f, err := NewFramework(registry, tt.plugins, args) + if err != nil { + t.Fatalf("Failed to create framework for testing: %v", err) + } + + status := f.ApplyScoreWeights(pc, pod, tt.input) + + if tt.err { + if status.IsSuccess() { + t.Errorf("Expected status to be non-success.") + } + } else { + if !status.IsSuccess() { + t.Errorf("Expected status to be success.") + } + if !reflect.DeepEqual(tt.input, tt.want) { + t.Errorf("Score map after RunNormalizeScorePlugin: %+v, want: %+v.", tt.input, tt.want) + } + } + }) + } +} diff --git a/pkg/scheduler/framework/v1alpha1/interface.go b/pkg/scheduler/framework/v1alpha1/interface.go index ff87eca3c8b..2a8a0bea524 100644 --- a/pkg/scheduler/framework/v1alpha1/interface.go +++ b/pkg/scheduler/framework/v1alpha1/interface.go @@ -170,6 +170,17 @@ type ScorePlugin interface { Score(pc *PluginContext, p *v1.Pod, nodeName string) (int, *Status) } +// ScoreWithNormalizePlugin is an interface that must be implemented by "score" +// plugins that also need to normalize the node scoring results produced by the same +// plugin's "Score" method. +type ScoreWithNormalizePlugin interface { + ScorePlugin + // NormalizeScore is called for all node scores produced by the same plugin's "Score" + // method. A successful run of NormalizeScore will update the scores list and return + // a success status. + NormalizeScore(pc *PluginContext, p *v1.Pod, scores NodeScoreList) *Status +} + // ReservePlugin is an interface for Reserve plugins. These plugins are called // at the reservation point. These are meant to update the state of the plugin. // This concept used to be called 'assume' in the original scheduler. @@ -264,6 +275,16 @@ type Framework interface { // a non-success status. RunScorePlugins(pc *PluginContext, pod *v1.Pod, nodes []*v1.Node) (PluginToNodeScoreMap, *Status) + // RunNormalizeScorePlugins runs the normalize score plugins. It should be called after + // RunScorePlugins with the PluginToNodeScoreMap result. It then modifies the map with + // normalized scores. It returns a non-success Status if any of the normalize score plugins + // returns a non-success status. + RunNormalizeScorePlugins(pc *PluginContext, pod *v1.Pod, scores PluginToNodeScoreMap) *Status + + // ApplyScoreWeights applies weights to the score results. It should be called after + // RunNormalizeScorePlugins. + ApplyScoreWeights(pc *PluginContext, pod *v1.Pod, scores PluginToNodeScoreMap) *Status + // RunPrebindPlugins runs the set of configured prebind plugins. It returns // *Status and its code is set to non-success if any of the plugins returns // anything but Success. If the Status code is "Unschedulable", it is diff --git a/pkg/scheduler/internal/queue/scheduling_queue_test.go b/pkg/scheduler/internal/queue/scheduling_queue_test.go index 6740265e7b2..0b48b3a5211 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue_test.go +++ b/pkg/scheduler/internal/queue/scheduling_queue_test.go @@ -179,6 +179,14 @@ func (*fakeFramework) RunScorePlugins(pc *framework.PluginContext, pod *v1.Pod, return nil, nil } +func (*fakeFramework) RunNormalizeScorePlugins(pc *framework.PluginContext, pod *v1.Pod, scores framework.PluginToNodeScoreMap) *framework.Status { + return nil +} + +func (*fakeFramework) ApplyScoreWeights(pc *framework.PluginContext, pod *v1.Pod, scores framework.PluginToNodeScoreMap) *framework.Status { + return nil +} + func (*fakeFramework) RunPrebindPlugins(pc *framework.PluginContext, pod *v1.Pod, nodeName string) *framework.Status { return nil } diff --git a/test/integration/scheduler/framework_test.go b/test/integration/scheduler/framework_test.go index 7f849bcc02e..befc4672d31 100644 --- a/test/integration/scheduler/framework_test.go +++ b/test/integration/scheduler/framework_test.go @@ -38,9 +38,14 @@ type PrefilterPlugin struct { } type ScorePlugin struct { - failScore bool - numCalled int - highScoreNode string + failScore bool + numScoreCalled int + highScoreNode string +} + +type ScoreWithNormalizePlugin struct { + numScoreCalled int + numNormalizeScoreCalled int } type FilterPlugin struct { @@ -91,19 +96,22 @@ type PermitPlugin struct { } const ( - prefilterPluginName = "prefilter-plugin" - scorePluginName = "score-plugin" - filterPluginName = "filter-plugin" - reservePluginName = "reserve-plugin" - prebindPluginName = "prebind-plugin" - unreservePluginName = "unreserve-plugin" - postbindPluginName = "postbind-plugin" - permitPluginName = "permit-plugin" + prefilterPluginName = "prefilter-plugin" + scorePluginName = "score-plugin" + scoreWithNormalizePluginName = "score-with-normalize-plugin" + filterPluginName = "filter-plugin" + reservePluginName = "reserve-plugin" + prebindPluginName = "prebind-plugin" + unreservePluginName = "unreserve-plugin" + postbindPluginName = "postbind-plugin" + permitPluginName = "permit-plugin" ) var _ = framework.PrefilterPlugin(&PrefilterPlugin{}) var _ = framework.ScorePlugin(&ScorePlugin{}) var _ = framework.FilterPlugin(&FilterPlugin{}) +var _ = framework.ScorePlugin(&ScorePlugin{}) +var _ = framework.ScoreWithNormalizePlugin(&ScoreWithNormalizePlugin{}) var _ = framework.ReservePlugin(&ReservePlugin{}) var _ = framework.PrebindPlugin(&PrebindPlugin{}) var _ = framework.BindPlugin(&BindPlugin{}) @@ -111,6 +119,13 @@ var _ = framework.PostbindPlugin(&PostbindPlugin{}) var _ = framework.UnreservePlugin(&UnreservePlugin{}) var _ = framework.PermitPlugin(&PermitPlugin{}) +var scPlugin = &ScorePlugin{} + +// NewScorePlugin is the factory for score plugin. +func NewScorePlugin(_ *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) { + return scPlugin, nil +} + // Name returns name of the score plugin. func (sp *ScorePlugin) Name() string { return scorePluginName @@ -119,21 +134,19 @@ func (sp *ScorePlugin) Name() string { // reset returns name of the score plugin. func (sp *ScorePlugin) reset() { sp.failScore = false - sp.numCalled = 0 + sp.numScoreCalled = 0 sp.highScoreNode = "" } -var scPlugin = &ScorePlugin{} - // Score returns the score of scheduling a pod on a specific node. func (sp *ScorePlugin) Score(pc *framework.PluginContext, p *v1.Pod, nodeName string) (int, *framework.Status) { - sp.numCalled++ + sp.numScoreCalled++ if sp.failScore { return 0, framework.NewStatus(framework.Error, fmt.Sprintf("injecting failure for pod %v", p.Name)) } score := 10 - if sp.numCalled == 1 { + if sp.numScoreCalled == 1 { // The first node is scored the highest, the rest is scored lower. sp.highScoreNode = nodeName score = 100 @@ -141,9 +154,34 @@ func (sp *ScorePlugin) Score(pc *framework.PluginContext, p *v1.Pod, nodeName st return score, nil } -// NewScorePlugin is the factory for score plugin. -func NewScorePlugin(_ *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) { - return scPlugin, nil +var scoreWithNormalizePlguin = &ScoreWithNormalizePlugin{} + +// Name returns name of the score plugin. +func (sp *ScoreWithNormalizePlugin) Name() string { + return scoreWithNormalizePluginName +} + +// reset returns name of the score plugin. +func (sp *ScoreWithNormalizePlugin) reset() { + sp.numScoreCalled = 0 + sp.numNormalizeScoreCalled = 0 +} + +// Score returns the score of scheduling a pod on a specific node. +func (sp *ScoreWithNormalizePlugin) Score(pc *framework.PluginContext, p *v1.Pod, nodeName string) (int, *framework.Status) { + sp.numScoreCalled++ + score := 10 + return score, nil +} + +func (sp *ScoreWithNormalizePlugin) NormalizeScore(pc *framework.PluginContext, pod *v1.Pod, scores framework.NodeScoreList) *framework.Status { + sp.numNormalizeScoreCalled++ + return nil +} + +// NewScoreWithNormalizePlugin is the factory for score with normalize plugin. +func NewScoreWithNormalizePlugin(_ *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) { + return scoreWithNormalizePlguin, nil } var filterPlugin = &FilterPlugin{} @@ -490,9 +528,6 @@ func TestPrefilterPlugin(t *testing.T) { // TestScorePlugin tests invocation of score plugins. func TestScorePlugin(t *testing.T) { - // Create a plugin registry for testing. Register only a score plugin. - registry := framework.Registry{scorePluginName: NewScorePlugin} - // Setup initial score plugin for testing. plugins := &schedulerconfig.Plugins{ Score: &schedulerconfig.PluginSet{ @@ -503,22 +538,9 @@ func TestScorePlugin(t *testing.T) { }, }, } - // Set empty plugin config for testing - emptyPluginConfig := []schedulerconfig.PluginConfig{} - - // Create the master and the scheduler with the test plugin set. - context := initTestSchedulerWithOptions(t, - initTestMaster(t, "score-plugin", nil), - false, nil, registry, plugins, emptyPluginConfig, false, time.Second) + context, cs := initTestContextForScorePlugin(t, plugins) defer cleanupTest(t, context) - cs := context.clientSet - // Add multiple nodes, one of them will be scored much higher than the others. - _, err := createNodes(cs, "test-node", nil, 10) - if err != nil { - t.Fatalf("Cannot create nodes: %v", err) - } - for i, fail := range []bool{false, true} { scPlugin.failScore = fail // Create a best effort pod. @@ -545,8 +567,8 @@ func TestScorePlugin(t *testing.T) { } } - if scPlugin.numCalled == 0 { - t.Errorf("Expected the reserve plugin to be called.") + if scPlugin.numScoreCalled == 0 { + t.Errorf("Expected the score plugin to be called.") } scPlugin.reset() @@ -554,6 +576,42 @@ func TestScorePlugin(t *testing.T) { } } +// TestNormalizeScorePlugin tests invocation of normalize score plugins. +func TestNormalizeScorePlugin(t *testing.T) { + // Setup initial score plugin for testing. + plugins := &schedulerconfig.Plugins{ + Score: &schedulerconfig.PluginSet{ + Enabled: []schedulerconfig.Plugin{ + { + Name: scoreWithNormalizePluginName, + }, + }, + }, + } + context, cs := initTestContextForScorePlugin(t, plugins) + defer cleanupTest(t, context) + + // Create a best effort pod. + pod, err := createPausePod(cs, + initPausePod(cs, &pausePodConfig{Name: "test-pod", Namespace: context.ns.Name})) + if err != nil { + t.Fatalf("Error while creating a test pod: %v", err) + } + + if err = waitForPodToSchedule(cs, pod); err != nil { + t.Errorf("Expected the pod to be scheduled. error: %v", err) + } + + if scoreWithNormalizePlguin.numScoreCalled == 0 { + t.Errorf("Expected the score plugin to be called.") + } + if scoreWithNormalizePlguin.numNormalizeScoreCalled == 0 { + t.Error("Expected the normalize score plugin to be called") + } + + scoreWithNormalizePlguin.reset() +} + // TestReservePlugin tests invocation of reserve plugins. func TestReservePlugin(t *testing.T) { // Create a plugin registry for testing. Register only a reserve plugin. @@ -1477,3 +1535,26 @@ func TestPreemptWithPermitPlugin(t *testing.T) { perPlugin.reset() cleanupPods(cs, t, []*v1.Pod{waitingPod, preemptorPod}) } + +func initTestContextForScorePlugin(t *testing.T, plugins *schedulerconfig.Plugins) (*testContext, *clientset.Clientset) { + // Create a plugin registry for testing. Register only a score plugin. + registry := framework.Registry{ + scorePluginName: NewScorePlugin, + scoreWithNormalizePluginName: NewScoreWithNormalizePlugin, + } + + // Set empty plugin config for testing + emptyPluginConfig := []schedulerconfig.PluginConfig{} + + // Create the master and the scheduler with the test plugin set. + context := initTestSchedulerWithOptions(t, + initTestMaster(t, "score-plugin", nil), + false, nil, registry, plugins, emptyPluginConfig, false, time.Second) + + cs := context.clientSet + _, err := createNodes(cs, "test-node", nil, 10) + if err != nil { + t.Fatalf("Cannot create nodes: %v", err) + } + return context, cs +}