Merge pull request #83243 from draveness/feature/use-reflect-in-new-registry

feat(scheduler): use reflect to reduce the similar pattern
This commit is contained in:
Kubernetes Prow Robot 2019-10-03 14:39:54 -07:00 committed by GitHub
commit 3317805652
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 128 additions and 155 deletions

View File

@ -19,6 +19,7 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/json:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
"//vendor/sigs.k8s.io/yaml:go_default_library",

View File

@ -19,11 +19,13 @@ package v1alpha1
import (
"context"
"fmt"
"reflect"
"time"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
@ -101,169 +103,93 @@ func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfi
}
}
if plugins.PreFilter != nil {
for _, pf := range plugins.PreFilter.Enabled {
if pg, ok := pluginsMap[pf.Name]; ok {
p, ok := pg.(PreFilterPlugin)
if !ok {
return nil, fmt.Errorf("plugin %q does not extend prefilter plugin", pf.Name)
}
f.preFilterPlugins = append(f.preFilterPlugins, p)
} else {
return nil, fmt.Errorf("prefilter plugin %q does not exist", pf.Name)
}
if err := updatePluginList(reflect.ValueOf(&f.preFilterPlugins), plugins.PreFilter, reflect.TypeOf((*PreFilterPlugin)(nil)), pluginsMap); err != nil {
return nil, err
}
if err := updatePluginList(reflect.ValueOf(&f.filterPlugins), plugins.Filter, reflect.TypeOf((*FilterPlugin)(nil)), pluginsMap); err != nil {
return nil, err
}
if err := updatePluginList(reflect.ValueOf(&f.reservePlugins), plugins.Reserve, reflect.TypeOf((*ReservePlugin)(nil)), pluginsMap); err != nil {
return nil, err
}
if err := updatePluginList(reflect.ValueOf(&f.postFilterPlugins), plugins.PostFilter, reflect.TypeOf((*PostFilterPlugin)(nil)), pluginsMap); err != nil {
return nil, err
}
if err := updatePluginList(reflect.ValueOf(&f.scorePlugins), plugins.Score, reflect.TypeOf((*ScorePlugin)(nil)), pluginsMap); err != nil {
return nil, err
}
if err := updatePluginList(reflect.ValueOf(&f.preBindPlugins), plugins.PreBind, reflect.TypeOf((*PreBindPlugin)(nil)), pluginsMap); err != nil {
return nil, err
}
if err := updatePluginList(reflect.ValueOf(&f.bindPlugins), plugins.Bind, reflect.TypeOf((*BindPlugin)(nil)), pluginsMap); err != nil {
return nil, err
}
if err := updatePluginList(reflect.ValueOf(&f.postBindPlugins), plugins.PostBind, reflect.TypeOf((*PostBindPlugin)(nil)), pluginsMap); err != nil {
return nil, err
}
if err := updatePluginList(reflect.ValueOf(&f.unreservePlugins), plugins.Unreserve, reflect.TypeOf((*UnreservePlugin)(nil)), pluginsMap); err != nil {
return nil, err
}
if err := updatePluginList(reflect.ValueOf(&f.permitPlugins), plugins.Permit, reflect.TypeOf((*PermitPlugin)(nil)), pluginsMap); err != nil {
return nil, err
}
if err := updatePluginList(reflect.ValueOf(&f.queueSortPlugins), plugins.QueueSort, reflect.TypeOf((*QueueSortPlugin)(nil)), pluginsMap); err != nil {
return nil, err
}
for _, scorePlugin := range f.scorePlugins {
if f.pluginNameToWeightMap[scorePlugin.Name()] == 0 {
return nil, fmt.Errorf("score plugin %q is not configured with weight", scorePlugin.Name())
}
}
if plugins.Filter != nil {
for _, r := range plugins.Filter.Enabled {
if pg, ok := pluginsMap[r.Name]; ok {
p, ok := pg.(FilterPlugin)
if !ok {
return nil, fmt.Errorf("plugin %q does not extend filter plugin", r.Name)
}
f.filterPlugins = append(f.filterPlugins, p)
} else {
return nil, fmt.Errorf("filter plugin %q does not exist", r.Name)
}
}
}
if plugins.Score != nil {
for _, sc := range plugins.Score.Enabled {
if pg, ok := pluginsMap[sc.Name]; ok {
p, ok := pg.(ScorePlugin)
if !ok {
return nil, fmt.Errorf("plugin %q does not extend score plugin", sc.Name)
}
if f.pluginNameToWeightMap[p.Name()] == 0 {
return nil, fmt.Errorf("score plugin %q is not configured with weight", p.Name())
}
f.scorePlugins = append(f.scorePlugins, p)
} else {
return nil, fmt.Errorf("score plugin %q does not exist", sc.Name)
}
}
}
if plugins.Reserve != nil {
for _, r := range plugins.Reserve.Enabled {
if pg, ok := pluginsMap[r.Name]; ok {
p, ok := pg.(ReservePlugin)
if !ok {
return nil, fmt.Errorf("plugin %q does not extend reserve plugin", r.Name)
}
f.reservePlugins = append(f.reservePlugins, p)
} else {
return nil, fmt.Errorf("reserve plugin %q does not exist", r.Name)
}
}
}
if plugins.PostFilter != nil {
for _, r := range plugins.PostFilter.Enabled {
if pg, ok := pluginsMap[r.Name]; ok {
p, ok := pg.(PostFilterPlugin)
if !ok {
return nil, fmt.Errorf("plugin %q does not extend post-filter plugin", r.Name)
}
f.postFilterPlugins = append(f.postFilterPlugins, p)
} else {
return nil, fmt.Errorf("post-filter plugin %q does not exist", r.Name)
}
}
}
if plugins.PreBind != nil {
for _, pb := range plugins.PreBind.Enabled {
if pg, ok := pluginsMap[pb.Name]; ok {
p, ok := pg.(PreBindPlugin)
if !ok {
return nil, fmt.Errorf("plugin %q does not extend prebind plugin", pb.Name)
}
f.preBindPlugins = append(f.preBindPlugins, p)
} else {
return nil, fmt.Errorf("prebind plugin %q does not exist", pb.Name)
}
}
}
if plugins.Bind != nil {
for _, pb := range plugins.Bind.Enabled {
if pg, ok := pluginsMap[pb.Name]; ok {
p, ok := pg.(BindPlugin)
if !ok {
return nil, fmt.Errorf("plugin %q does not extend bind plugin", pb.Name)
}
f.bindPlugins = append(f.bindPlugins, p)
} else {
return nil, fmt.Errorf("bind plugin %q does not exist", pb.Name)
}
}
}
if plugins.PostBind != nil {
for _, pb := range plugins.PostBind.Enabled {
if pg, ok := pluginsMap[pb.Name]; ok {
p, ok := pg.(PostBindPlugin)
if !ok {
return nil, fmt.Errorf("plugin %q does not extend postbind plugin", pb.Name)
}
f.postBindPlugins = append(f.postBindPlugins, p)
} else {
return nil, fmt.Errorf("postbind plugin %q does not exist", pb.Name)
}
}
}
if plugins.Unreserve != nil {
for _, ur := range plugins.Unreserve.Enabled {
if pg, ok := pluginsMap[ur.Name]; ok {
p, ok := pg.(UnreservePlugin)
if !ok {
return nil, fmt.Errorf("plugin %q does not extend unreserve plugin", ur.Name)
}
f.unreservePlugins = append(f.unreservePlugins, p)
} else {
return nil, fmt.Errorf("unreserve plugin %q does not exist", ur.Name)
}
}
}
if plugins.Permit != nil {
for _, pr := range plugins.Permit.Enabled {
if pg, ok := pluginsMap[pr.Name]; ok {
p, ok := pg.(PermitPlugin)
if !ok {
return nil, fmt.Errorf("plugin %q does not extend permit plugin", pr.Name)
}
f.permitPlugins = append(f.permitPlugins, p)
} else {
return nil, fmt.Errorf("permit plugin %q does not exist", pr.Name)
}
}
}
if plugins.QueueSort != nil {
for _, qs := range plugins.QueueSort.Enabled {
if pg, ok := pluginsMap[qs.Name]; ok {
p, ok := pg.(QueueSortPlugin)
if !ok {
return nil, fmt.Errorf("plugin %q does not extend queue sort plugin", qs.Name)
}
f.queueSortPlugins = append(f.queueSortPlugins, p)
if len(f.queueSortPlugins) > 1 {
return nil, fmt.Errorf("only one queue sort plugin can be enabled")
}
} else {
return nil, fmt.Errorf("queue sort plugin %q does not exist", qs.Name)
}
}
if len(f.queueSortPlugins) > 1 {
return nil, fmt.Errorf("only one queue sort plugin can be enabled")
}
return f, nil
}
func updatePluginList(pluginList reflect.Value, pluginSet *config.PluginSet, pluginType reflect.Type, pluginsMap map[string]Plugin) error {
if pluginSet == nil {
return nil
}
plugins := pluginList.Elem()
pluginType = pluginType.Elem()
set := sets.NewString()
for _, ep := range pluginSet.Enabled {
pg, ok := pluginsMap[ep.Name]
if !ok {
return fmt.Errorf("%s %q does not exist", pluginType.String(), ep.Name)
}
if !reflect.TypeOf(pg).Implements(pluginType) {
return fmt.Errorf("plugin %q does not extend %s plugin", ep.Name, pluginType.String())
}
if set.Has(ep.Name) {
return fmt.Errorf("plugin %q already registered as %q", ep.Name, pluginType.String())
}
set.Insert(ep.Name)
newPlugins := reflect.Append(plugins, reflect.ValueOf(pg))
plugins.Set(newPlugins)
}
return nil
}
// QueueSortFunc returns the function to sort pods in scheduling queue
func (f *framework) QueueSortFunc() LessFunc {
if len(f.queueSortPlugins) == 0 {

View File

@ -19,6 +19,7 @@ package v1alpha1
import (
"fmt"
"reflect"
"strings"
"testing"
v1 "k8s.io/api/core/v1"
@ -35,6 +36,7 @@ const (
pluginNotImplementingScore = "plugin-not-implementing-score"
preFilterPluginName = "prefilter-plugin"
preFilterWithExtensionsPluginName = "prefilter-with-extensions-plugin"
duplicatePluginName = "duplicate-plugin"
)
// TestScoreWithNormalizePlugin implements ScoreWithNormalizePlugin interface.
@ -166,12 +168,34 @@ func (pl *TestPreFilterWithExtensionsPlugin) Extensions() PreFilterExtensions {
return pl
}
type TestDuplicatePlugin struct {
}
func (dp *TestDuplicatePlugin) Name() string {
return duplicatePluginName
}
func (dp *TestDuplicatePlugin) PreFilter(pc *PluginContext, p *v1.Pod) *Status {
return nil
}
func (dp *TestDuplicatePlugin) Extensions() PreFilterExtensions {
return nil
}
var _ PreFilterPlugin = &TestDuplicatePlugin{}
func newDuplicatePlugin(_ *runtime.Unknown, _ FrameworkHandle) (Plugin, error) {
return &TestDuplicatePlugin{}, nil
}
var registry Registry = func() Registry {
r := make(Registry)
r.Register(scoreWithNormalizePlugin1, newScoreWithNormalizePlugin1)
r.Register(scoreWithNormalizePlugin2, newScoreWithNormalizePlugin2)
r.Register(scorePlugin1, newScorePlugin1)
r.Register(pluginNotImplementingScore, newPluginNotImplementingScore)
r.Register(duplicatePluginName, newDuplicatePlugin)
return r
}()
@ -239,6 +263,28 @@ func TestInitFrameworkWithScorePlugins(t *testing.T) {
}
}
func TestRegisterDuplicatePluginWouldFail(t *testing.T) {
plugin := config.Plugin{Name: duplicatePluginName, Weight: 1}
pluginSet := config.PluginSet{
Enabled: []config.Plugin{
plugin,
plugin,
},
}
plugins := config.Plugins{}
plugins.PreFilter = &pluginSet
_, err := NewFramework(registry, &plugins, emptyArgs)
if err == nil {
t.Fatal("Framework initialization should fail")
}
if err != nil && !strings.Contains(err.Error(), "already registered") {
t.Fatalf("Unexpected error, got %s, expect: plugin already registered", err.Error())
}
}
func TestRunScorePlugins(t *testing.T) {
tests := []struct {
name string