From 5e84257133a963d46cc8d73728c1e82431f07796 Mon Sep 17 00:00:00 2001 From: draveness Date: Fri, 27 Sep 2019 09:15:32 +0800 Subject: [PATCH] feat(scheduler): use reflect to reduce the similar pattern --- pkg/scheduler/framework/v1alpha1/BUILD | 1 + pkg/scheduler/framework/v1alpha1/framework.go | 236 ++++++------------ .../framework/v1alpha1/framework_test.go | 46 ++++ 3 files changed, 128 insertions(+), 155 deletions(-) diff --git a/pkg/scheduler/framework/v1alpha1/BUILD b/pkg/scheduler/framework/v1alpha1/BUILD index 71f141e6141..b97f0693584 100644 --- a/pkg/scheduler/framework/v1alpha1/BUILD +++ b/pkg/scheduler/framework/v1alpha1/BUILD @@ -20,6 +20,7 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/json:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/client-go/util/workqueue:go_default_library", "//vendor/k8s.io/klog:go_default_library", "//vendor/sigs.k8s.io/yaml:go_default_library", diff --git a/pkg/scheduler/framework/v1alpha1/framework.go b/pkg/scheduler/framework/v1alpha1/framework.go index e7646483c16..7a521e3c2fa 100644 --- a/pkg/scheduler/framework/v1alpha1/framework.go +++ b/pkg/scheduler/framework/v1alpha1/framework.go @@ -19,11 +19,13 @@ package v1alpha1 import ( "context" "fmt" + "reflect" "time" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/util/workqueue" "k8s.io/klog" "k8s.io/kubernetes/pkg/scheduler/apis/config" @@ -101,169 +103,93 @@ func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfi } } - if plugins.PreFilter != nil { - for _, pf := range plugins.PreFilter.Enabled { - if pg, ok := pluginsMap[pf.Name]; ok { - p, ok := pg.(PreFilterPlugin) - if !ok { - return nil, fmt.Errorf("plugin %q does not extend prefilter plugin", pf.Name) - } - f.preFilterPlugins = append(f.preFilterPlugins, p) - } else { - return nil, fmt.Errorf("prefilter plugin %q does not exist", pf.Name) - } + if err := updatePluginList(reflect.ValueOf(&f.preFilterPlugins), plugins.PreFilter, reflect.TypeOf((*PreFilterPlugin)(nil)), pluginsMap); err != nil { + return nil, err + } + + if err := updatePluginList(reflect.ValueOf(&f.filterPlugins), plugins.Filter, reflect.TypeOf((*FilterPlugin)(nil)), pluginsMap); err != nil { + return nil, err + } + + if err := updatePluginList(reflect.ValueOf(&f.reservePlugins), plugins.Reserve, reflect.TypeOf((*ReservePlugin)(nil)), pluginsMap); err != nil { + return nil, err + } + + if err := updatePluginList(reflect.ValueOf(&f.postFilterPlugins), plugins.PostFilter, reflect.TypeOf((*PostFilterPlugin)(nil)), pluginsMap); err != nil { + return nil, err + } + + if err := updatePluginList(reflect.ValueOf(&f.scorePlugins), plugins.Score, reflect.TypeOf((*ScorePlugin)(nil)), pluginsMap); err != nil { + return nil, err + } + + if err := updatePluginList(reflect.ValueOf(&f.preBindPlugins), plugins.PreBind, reflect.TypeOf((*PreBindPlugin)(nil)), pluginsMap); err != nil { + return nil, err + } + + if err := updatePluginList(reflect.ValueOf(&f.bindPlugins), plugins.Bind, reflect.TypeOf((*BindPlugin)(nil)), pluginsMap); err != nil { + return nil, err + } + + if err := updatePluginList(reflect.ValueOf(&f.postBindPlugins), plugins.PostBind, reflect.TypeOf((*PostBindPlugin)(nil)), pluginsMap); err != nil { + return nil, err + } + + if err := updatePluginList(reflect.ValueOf(&f.unreservePlugins), plugins.Unreserve, reflect.TypeOf((*UnreservePlugin)(nil)), pluginsMap); err != nil { + return nil, err + } + + if err := updatePluginList(reflect.ValueOf(&f.permitPlugins), plugins.Permit, reflect.TypeOf((*PermitPlugin)(nil)), pluginsMap); err != nil { + return nil, err + } + + if err := updatePluginList(reflect.ValueOf(&f.queueSortPlugins), plugins.QueueSort, reflect.TypeOf((*QueueSortPlugin)(nil)), pluginsMap); err != nil { + return nil, err + } + + for _, scorePlugin := range f.scorePlugins { + if f.pluginNameToWeightMap[scorePlugin.Name()] == 0 { + return nil, fmt.Errorf("score plugin %q is not configured with weight", scorePlugin.Name()) } } - if plugins.Filter != nil { - for _, r := range plugins.Filter.Enabled { - if pg, ok := pluginsMap[r.Name]; ok { - p, ok := pg.(FilterPlugin) - if !ok { - return nil, fmt.Errorf("plugin %q does not extend filter plugin", r.Name) - } - f.filterPlugins = append(f.filterPlugins, p) - } else { - return nil, fmt.Errorf("filter plugin %q does not exist", r.Name) - } - } - } - - if plugins.Score != nil { - for _, sc := range plugins.Score.Enabled { - if pg, ok := pluginsMap[sc.Name]; ok { - p, ok := pg.(ScorePlugin) - if !ok { - return nil, fmt.Errorf("plugin %q does not extend score plugin", sc.Name) - } - if f.pluginNameToWeightMap[p.Name()] == 0 { - return nil, fmt.Errorf("score plugin %q is not configured with weight", p.Name()) - } - f.scorePlugins = append(f.scorePlugins, p) - } else { - return nil, fmt.Errorf("score plugin %q does not exist", sc.Name) - } - } - } - - if plugins.Reserve != nil { - for _, r := range plugins.Reserve.Enabled { - if pg, ok := pluginsMap[r.Name]; ok { - p, ok := pg.(ReservePlugin) - if !ok { - return nil, fmt.Errorf("plugin %q does not extend reserve plugin", r.Name) - } - f.reservePlugins = append(f.reservePlugins, p) - } else { - return nil, fmt.Errorf("reserve plugin %q does not exist", r.Name) - } - } - } - - if plugins.PostFilter != nil { - for _, r := range plugins.PostFilter.Enabled { - if pg, ok := pluginsMap[r.Name]; ok { - p, ok := pg.(PostFilterPlugin) - if !ok { - return nil, fmt.Errorf("plugin %q does not extend post-filter plugin", r.Name) - } - f.postFilterPlugins = append(f.postFilterPlugins, p) - } else { - return nil, fmt.Errorf("post-filter plugin %q does not exist", r.Name) - } - } - } - - if plugins.PreBind != nil { - for _, pb := range plugins.PreBind.Enabled { - if pg, ok := pluginsMap[pb.Name]; ok { - p, ok := pg.(PreBindPlugin) - if !ok { - return nil, fmt.Errorf("plugin %q does not extend prebind plugin", pb.Name) - } - f.preBindPlugins = append(f.preBindPlugins, p) - } else { - return nil, fmt.Errorf("prebind plugin %q does not exist", pb.Name) - } - } - } - - if plugins.Bind != nil { - for _, pb := range plugins.Bind.Enabled { - if pg, ok := pluginsMap[pb.Name]; ok { - p, ok := pg.(BindPlugin) - if !ok { - return nil, fmt.Errorf("plugin %q does not extend bind plugin", pb.Name) - } - f.bindPlugins = append(f.bindPlugins, p) - } else { - return nil, fmt.Errorf("bind plugin %q does not exist", pb.Name) - } - } - } - - if plugins.PostBind != nil { - for _, pb := range plugins.PostBind.Enabled { - if pg, ok := pluginsMap[pb.Name]; ok { - p, ok := pg.(PostBindPlugin) - if !ok { - return nil, fmt.Errorf("plugin %q does not extend postbind plugin", pb.Name) - } - f.postBindPlugins = append(f.postBindPlugins, p) - } else { - return nil, fmt.Errorf("postbind plugin %q does not exist", pb.Name) - } - } - } - - if plugins.Unreserve != nil { - for _, ur := range plugins.Unreserve.Enabled { - if pg, ok := pluginsMap[ur.Name]; ok { - p, ok := pg.(UnreservePlugin) - if !ok { - return nil, fmt.Errorf("plugin %q does not extend unreserve plugin", ur.Name) - } - f.unreservePlugins = append(f.unreservePlugins, p) - } else { - return nil, fmt.Errorf("unreserve plugin %q does not exist", ur.Name) - } - } - } - - if plugins.Permit != nil { - for _, pr := range plugins.Permit.Enabled { - if pg, ok := pluginsMap[pr.Name]; ok { - p, ok := pg.(PermitPlugin) - if !ok { - return nil, fmt.Errorf("plugin %q does not extend permit plugin", pr.Name) - } - f.permitPlugins = append(f.permitPlugins, p) - } else { - return nil, fmt.Errorf("permit plugin %q does not exist", pr.Name) - } - } - } - - if plugins.QueueSort != nil { - for _, qs := range plugins.QueueSort.Enabled { - if pg, ok := pluginsMap[qs.Name]; ok { - p, ok := pg.(QueueSortPlugin) - if !ok { - return nil, fmt.Errorf("plugin %q does not extend queue sort plugin", qs.Name) - } - f.queueSortPlugins = append(f.queueSortPlugins, p) - if len(f.queueSortPlugins) > 1 { - return nil, fmt.Errorf("only one queue sort plugin can be enabled") - } - } else { - return nil, fmt.Errorf("queue sort plugin %q does not exist", qs.Name) - } - } + if len(f.queueSortPlugins) > 1 { + return nil, fmt.Errorf("only one queue sort plugin can be enabled") } return f, nil } +func updatePluginList(pluginList reflect.Value, pluginSet *config.PluginSet, pluginType reflect.Type, pluginsMap map[string]Plugin) error { + if pluginSet == nil { + return nil + } + + plugins := pluginList.Elem() + pluginType = pluginType.Elem() + set := sets.NewString() + for _, ep := range pluginSet.Enabled { + pg, ok := pluginsMap[ep.Name] + if !ok { + return fmt.Errorf("%s %q does not exist", pluginType.String(), ep.Name) + } + + if !reflect.TypeOf(pg).Implements(pluginType) { + return fmt.Errorf("plugin %q does not extend %s plugin", ep.Name, pluginType.String()) + } + + if set.Has(ep.Name) { + return fmt.Errorf("plugin %q already registered as %q", ep.Name, pluginType.String()) + } + + set.Insert(ep.Name) + + newPlugins := reflect.Append(plugins, reflect.ValueOf(pg)) + plugins.Set(newPlugins) + } + return nil +} + // QueueSortFunc returns the function to sort pods in scheduling queue func (f *framework) QueueSortFunc() LessFunc { if len(f.queueSortPlugins) == 0 { diff --git a/pkg/scheduler/framework/v1alpha1/framework_test.go b/pkg/scheduler/framework/v1alpha1/framework_test.go index c6aa4d95e6c..af80c846d8b 100644 --- a/pkg/scheduler/framework/v1alpha1/framework_test.go +++ b/pkg/scheduler/framework/v1alpha1/framework_test.go @@ -19,6 +19,7 @@ package v1alpha1 import ( "fmt" "reflect" + "strings" "testing" v1 "k8s.io/api/core/v1" @@ -35,6 +36,7 @@ const ( pluginNotImplementingScore = "plugin-not-implementing-score" preFilterPluginName = "prefilter-plugin" preFilterWithExtensionsPluginName = "prefilter-with-extensions-plugin" + duplicatePluginName = "duplicate-plugin" ) // TestScoreWithNormalizePlugin implements ScoreWithNormalizePlugin interface. @@ -166,12 +168,34 @@ func (pl *TestPreFilterWithExtensionsPlugin) Extensions() PreFilterExtensions { return pl } +type TestDuplicatePlugin struct { +} + +func (dp *TestDuplicatePlugin) Name() string { + return duplicatePluginName +} + +func (dp *TestDuplicatePlugin) PreFilter(pc *PluginContext, p *v1.Pod) *Status { + return nil +} + +func (dp *TestDuplicatePlugin) Extensions() PreFilterExtensions { + return nil +} + +var _ PreFilterPlugin = &TestDuplicatePlugin{} + +func newDuplicatePlugin(_ *runtime.Unknown, _ FrameworkHandle) (Plugin, error) { + return &TestDuplicatePlugin{}, nil +} + var registry Registry = func() Registry { r := make(Registry) r.Register(scoreWithNormalizePlugin1, newScoreWithNormalizePlugin1) r.Register(scoreWithNormalizePlugin2, newScoreWithNormalizePlugin2) r.Register(scorePlugin1, newScorePlugin1) r.Register(pluginNotImplementingScore, newPluginNotImplementingScore) + r.Register(duplicatePluginName, newDuplicatePlugin) return r }() @@ -239,6 +263,28 @@ func TestInitFrameworkWithScorePlugins(t *testing.T) { } } +func TestRegisterDuplicatePluginWouldFail(t *testing.T) { + plugin := config.Plugin{Name: duplicatePluginName, Weight: 1} + + pluginSet := config.PluginSet{ + Enabled: []config.Plugin{ + plugin, + plugin, + }, + } + plugins := config.Plugins{} + plugins.PreFilter = &pluginSet + + _, err := NewFramework(registry, &plugins, emptyArgs) + if err == nil { + t.Fatal("Framework initialization should fail") + } + + if err != nil && !strings.Contains(err.Error(), "already registered") { + t.Fatalf("Unexpected error, got %s, expect: plugin already registered", err.Error()) + } +} + func TestRunScorePlugins(t *testing.T) { tests := []struct { name string