Merge pull request #101460 from edwardstudy/ed/fix-score-weight

Introduce scorePluginWeightMap to replace pluginNameToWeightMap
This commit is contained in:
Kubernetes Prow Robot 2021-05-24 21:48:56 -07:00 committed by GitHub
commit 4509d6f79f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 92 additions and 44 deletions

View File

@ -75,21 +75,21 @@ var configDecoder = scheme.Codecs.UniversalDecoder()
// frameworkImpl is the component responsible for initializing and running scheduler
// plugins.
type frameworkImpl struct {
registry Registry
snapshotSharedLister framework.SharedLister
waitingPods *waitingPodsMap
pluginNameToWeightMap map[string]int
queueSortPlugins []framework.QueueSortPlugin
preFilterPlugins []framework.PreFilterPlugin
filterPlugins []framework.FilterPlugin
postFilterPlugins []framework.PostFilterPlugin
preScorePlugins []framework.PreScorePlugin
scorePlugins []framework.ScorePlugin
reservePlugins []framework.ReservePlugin
preBindPlugins []framework.PreBindPlugin
bindPlugins []framework.BindPlugin
postBindPlugins []framework.PostBindPlugin
permitPlugins []framework.PermitPlugin
registry Registry
snapshotSharedLister framework.SharedLister
waitingPods *waitingPodsMap
scorePluginWeight map[string]int
queueSortPlugins []framework.QueueSortPlugin
preFilterPlugins []framework.PreFilterPlugin
filterPlugins []framework.FilterPlugin
postFilterPlugins []framework.PostFilterPlugin
preScorePlugins []framework.PreScorePlugin
scorePlugins []framework.ScorePlugin
reservePlugins []framework.ReservePlugin
preBindPlugins []framework.PreBindPlugin
bindPlugins []framework.BindPlugin
postBindPlugins []framework.PostBindPlugin
permitPlugins []framework.PermitPlugin
clientSet clientset.Interface
kubeConfig *restclient.Config
@ -258,19 +258,19 @@ func NewFramework(r Registry, profile *config.KubeSchedulerProfile, opts ...Opti
}
f := &frameworkImpl{
registry: r,
snapshotSharedLister: options.snapshotSharedLister,
pluginNameToWeightMap: make(map[string]int),
waitingPods: newWaitingPodsMap(),
clientSet: options.clientSet,
kubeConfig: options.kubeConfig,
eventRecorder: options.eventRecorder,
informerFactory: options.informerFactory,
metricsRecorder: options.metricsRecorder,
runAllFilters: options.runAllFilters,
extenders: options.extenders,
PodNominator: options.podNominator,
parallelizer: options.parallelizer,
registry: r,
snapshotSharedLister: options.snapshotSharedLister,
scorePluginWeight: make(map[string]int),
waitingPods: newWaitingPodsMap(),
clientSet: options.clientSet,
kubeConfig: options.kubeConfig,
eventRecorder: options.eventRecorder,
informerFactory: options.informerFactory,
metricsRecorder: options.metricsRecorder,
runAllFilters: options.runAllFilters,
extenders: options.extenders,
PodNominator: options.podNominator,
parallelizer: options.parallelizer,
}
if profile == nil {
@ -282,6 +282,22 @@ func NewFramework(r Registry, profile *config.KubeSchedulerProfile, opts ...Opti
return f, nil
}
var totalPriority int64
for _, e := range profile.Plugins.Score.Enabled {
// a weight of zero is not permitted, plugins can be disabled explicitly
// when configured.
f.scorePluginWeight[e.Name] = int(e.Weight)
if f.scorePluginWeight[e.Name] == 0 {
f.scorePluginWeight[e.Name] = 1
}
// Checks totalPriority against MaxTotalScore to avoid overflow
if int64(f.scorePluginWeight[e.Name])*framework.MaxNodeScore > framework.MaxTotalScore-totalPriority {
return nil, fmt.Errorf("total score of Score plugins could overflow")
}
totalPriority += int64(f.scorePluginWeight[e.Name]) * framework.MaxNodeScore
}
// get needed plugins from config
pg := f.pluginsNeeded(profile.Plugins)
@ -300,7 +316,6 @@ func NewFramework(r Registry, profile *config.KubeSchedulerProfile, opts ...Opti
}
pluginsMap := make(map[string]framework.Plugin)
var totalPriority int64
for name, factory := range r {
// initialize only needed plugins.
if _, ok := pg[name]; !ok {
@ -325,18 +340,6 @@ func NewFramework(r Registry, profile *config.KubeSchedulerProfile, opts ...Opti
// Update ClusterEventMap in place.
fillEventToPluginMap(p, options.clusterEventMap)
// a weight of zero is not permitted, plugins can be disabled explicitly
// when configured.
f.pluginNameToWeightMap[name] = int(pg[name].Weight)
if f.pluginNameToWeightMap[name] == 0 {
f.pluginNameToWeightMap[name] = 1
}
// Checks totalPriority against MaxTotalScore to avoid overflow
if int64(f.pluginNameToWeightMap[name])*framework.MaxNodeScore > framework.MaxTotalScore-totalPriority {
return nil, fmt.Errorf("total score of Score plugins could overflow")
}
totalPriority += int64(f.pluginNameToWeightMap[name]) * framework.MaxNodeScore
}
for _, e := range f.getExtensionPoints(profile.Plugins) {
@ -348,7 +351,7 @@ func NewFramework(r Registry, profile *config.KubeSchedulerProfile, opts ...Opti
// Verifying the score weights again since Plugin.Name() could return a different
// value from the one used in the configuration.
for _, scorePlugin := range f.scorePlugins {
if f.pluginNameToWeightMap[scorePlugin.Name()] == 0 {
if f.scorePluginWeight[scorePlugin.Name()] == 0 {
return nil, fmt.Errorf("score plugin %q is not configured with weight", scorePlugin.Name())
}
}
@ -820,7 +823,7 @@ func (f *frameworkImpl) RunScorePlugins(ctx context.Context, state *framework.Cy
f.Parallelizer().Until(ctx, len(f.scorePlugins), func(index int) {
pl := f.scorePlugins[index]
// Score plugins' weight has been checked when they are initialized.
weight := f.pluginNameToWeightMap[pl.Name()]
weight := f.scorePluginWeight[pl.Name()]
nodeScoreList := pluginToNodeScores[pl.Name()]
for i, nodeScore := range nodeScoreList {
@ -1140,7 +1143,7 @@ func (f *frameworkImpl) ListPlugins() map[string][]config.Plugin {
p := config.Plugin{Name: name}
if extName == "ScorePlugin" {
// Weights apply only to score plugins.
p.Weight = int32(f.pluginNameToWeightMap[name])
p.Weight = int32(f.scorePluginWeight[name])
}
cfgs = append(cfgs, p)
}

View File

@ -134,6 +134,10 @@ func (pl *PluginNotImplementingScore) Name() string {
return pluginNotImplementingScore
}
func newTestPlugin(injArgs runtime.Object, f framework.Handle) (framework.Plugin, error) {
return &TestPlugin{name: testPlugin}, nil
}
// TestPlugin implements all Plugin interfaces.
type TestPlugin struct {
name string
@ -329,6 +333,7 @@ var registry = func() Registry {
r.Register(scorePlugin1, newScorePlugin1)
r.Register(pluginNotImplementingScore, newPluginNotImplementingScore)
r.Register(duplicatePluginName, newDuplicatePlugin)
r.Register(testPlugin, newTestPlugin)
return r
}()
@ -2362,6 +2367,46 @@ func TestListPlugins(t *testing.T) {
}
}
func TestNewFrameworkPluginWeights(t *testing.T) {
tests := []struct {
name string
plugins *config.Plugins
}{
{
name: "Extend multiple extension points by same plugin",
plugins: &config.Plugins{
Score: config.PluginSet{Enabled: []config.Plugin{{Name: testPlugin, Weight: 3}}},
PostBind: config.PluginSet{Enabled: []config.Plugin{{Name: testPlugin, Weight: 6}}},
},
},
{
name: "Add multiple score plugins",
plugins: &config.Plugins{
Score: config.PluginSet{Enabled: []config.Plugin{{Name: testPlugin, Weight: 3}, {Name: scorePlugin1, Weight: 6}}},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
profile := config.KubeSchedulerProfile{Plugins: tt.plugins}
f, err := newFrameworkWithQueueSortAndBind(registry, profile)
if err != nil {
t.Fatalf("Failed to create framework for testing: %v", err)
}
plugins := f.ListPlugins()
if len(plugins["ScorePlugin"]) != len(tt.plugins.Score.Enabled) {
t.Fatalf("Expect %d ScorePlugin, got %d from: %v", len(tt.plugins.Score.Enabled), len(plugins["ScorePlugin"]), plugins["ScorePlugin"])
}
if diff := cmp.Diff(tt.plugins.Score.Enabled, plugins["ScorePlugin"]); diff != "" {
t.Errorf("unexpected plugin weights (-want,+got):\n%s", diff)
}
})
}
}
func buildScoreConfigDefaultWeights(ps ...string) *config.Plugins {
return buildScoreConfigWithWeights(defaultWeights, ps...)
}