diff --git a/cmd/kube-scheduler/app/options/BUILD b/cmd/kube-scheduler/app/options/BUILD index 13d4e2de5cd..e73db131d60 100644 --- a/cmd/kube-scheduler/app/options/BUILD +++ b/cmd/kube-scheduler/app/options/BUILD @@ -70,6 +70,7 @@ go_test( "//cmd/kube-scheduler/app/config:go_default_library", "//pkg/scheduler/apis/config:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/rand:go_default_library", "//staging/src/k8s.io/apiserver/pkg/server/options:go_default_library", diff --git a/cmd/kube-scheduler/app/options/options_test.go b/cmd/kube-scheduler/app/options/options_test.go index 494cd698c9e..65418e72e6b 100644 --- a/cmd/kube-scheduler/app/options/options_test.go +++ b/cmd/kube-scheduler/app/options/options_test.go @@ -29,6 +29,7 @@ import ( "time" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/diff" apiserveroptions "k8s.io/apiserver/pkg/server/options" componentbaseconfig "k8s.io/component-base/config" @@ -146,6 +147,31 @@ users: t.Fatal(err) } + // plugin config + pluginconfigFile := filepath.Join(tmpDir, "plugin.yaml") + if err := ioutil.WriteFile(pluginconfigFile, []byte(fmt.Sprintf(` +apiVersion: kubescheduler.config.k8s.io/v1alpha1 +kind: KubeSchedulerConfiguration +clientConnection: + kubeconfig: "%s" +plugins: + reserve: + enabled: + - name: foo + - name: bar + disabled: + - name: baz + preBind: + enabled: + - name: foo + disabled: + - name: baz +pluginConfig: +- name: foo +`, configKubeconfig)), os.FileMode(0600)); err != nil { + t.Fatal(err) + } + // Insulate this test from picking up in-cluster config when run inside a pod // We can't assume we have permissions to write to /var/run/secrets/... from a unit test to mock in-cluster config for testing originalHost := os.Getenv("KUBERNETES_SERVICE_HOST") @@ -224,6 +250,7 @@ users: ContentType: "application/vnd.kubernetes.protobuf", }, BindTimeoutSeconds: &defaultBindTimeoutSeconds, + Plugins: nil, }, }, { @@ -334,6 +361,73 @@ users: }, expectedUsername: "none, http", }, + { + name: "plugin config", + options: &Options{ + ConfigFile: pluginconfigFile, + }, + expectedUsername: "config", + expectedConfig: kubeschedulerconfig.KubeSchedulerConfiguration{ + SchedulerName: "default-scheduler", + AlgorithmSource: kubeschedulerconfig.SchedulerAlgorithmSource{Provider: &defaultSource}, + HardPodAffinitySymmetricWeight: 1, + HealthzBindAddress: "0.0.0.0:10251", + MetricsBindAddress: "0.0.0.0:10251", + LeaderElection: kubeschedulerconfig.KubeSchedulerLeaderElectionConfiguration{ + LeaderElectionConfiguration: componentbaseconfig.LeaderElectionConfiguration{ + LeaderElect: true, + LeaseDuration: metav1.Duration{Duration: 15 * time.Second}, + RenewDeadline: metav1.Duration{Duration: 10 * time.Second}, + RetryPeriod: metav1.Duration{Duration: 2 * time.Second}, + ResourceLock: "endpoints", + }, + LockObjectNamespace: "kube-system", + LockObjectName: "kube-scheduler", + }, + ClientConnection: componentbaseconfig.ClientConnectionConfiguration{ + Kubeconfig: configKubeconfig, + QPS: 50, + Burst: 100, + ContentType: "application/vnd.kubernetes.protobuf", + }, + BindTimeoutSeconds: &defaultBindTimeoutSeconds, + Plugins: &kubeschedulerconfig.Plugins{ + Reserve: &kubeschedulerconfig.PluginSet{ + Enabled: []kubeschedulerconfig.Plugin{ + { + Name: "foo", + }, + { + Name: "bar", + }, + }, + Disabled: []kubeschedulerconfig.Plugin{ + { + Name: "baz", + }, + }, + }, + PreBind: &kubeschedulerconfig.PluginSet{ + Enabled: []kubeschedulerconfig.Plugin{ + { + Name: "foo", + }, + }, + Disabled: []kubeschedulerconfig.Plugin{ + { + Name: "baz", + }, + }, + }, + }, + PluginConfig: []kubeschedulerconfig.PluginConfig{ + { + Name: "foo", + Args: runtime.Unknown{}, + }, + }, + }, + }, { name: "no config", options: &Options{}, diff --git a/cmd/kube-scheduler/app/server.go b/cmd/kube-scheduler/app/server.go index e4fd4a25a2f..58745fa9fda 100644 --- a/cmd/kube-scheduler/app/server.go +++ b/cmd/kube-scheduler/app/server.go @@ -176,6 +176,8 @@ func Run(cc schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}) error cc.ComponentConfig.AlgorithmSource, stopCh, framework.NewRegistry(), + cc.ComponentConfig.Plugins, + cc.ComponentConfig.PluginConfig, scheduler.WithName(cc.ComponentConfig.SchedulerName), scheduler.WithHardPodAffinitySymmetricWeight(cc.ComponentConfig.HardPodAffinitySymmetricWeight), scheduler.WithPreemptionDisabled(cc.ComponentConfig.DisablePreemption), diff --git a/pkg/scheduler/apis/config/types.go b/pkg/scheduler/apis/config/types.go index 61b7eeaeb02..4c2c5f8c2c2 100644 --- a/pkg/scheduler/apis/config/types.go +++ b/pkg/scheduler/apis/config/types.go @@ -18,6 +18,7 @@ package config import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" componentbaseconfig "k8s.io/component-base/config" ) @@ -86,6 +87,17 @@ type KubeSchedulerConfiguration struct { // Value must be non-negative integer. The value zero indicates no waiting. // If this value is nil, the default value will be used. BindTimeoutSeconds *int64 + + // Plugins specify the set of plugins that should be enabled or disabled. Enabled plugins are the + // ones that should be enabled in addition to the default plugins. Disabled plugins are any of the + // default plugins that should be disabled. + // When no enabled or disabled plugin is specified for an extension point, default plugins for + // that extension point will be used if there is any. + Plugins *Plugins + + // PluginConfig is an optional set of custom plugin arguments for each plugin. + // Omitting config args for a plugin is equivalent to using the default config for that plugin. + PluginConfig []PluginConfig } // SchedulerAlgorithmSource is the source of a scheduler algorithm. One source @@ -131,3 +143,76 @@ type KubeSchedulerLeaderElectionConfiguration struct { // LockObjectName defines the lock object name LockObjectName string } + +// Plugins include multiple extension points. When specified, the list of plugins for +// a particular extension point are the only ones enabled. If an extension point is +// omitted from the config, then the default set of plugins is used for that extension point. +// Enabled plugins are called in the order specified here, after default plugins. If they need to +// be invoked before default plugins, default plugins must be disabled and re-enabled here in desired order. +type Plugins struct { + // QueueSort is a list of plugins that should be invoked when sorting pods in the scheduling queue. + QueueSort *PluginSet + + // PreFilter is a list of plugins that should be invoked at "PreFilter" extension point of the scheduling framework. + PreFilter *PluginSet + + // Filter is a list of plugins that should be invoked when filtering out nodes that cannot run the Pod. + Filter *PluginSet + + // PostFilter is a list of plugins that are invoked after filtering out infeasible nodes. + PostFilter *PluginSet + + // Score is a list of plugins that should be invoked when ranking nodes that have passed the filtering phase. + Score *PluginSet + + // NormalizeScore is a list of plugins that should be invoked after the scoring phase to normalize scores. + NormalizeScore *PluginSet + + // Reserve is a list of plugins invoked when reserving a node to run the pod. + Reserve *PluginSet + + // Permit is a list of plugins that control binding of a Pod. These plugins can prevent or delay binding of a Pod. + Permit *PluginSet + + // PreBind is a list of plugins that should be invoked before a pod is bound. + PreBind *PluginSet + + // Bind is a list of plugins that should be invoked at "Bind" extension point of the scheduling framework. + // The scheduler call these plugins in order. Scheduler skips the rest of these plugins as soon as one returns success. + Bind *PluginSet + + // PostBind is a list of plugins that should be invoked after a pod is successfully bound. + PostBind *PluginSet + + // Unreserve is a list of plugins invoked when a pod that was previously reserved is rejected in a later phase. + Unreserve *PluginSet +} + +// PluginSet specifies enabled and disabled plugins for an extension point. +// If an array is empty, missing, or nil, default plugins at that extension point will be used. +type PluginSet struct { + // Enabled specifies plugins that should be enabled in addition to default plugins. + // These are called after default plugins and in the same order specified here. + Enabled []Plugin + // Disabled specifies default plugins that should be disabled. + // When all default plugins need to be disabled, an array containing only one "*" should be provided. + Disabled []Plugin +} + +// Plugin specifies a plugin name and its weight when applicable. Weight is used only for Score plugins. +type Plugin struct { + // Name defines the name of plugin + Name string + // Weight defines the weight of plugin, only used for Score plugins. + Weight int32 +} + +// PluginConfig specifies arguments that should be passed to a plugin at the time of initialization. +// A plugin that is invoked at multiple extension points is initialized once. Args can have arbitrary structure. +// It is up to the plugin to process these Args. +type PluginConfig struct { + // Name defines the name of plugin being configured + Name string + // Args defines the arguments passed to the plugins at the time of initialization. Args can have arbitrary structure. + Args runtime.Unknown +} diff --git a/pkg/scheduler/apis/config/v1alpha1/zz_generated.conversion.go b/pkg/scheduler/apis/config/v1alpha1/zz_generated.conversion.go index e4565677e05..863225c37fa 100644 --- a/pkg/scheduler/apis/config/v1alpha1/zz_generated.conversion.go +++ b/pkg/scheduler/apis/config/v1alpha1/zz_generated.conversion.go @@ -57,6 +57,46 @@ func RegisterConversions(s *runtime.Scheme) error { }); err != nil { return err } + if err := s.AddGeneratedConversionFunc((*v1alpha1.Plugin)(nil), (*config.Plugin)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_v1alpha1_Plugin_To_config_Plugin(a.(*v1alpha1.Plugin), b.(*config.Plugin), scope) + }); err != nil { + return err + } + if err := s.AddGeneratedConversionFunc((*config.Plugin)(nil), (*v1alpha1.Plugin)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_config_Plugin_To_v1alpha1_Plugin(a.(*config.Plugin), b.(*v1alpha1.Plugin), scope) + }); err != nil { + return err + } + if err := s.AddGeneratedConversionFunc((*v1alpha1.PluginConfig)(nil), (*config.PluginConfig)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_v1alpha1_PluginConfig_To_config_PluginConfig(a.(*v1alpha1.PluginConfig), b.(*config.PluginConfig), scope) + }); err != nil { + return err + } + if err := s.AddGeneratedConversionFunc((*config.PluginConfig)(nil), (*v1alpha1.PluginConfig)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_config_PluginConfig_To_v1alpha1_PluginConfig(a.(*config.PluginConfig), b.(*v1alpha1.PluginConfig), scope) + }); err != nil { + return err + } + if err := s.AddGeneratedConversionFunc((*v1alpha1.PluginSet)(nil), (*config.PluginSet)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_v1alpha1_PluginSet_To_config_PluginSet(a.(*v1alpha1.PluginSet), b.(*config.PluginSet), scope) + }); err != nil { + return err + } + if err := s.AddGeneratedConversionFunc((*config.PluginSet)(nil), (*v1alpha1.PluginSet)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_config_PluginSet_To_v1alpha1_PluginSet(a.(*config.PluginSet), b.(*v1alpha1.PluginSet), scope) + }); err != nil { + return err + } + if err := s.AddGeneratedConversionFunc((*v1alpha1.Plugins)(nil), (*config.Plugins)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_v1alpha1_Plugins_To_config_Plugins(a.(*v1alpha1.Plugins), b.(*config.Plugins), scope) + }); err != nil { + return err + } + if err := s.AddGeneratedConversionFunc((*config.Plugins)(nil), (*v1alpha1.Plugins)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_config_Plugins_To_v1alpha1_Plugins(a.(*config.Plugins), b.(*v1alpha1.Plugins), scope) + }); err != nil { + return err + } if err := s.AddGeneratedConversionFunc((*v1alpha1.SchedulerAlgorithmSource)(nil), (*config.SchedulerAlgorithmSource)(nil), func(a, b interface{}, scope conversion.Scope) error { return Convert_v1alpha1_SchedulerAlgorithmSource_To_config_SchedulerAlgorithmSource(a.(*v1alpha1.SchedulerAlgorithmSource), b.(*config.SchedulerAlgorithmSource), scope) }); err != nil { @@ -120,6 +160,8 @@ func autoConvert_v1alpha1_KubeSchedulerConfiguration_To_config_KubeSchedulerConf out.DisablePreemption = in.DisablePreemption out.PercentageOfNodesToScore = in.PercentageOfNodesToScore out.BindTimeoutSeconds = (*int64)(unsafe.Pointer(in.BindTimeoutSeconds)) + out.Plugins = (*config.Plugins)(unsafe.Pointer(in.Plugins)) + out.PluginConfig = *(*[]config.PluginConfig)(unsafe.Pointer(&in.PluginConfig)) return nil } @@ -148,6 +190,8 @@ func autoConvert_config_KubeSchedulerConfiguration_To_v1alpha1_KubeSchedulerConf out.DisablePreemption = in.DisablePreemption out.PercentageOfNodesToScore = in.PercentageOfNodesToScore out.BindTimeoutSeconds = (*int64)(unsafe.Pointer(in.BindTimeoutSeconds)) + out.Plugins = (*v1alpha1.Plugins)(unsafe.Pointer(in.Plugins)) + out.PluginConfig = *(*[]v1alpha1.PluginConfig)(unsafe.Pointer(&in.PluginConfig)) return nil } @@ -184,6 +228,114 @@ func Convert_config_KubeSchedulerLeaderElectionConfiguration_To_v1alpha1_KubeSch return autoConvert_config_KubeSchedulerLeaderElectionConfiguration_To_v1alpha1_KubeSchedulerLeaderElectionConfiguration(in, out, s) } +func autoConvert_v1alpha1_Plugin_To_config_Plugin(in *v1alpha1.Plugin, out *config.Plugin, s conversion.Scope) error { + out.Name = in.Name + out.Weight = in.Weight + return nil +} + +// Convert_v1alpha1_Plugin_To_config_Plugin is an autogenerated conversion function. +func Convert_v1alpha1_Plugin_To_config_Plugin(in *v1alpha1.Plugin, out *config.Plugin, s conversion.Scope) error { + return autoConvert_v1alpha1_Plugin_To_config_Plugin(in, out, s) +} + +func autoConvert_config_Plugin_To_v1alpha1_Plugin(in *config.Plugin, out *v1alpha1.Plugin, s conversion.Scope) error { + out.Name = in.Name + out.Weight = in.Weight + return nil +} + +// Convert_config_Plugin_To_v1alpha1_Plugin is an autogenerated conversion function. +func Convert_config_Plugin_To_v1alpha1_Plugin(in *config.Plugin, out *v1alpha1.Plugin, s conversion.Scope) error { + return autoConvert_config_Plugin_To_v1alpha1_Plugin(in, out, s) +} + +func autoConvert_v1alpha1_PluginConfig_To_config_PluginConfig(in *v1alpha1.PluginConfig, out *config.PluginConfig, s conversion.Scope) error { + out.Name = in.Name + out.Args = in.Args + return nil +} + +// Convert_v1alpha1_PluginConfig_To_config_PluginConfig is an autogenerated conversion function. +func Convert_v1alpha1_PluginConfig_To_config_PluginConfig(in *v1alpha1.PluginConfig, out *config.PluginConfig, s conversion.Scope) error { + return autoConvert_v1alpha1_PluginConfig_To_config_PluginConfig(in, out, s) +} + +func autoConvert_config_PluginConfig_To_v1alpha1_PluginConfig(in *config.PluginConfig, out *v1alpha1.PluginConfig, s conversion.Scope) error { + out.Name = in.Name + out.Args = in.Args + return nil +} + +// Convert_config_PluginConfig_To_v1alpha1_PluginConfig is an autogenerated conversion function. +func Convert_config_PluginConfig_To_v1alpha1_PluginConfig(in *config.PluginConfig, out *v1alpha1.PluginConfig, s conversion.Scope) error { + return autoConvert_config_PluginConfig_To_v1alpha1_PluginConfig(in, out, s) +} + +func autoConvert_v1alpha1_PluginSet_To_config_PluginSet(in *v1alpha1.PluginSet, out *config.PluginSet, s conversion.Scope) error { + out.Enabled = *(*[]config.Plugin)(unsafe.Pointer(&in.Enabled)) + out.Disabled = *(*[]config.Plugin)(unsafe.Pointer(&in.Disabled)) + return nil +} + +// Convert_v1alpha1_PluginSet_To_config_PluginSet is an autogenerated conversion function. +func Convert_v1alpha1_PluginSet_To_config_PluginSet(in *v1alpha1.PluginSet, out *config.PluginSet, s conversion.Scope) error { + return autoConvert_v1alpha1_PluginSet_To_config_PluginSet(in, out, s) +} + +func autoConvert_config_PluginSet_To_v1alpha1_PluginSet(in *config.PluginSet, out *v1alpha1.PluginSet, s conversion.Scope) error { + out.Enabled = *(*[]v1alpha1.Plugin)(unsafe.Pointer(&in.Enabled)) + out.Disabled = *(*[]v1alpha1.Plugin)(unsafe.Pointer(&in.Disabled)) + return nil +} + +// Convert_config_PluginSet_To_v1alpha1_PluginSet is an autogenerated conversion function. +func Convert_config_PluginSet_To_v1alpha1_PluginSet(in *config.PluginSet, out *v1alpha1.PluginSet, s conversion.Scope) error { + return autoConvert_config_PluginSet_To_v1alpha1_PluginSet(in, out, s) +} + +func autoConvert_v1alpha1_Plugins_To_config_Plugins(in *v1alpha1.Plugins, out *config.Plugins, s conversion.Scope) error { + out.QueueSort = (*config.PluginSet)(unsafe.Pointer(in.QueueSort)) + out.PreFilter = (*config.PluginSet)(unsafe.Pointer(in.PreFilter)) + out.Filter = (*config.PluginSet)(unsafe.Pointer(in.Filter)) + out.PostFilter = (*config.PluginSet)(unsafe.Pointer(in.PostFilter)) + out.Score = (*config.PluginSet)(unsafe.Pointer(in.Score)) + out.NormalizeScore = (*config.PluginSet)(unsafe.Pointer(in.NormalizeScore)) + out.Reserve = (*config.PluginSet)(unsafe.Pointer(in.Reserve)) + out.Permit = (*config.PluginSet)(unsafe.Pointer(in.Permit)) + out.PreBind = (*config.PluginSet)(unsafe.Pointer(in.PreBind)) + out.Bind = (*config.PluginSet)(unsafe.Pointer(in.Bind)) + out.PostBind = (*config.PluginSet)(unsafe.Pointer(in.PostBind)) + out.Unreserve = (*config.PluginSet)(unsafe.Pointer(in.Unreserve)) + return nil +} + +// Convert_v1alpha1_Plugins_To_config_Plugins is an autogenerated conversion function. +func Convert_v1alpha1_Plugins_To_config_Plugins(in *v1alpha1.Plugins, out *config.Plugins, s conversion.Scope) error { + return autoConvert_v1alpha1_Plugins_To_config_Plugins(in, out, s) +} + +func autoConvert_config_Plugins_To_v1alpha1_Plugins(in *config.Plugins, out *v1alpha1.Plugins, s conversion.Scope) error { + out.QueueSort = (*v1alpha1.PluginSet)(unsafe.Pointer(in.QueueSort)) + out.PreFilter = (*v1alpha1.PluginSet)(unsafe.Pointer(in.PreFilter)) + out.Filter = (*v1alpha1.PluginSet)(unsafe.Pointer(in.Filter)) + out.PostFilter = (*v1alpha1.PluginSet)(unsafe.Pointer(in.PostFilter)) + out.Score = (*v1alpha1.PluginSet)(unsafe.Pointer(in.Score)) + out.NormalizeScore = (*v1alpha1.PluginSet)(unsafe.Pointer(in.NormalizeScore)) + out.Reserve = (*v1alpha1.PluginSet)(unsafe.Pointer(in.Reserve)) + out.Permit = (*v1alpha1.PluginSet)(unsafe.Pointer(in.Permit)) + out.PreBind = (*v1alpha1.PluginSet)(unsafe.Pointer(in.PreBind)) + out.Bind = (*v1alpha1.PluginSet)(unsafe.Pointer(in.Bind)) + out.PostBind = (*v1alpha1.PluginSet)(unsafe.Pointer(in.PostBind)) + out.Unreserve = (*v1alpha1.PluginSet)(unsafe.Pointer(in.Unreserve)) + return nil +} + +// Convert_config_Plugins_To_v1alpha1_Plugins is an autogenerated conversion function. +func Convert_config_Plugins_To_v1alpha1_Plugins(in *config.Plugins, out *v1alpha1.Plugins, s conversion.Scope) error { + return autoConvert_config_Plugins_To_v1alpha1_Plugins(in, out, s) +} + func autoConvert_v1alpha1_SchedulerAlgorithmSource_To_config_SchedulerAlgorithmSource(in *v1alpha1.SchedulerAlgorithmSource, out *config.SchedulerAlgorithmSource, s conversion.Scope) error { out.Policy = (*config.SchedulerPolicySource)(unsafe.Pointer(in.Policy)) out.Provider = (*string)(unsafe.Pointer(in.Provider)) diff --git a/pkg/scheduler/apis/config/zz_generated.deepcopy.go b/pkg/scheduler/apis/config/zz_generated.deepcopy.go index ee751173c20..417f17a769b 100644 --- a/pkg/scheduler/apis/config/zz_generated.deepcopy.go +++ b/pkg/scheduler/apis/config/zz_generated.deepcopy.go @@ -37,6 +37,18 @@ func (in *KubeSchedulerConfiguration) DeepCopyInto(out *KubeSchedulerConfigurati *out = new(int64) **out = **in } + if in.Plugins != nil { + in, out := &in.Plugins, &out.Plugins + *out = new(Plugins) + (*in).DeepCopyInto(*out) + } + if in.PluginConfig != nil { + in, out := &in.PluginConfig, &out.PluginConfig + *out = make([]PluginConfig, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } return } @@ -75,6 +87,141 @@ func (in *KubeSchedulerLeaderElectionConfiguration) DeepCopy() *KubeSchedulerLea return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Plugin) DeepCopyInto(out *Plugin) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Plugin. +func (in *Plugin) DeepCopy() *Plugin { + if in == nil { + return nil + } + out := new(Plugin) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *PluginConfig) DeepCopyInto(out *PluginConfig) { + *out = *in + in.Args.DeepCopyInto(&out.Args) + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PluginConfig. +func (in *PluginConfig) DeepCopy() *PluginConfig { + if in == nil { + return nil + } + out := new(PluginConfig) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *PluginSet) DeepCopyInto(out *PluginSet) { + *out = *in + if in.Enabled != nil { + in, out := &in.Enabled, &out.Enabled + *out = make([]Plugin, len(*in)) + copy(*out, *in) + } + if in.Disabled != nil { + in, out := &in.Disabled, &out.Disabled + *out = make([]Plugin, len(*in)) + copy(*out, *in) + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PluginSet. +func (in *PluginSet) DeepCopy() *PluginSet { + if in == nil { + return nil + } + out := new(PluginSet) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Plugins) DeepCopyInto(out *Plugins) { + *out = *in + if in.QueueSort != nil { + in, out := &in.QueueSort, &out.QueueSort + *out = new(PluginSet) + (*in).DeepCopyInto(*out) + } + if in.PreFilter != nil { + in, out := &in.PreFilter, &out.PreFilter + *out = new(PluginSet) + (*in).DeepCopyInto(*out) + } + if in.Filter != nil { + in, out := &in.Filter, &out.Filter + *out = new(PluginSet) + (*in).DeepCopyInto(*out) + } + if in.PostFilter != nil { + in, out := &in.PostFilter, &out.PostFilter + *out = new(PluginSet) + (*in).DeepCopyInto(*out) + } + if in.Score != nil { + in, out := &in.Score, &out.Score + *out = new(PluginSet) + (*in).DeepCopyInto(*out) + } + if in.NormalizeScore != nil { + in, out := &in.NormalizeScore, &out.NormalizeScore + *out = new(PluginSet) + (*in).DeepCopyInto(*out) + } + if in.Reserve != nil { + in, out := &in.Reserve, &out.Reserve + *out = new(PluginSet) + (*in).DeepCopyInto(*out) + } + if in.Permit != nil { + in, out := &in.Permit, &out.Permit + *out = new(PluginSet) + (*in).DeepCopyInto(*out) + } + if in.PreBind != nil { + in, out := &in.PreBind, &out.PreBind + *out = new(PluginSet) + (*in).DeepCopyInto(*out) + } + if in.Bind != nil { + in, out := &in.Bind, &out.Bind + *out = new(PluginSet) + (*in).DeepCopyInto(*out) + } + if in.PostBind != nil { + in, out := &in.PostBind, &out.PostBind + *out = new(PluginSet) + (*in).DeepCopyInto(*out) + } + if in.Unreserve != nil { + in, out := &in.Unreserve, &out.Unreserve + *out = new(PluginSet) + (*in).DeepCopyInto(*out) + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Plugins. +func (in *Plugins) DeepCopy() *Plugins { + if in == nil { + return nil + } + out := new(Plugins) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *SchedulerAlgorithmSource) DeepCopyInto(out *SchedulerAlgorithmSource) { *out = *in diff --git a/pkg/scheduler/core/BUILD b/pkg/scheduler/core/BUILD index e6e7126dbba..0f7b3dbef70 100644 --- a/pkg/scheduler/core/BUILD +++ b/pkg/scheduler/core/BUILD @@ -48,6 +48,7 @@ go_test( "//pkg/scheduler/algorithm/priorities:go_default_library", "//pkg/scheduler/algorithm/priorities/util:go_default_library", "//pkg/scheduler/api:go_default_library", + "//pkg/scheduler/apis/config:go_default_library", "//pkg/scheduler/framework/v1alpha1:go_default_library", "//pkg/scheduler/internal/cache:go_default_library", "//pkg/scheduler/internal/queue:go_default_library", diff --git a/pkg/scheduler/core/extender_test.go b/pkg/scheduler/core/extender_test.go index 6c0e83aa704..444f5a426e1 100644 --- a/pkg/scheduler/core/extender_test.go +++ b/pkg/scheduler/core/extender_test.go @@ -29,7 +29,6 @@ import ( "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities" schedulerapi "k8s.io/kubernetes/pkg/scheduler/api" - framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache" internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue" schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo" @@ -532,7 +531,6 @@ func TestGenericSchedulerWithExtenders(t *testing.T) { extenders = append(extenders, &test.extenders[ii]) } cache := internalcache.New(time.Duration(0), wait.NeverStop) - fwk, _ := framework.NewFramework(EmptyPluginRegistry, nil) for _, name := range test.nodes { cache.AddNode(createNode(name)) } @@ -544,7 +542,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) { predicates.EmptyPredicateMetadataProducer, test.prioritizers, priorities.EmptyPriorityMetadataProducer, - fwk, + emptyFramework, extenders, nil, schedulertesting.FakePersistentVolumeClaimLister{}, diff --git a/pkg/scheduler/core/generic_scheduler_test.go b/pkg/scheduler/core/generic_scheduler_test.go index c92b1600093..9ca5a9c9540 100644 --- a/pkg/scheduler/core/generic_scheduler_test.go +++ b/pkg/scheduler/core/generic_scheduler_test.go @@ -38,6 +38,7 @@ import ( "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities" priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util" schedulerapi "k8s.io/kubernetes/pkg/scheduler/api" + schedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config" framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache" internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue" @@ -136,7 +137,7 @@ func getNodeReducePriority(pod *v1.Pod, meta interface{}, nodeNameToInfo map[str // EmptyPluginRegistry is a test plugin set used by the default scheduler. var EmptyPluginRegistry = framework.Registry{} -var emptyFramework, _ = framework.NewFramework(EmptyPluginRegistry, nil) +var emptyFramework, _ = framework.NewFramework(EmptyPluginRegistry, nil, []schedulerconfig.PluginConfig{}) func makeNodeList(nodeNames []string) []*v1.Node { result := make([]*v1.Node, 0, len(nodeNames)) @@ -438,7 +439,6 @@ func TestGenericScheduler(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { cache := internalcache.New(time.Duration(0), wait.NeverStop) - fwk, _ := framework.NewFramework(EmptyPluginRegistry, nil) for _, pod := range test.pods { cache.AddPod(pod) } @@ -457,7 +457,7 @@ func TestGenericScheduler(t *testing.T) { algorithmpredicates.EmptyPredicateMetadataProducer, test.prioritizers, priorities.EmptyPriorityMetadataProducer, - fwk, + emptyFramework, []algorithm.SchedulerExtender{}, nil, pvcLister, @@ -480,7 +480,6 @@ func TestGenericScheduler(t *testing.T) { // makeScheduler makes a simple genericScheduler for testing. func makeScheduler(predicates map[string]algorithmpredicates.FitPredicate, nodes []*v1.Node) *genericScheduler { cache := internalcache.New(time.Duration(0), wait.NeverStop) - fwk, _ := framework.NewFramework(EmptyPluginRegistry, nil) for _, n := range nodes { cache.AddNode(n) } @@ -493,7 +492,7 @@ func makeScheduler(predicates map[string]algorithmpredicates.FitPredicate, nodes algorithmpredicates.EmptyPredicateMetadataProducer, prioritizers, priorities.EmptyPriorityMetadataProducer, - fwk, + emptyFramework, nil, nil, nil, nil, false, false, schedulerapi.DefaultPercentageOfNodesToScore) cache.UpdateNodeInfoSnapshot(s.(*genericScheduler).nodeInfoSnapshot) @@ -1469,7 +1468,6 @@ func TestPreempt(t *testing.T) { t.Logf("===== Running test %v", t.Name()) stop := make(chan struct{}) cache := internalcache.New(time.Duration(0), stop) - fwk, _ := framework.NewFramework(EmptyPluginRegistry, nil) for _, pod := range test.pods { cache.AddPod(pod) } @@ -1496,7 +1494,7 @@ func TestPreempt(t *testing.T) { algorithmpredicates.EmptyPredicateMetadataProducer, []priorities.PriorityConfig{{Function: numericPriority, Weight: 1}}, priorities.EmptyPriorityMetadataProducer, - fwk, + emptyFramework, extenders, nil, schedulertesting.FakePersistentVolumeClaimLister{}, diff --git a/pkg/scheduler/factory/BUILD b/pkg/scheduler/factory/BUILD index 453444191c0..15a03325ccf 100644 --- a/pkg/scheduler/factory/BUILD +++ b/pkg/scheduler/factory/BUILD @@ -15,6 +15,7 @@ go_library( "//pkg/scheduler/algorithm/priorities:go_default_library", "//pkg/scheduler/api:go_default_library", "//pkg/scheduler/api/validation:go_default_library", + "//pkg/scheduler/apis/config:go_default_library", "//pkg/scheduler/core:go_default_library", "//pkg/scheduler/framework/v1alpha1:go_default_library", "//pkg/scheduler/internal/cache:go_default_library", @@ -60,6 +61,7 @@ go_test( "//pkg/scheduler/algorithm/priorities:go_default_library", "//pkg/scheduler/api:go_default_library", "//pkg/scheduler/api/latest:go_default_library", + "//pkg/scheduler/apis/config:go_default_library", "//pkg/scheduler/framework/v1alpha1:go_default_library", "//pkg/scheduler/internal/cache:go_default_library", "//pkg/scheduler/internal/queue:go_default_library", diff --git a/pkg/scheduler/factory/factory.go b/pkg/scheduler/factory/factory.go index c647032298e..73b4646edc4 100644 --- a/pkg/scheduler/factory/factory.go +++ b/pkg/scheduler/factory/factory.go @@ -22,7 +22,7 @@ import ( "fmt" "time" - v1 "k8s.io/api/core/v1" + "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" @@ -50,6 +50,7 @@ import ( "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities" schedulerapi "k8s.io/kubernetes/pkg/scheduler/api" "k8s.io/kubernetes/pkg/scheduler/api/validation" + "k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/core" framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache" @@ -238,6 +239,8 @@ type ConfigFactoryArgs struct { BindTimeoutSeconds int64 StopCh <-chan struct{} Registry framework.Registry + Plugins *config.Plugins + PluginConfig []config.PluginConfig } // NewConfigFactory initializes the default implementation of a Configurator. To encourage eventual privatization of the struct type, we only @@ -248,8 +251,8 @@ func NewConfigFactory(args *ConfigFactoryArgs) Configurator { stopEverything = wait.NeverStop } schedulerCache := internalcache.New(30*time.Second, stopEverything) - // TODO(bsalamat): config files should be passed to the framework. - framework, err := framework.NewFramework(args.Registry, nil) + + framework, err := framework.NewFramework(args.Registry, args.Plugins, args.PluginConfig) if err != nil { klog.Fatalf("error initializing the scheduling framework: %v", err) } diff --git a/pkg/scheduler/factory/factory_test.go b/pkg/scheduler/factory/factory_test.go index 44198aa2afc..efe82c88057 100644 --- a/pkg/scheduler/factory/factory_test.go +++ b/pkg/scheduler/factory/factory_test.go @@ -39,6 +39,7 @@ import ( "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" schedulerapi "k8s.io/kubernetes/pkg/scheduler/api" latestschedulerapi "k8s.io/kubernetes/pkg/scheduler/api/latest" + "k8s.io/kubernetes/pkg/scheduler/apis/config" framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache" internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue" @@ -496,6 +497,8 @@ func newConfigFactory(client clientset.Interface, hardPodAffinitySymmetricWeight bindTimeoutSeconds, stopCh, framework.NewRegistry(), + nil, + []config.PluginConfig{}, }) } diff --git a/pkg/scheduler/framework/v1alpha1/BUILD b/pkg/scheduler/framework/v1alpha1/BUILD index 98cdc3565f3..3d2064635be 100644 --- a/pkg/scheduler/framework/v1alpha1/BUILD +++ b/pkg/scheduler/framework/v1alpha1/BUILD @@ -12,6 +12,7 @@ go_library( importpath = "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1", visibility = ["//visibility:public"], deps = [ + "//pkg/scheduler/apis/config:go_default_library", "//pkg/scheduler/internal/cache:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", diff --git a/pkg/scheduler/framework/v1alpha1/framework.go b/pkg/scheduler/framework/v1alpha1/framework.go index b1593a46261..0c05bfbcd2d 100644 --- a/pkg/scheduler/framework/v1alpha1/framework.go +++ b/pkg/scheduler/framework/v1alpha1/framework.go @@ -24,6 +24,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/klog" + "k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/internal/cache" ) @@ -49,44 +50,113 @@ const ( var _ = Framework(&framework{}) // NewFramework initializes plugins given the configuration and the registry. -func NewFramework(r Registry, _ *runtime.Unknown) (Framework, error) { +func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfig) (Framework, error) { f := &framework{ registry: r, nodeInfoSnapshot: cache.NewNodeInfoSnapshot(), plugins: make(map[string]Plugin), waitingPods: newWaitingPodsMap(), } + if plugins == nil { + return f, nil + } - // TODO: The framework needs to read the scheduler config and initialize only - // needed plugins. In this initial version of the code, we initialize all. + // get needed plugins from config + pg := pluginsNeeded(plugins) + if len(pg) == 0 { + return f, nil + } + + pluginConfig := pluginNameToConfig(args) for name, factory := range r { - // TODO: 'nil' should be replaced by plugin config. - p, err := factory(nil, f) + // initialize only needed plugins + if _, ok := pg[name]; !ok { + continue + } + + // find the config args of a plugin + pc := pluginConfig[name] + + p, err := factory(pc, f) if err != nil { return nil, fmt.Errorf("error initializing plugin %v: %v", name, err) } f.plugins[name] = p + } - // TODO: For now, we assume any plugins that implements an extension - // point wants to be called at that extension point. We should change this - // later and add these plugins based on the configuration. - if qsp, ok := p.(QueueSortPlugin); ok { - f.queueSortPlugins = append(f.queueSortPlugins, qsp) - } - - if rp, ok := p.(ReservePlugin); ok { - f.reservePlugins = append(f.reservePlugins, rp) - } - if pp, ok := p.(PrebindPlugin); ok { - f.prebindPlugins = append(f.prebindPlugins, pp) - } - if up, ok := p.(UnreservePlugin); ok { - f.unreservePlugins = append(f.unreservePlugins, up) - } - if pr, ok := p.(PermitPlugin); ok { - f.permitPlugins = append(f.permitPlugins, pr) + if plugins.Reserve != nil { + for _, r := range plugins.Reserve.Enabled { + if pg, ok := f.plugins[r.Name]; ok { + p, ok := pg.(ReservePlugin) + if !ok { + return nil, fmt.Errorf("plugin %v does not extend reserve plugin", r.Name) + } + f.reservePlugins = append(f.reservePlugins, p) + } else { + return nil, fmt.Errorf("reserve plugin %v does not exist", r.Name) + } } } + + if plugins.PreBind != nil { + for _, pb := range plugins.PreBind.Enabled { + if pg, ok := f.plugins[pb.Name]; ok { + p, ok := pg.(PrebindPlugin) + if !ok { + return nil, fmt.Errorf("plugin %v does not extend prebind plugin", pb.Name) + } + f.prebindPlugins = append(f.prebindPlugins, p) + } else { + return nil, fmt.Errorf("prebind plugin %v does not exist", pb.Name) + } + } + } + + if plugins.Unreserve != nil { + for _, ur := range plugins.Unreserve.Enabled { + if pg, ok := f.plugins[ur.Name]; ok { + p, ok := pg.(UnreservePlugin) + if !ok { + return nil, fmt.Errorf("plugin %v does not extend unreserve plugin", ur.Name) + } + f.unreservePlugins = append(f.unreservePlugins, p) + } else { + return nil, fmt.Errorf("unreserve plugin %v does not exist", ur.Name) + } + } + } + + if plugins.Permit != nil { + for _, pr := range plugins.Permit.Enabled { + if pg, ok := f.plugins[pr.Name]; ok { + p, ok := pg.(PermitPlugin) + if !ok { + return nil, fmt.Errorf("plugin %v does not extend permit plugin", pr.Name) + } + f.permitPlugins = append(f.permitPlugins, p) + } else { + return nil, fmt.Errorf("permit plugin %v does not exist", pr.Name) + } + } + } + + if plugins.QueueSort != nil { + for _, qs := range plugins.QueueSort.Enabled { + if pg, ok := f.plugins[qs.Name]; ok { + p, ok := pg.(QueueSortPlugin) + if !ok { + return nil, fmt.Errorf("plugin %v does not extend queue sort plugin", qs.Name) + } + f.queueSortPlugins = append(f.queueSortPlugins, p) + if len(f.queueSortPlugins) > 1 { + return nil, fmt.Errorf("only one queue sort plugin can be enabled") + } + } else { + return nil, fmt.Errorf("queue sort plugin %v does not exist", qs.Name) + } + } + } + return f, nil } @@ -225,3 +295,42 @@ func (f *framework) IterateOverWaitingPods(callback func(WaitingPod)) { func (f *framework) GetWaitingPod(uid types.UID) WaitingPod { return f.waitingPods.get(uid) } + +func pluginNameToConfig(args []config.PluginConfig) map[string]*runtime.Unknown { + pc := make(map[string]*runtime.Unknown, 0) + for _, p := range args { + pc[p.Name] = &p.Args + } + return pc +} + +func pluginsNeeded(plugins *config.Plugins) map[string]struct{} { + pgMap := make(map[string]struct{}, 0) + + if plugins == nil { + return pgMap + } + + find := func(pgs *config.PluginSet) { + if pgs == nil { + return + } + for _, pg := range pgs.Enabled { + pgMap[pg.Name] = struct{}{} + } + } + find(plugins.QueueSort) + find(plugins.PreFilter) + find(plugins.Filter) + find(plugins.PostFilter) + find(plugins.Score) + find(plugins.NormalizeScore) + find(plugins.Reserve) + find(plugins.Permit) + find(plugins.PreBind) + find(plugins.Bind) + find(plugins.PostBind) + find(plugins.Unreserve) + + return pgMap +} diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index cf411188a13..2574903c23f 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -133,6 +133,8 @@ func New(client clientset.Interface, schedulerAlgorithmSource kubeschedulerconfig.SchedulerAlgorithmSource, stopCh <-chan struct{}, registry framework.Registry, + plugins *kubeschedulerconfig.Plugins, + pluginConfig []kubeschedulerconfig.PluginConfig, opts ...func(o *schedulerOptions)) (*Scheduler, error) { options := defaultSchedulerOptions @@ -158,6 +160,8 @@ func New(client clientset.Interface, PercentageOfNodesToScore: options.percentageOfNodesToScore, BindTimeoutSeconds: options.bindTimeoutSeconds, Registry: registry, + Plugins: plugins, + PluginConfig: pluginConfig, }) var config *factory.Config source := schedulerAlgorithmSource diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index 6436b5c9284..cfb81e7bc21 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -57,6 +57,13 @@ import ( "k8s.io/kubernetes/pkg/scheduler/volumebinder" ) +// EmptyFramework is an empty framework used in tests. +// Note: If the test runs in goroutine, please don't using this variable to avoid a race condition. +var EmptyFramework, _ = framework.NewFramework(EmptyPluginRegistry, nil, EmptyPluginConfig) + +// EmptyPluginConfig is an empty plugin config used in tests. +var EmptyPluginConfig = []kubeschedulerconfig.PluginConfig{} + type fakeBinder struct { b func(binding *v1.Binding) error } @@ -197,6 +204,8 @@ func TestSchedulerCreation(t *testing.T) { kubeschedulerconfig.SchedulerAlgorithmSource{Provider: &testSource}, stopCh, EmptyPluginRegistry, + nil, + EmptyPluginConfig, WithBindTimeoutSeconds(defaultBindTimeout)) if err != nil { @@ -274,7 +283,6 @@ func TestScheduler(t *testing.T) { var gotAssumedPod *v1.Pod var gotBinding *v1.Binding - fwk, _ := framework.NewFramework(EmptyPluginRegistry, nil) s := NewFromConfig(&factory.Config{ SchedulerCache: &fakecache.Cache{ ForgetFunc: func(pod *v1.Pod) { @@ -300,7 +308,7 @@ func TestScheduler(t *testing.T) { NextPod: func() *v1.Pod { return item.sendPod }, - Framework: fwk, + Framework: EmptyFramework, Recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "scheduler"}), VolumeBinder: volumebinder.NewFakeVolumeBinder(&volumescheduling.FakeVolumeBinderConfig{AllBound: true}), }) @@ -638,7 +646,6 @@ func TestSchedulerFailedSchedulingReasons(t *testing.T) { // queuedPodStore: pods queued before processing. // scache: scheduler cache that might contain assumed pods. func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.Cache, informerFactory informers.SharedInformerFactory, predicateMap map[string]predicates.FitPredicate, recorder record.EventRecorder) (*Scheduler, chan *v1.Binding, chan error) { - framework, _ := framework.NewFramework(EmptyPluginRegistry, nil) algo := core.NewGenericScheduler( scache, internalqueue.NewSchedulingQueue(nil, nil), @@ -646,7 +653,7 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C predicates.EmptyPredicateMetadataProducer, []priorities.PriorityConfig{}, priorities.EmptyPriorityMetadataProducer, - framework, + EmptyFramework, []algorithm.SchedulerExtender{}, nil, informerFactory.Core().V1().PersistentVolumeClaims().Lister(), @@ -677,7 +684,7 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C Recorder: &record.FakeRecorder{}, PodConditionUpdater: fakePodConditionUpdater{}, PodPreemptor: fakePodPreemptor{}, - Framework: framework, + Framework: EmptyFramework, VolumeBinder: volumebinder.NewFakeVolumeBinder(&volumescheduling.FakeVolumeBinderConfig{AllBound: true}), } @@ -691,7 +698,7 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C } func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, scache internalcache.Cache, informerFactory informers.SharedInformerFactory, predicateMap map[string]predicates.FitPredicate, stop chan struct{}, bindingTime time.Duration) (*Scheduler, chan *v1.Binding) { - framework, _ := framework.NewFramework(EmptyPluginRegistry, nil) + framework, _ := framework.NewFramework(EmptyPluginRegistry, nil, []kubeschedulerconfig.PluginConfig{}) algo := core.NewGenericScheduler( scache, internalqueue.NewSchedulingQueue(nil, nil), diff --git a/staging/src/k8s.io/kube-scheduler/config/v1alpha1/types.go b/staging/src/k8s.io/kube-scheduler/config/v1alpha1/types.go index d8eeba7f79f..1cea11f9fdb 100644 --- a/staging/src/k8s.io/kube-scheduler/config/v1alpha1/types.go +++ b/staging/src/k8s.io/kube-scheduler/config/v1alpha1/types.go @@ -18,6 +18,7 @@ package v1alpha1 import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" componentbaseconfigv1alpha1 "k8s.io/component-base/config/v1alpha1" ) @@ -82,6 +83,17 @@ type KubeSchedulerConfiguration struct { // Value must be non-negative integer. The value zero indicates no waiting. // If this value is nil, the default value will be used. BindTimeoutSeconds *int64 `json:"bindTimeoutSeconds"` + + // Plugins specify the set of plugins that should be enabled or disabled. Enabled plugins are the + // ones that should be enabled in addition to the default plugins. Disabled plugins are any of the + // default plugins that should be disabled. + // When no enabled or disabled plugin is specified for an extension point, default plugins for + // that extension point will be used if there is any. + Plugins *Plugins `json:"plugins,omitempty"` + + // PluginConfig is an optional set of custom plugin arguments for each plugin. + // Omitting config args for a plugin is equivalent to using the default config for that plugin. + PluginConfig []PluginConfig `json:"pluginConfig,omitempty"` } // SchedulerAlgorithmSource is the source of a scheduler algorithm. One source @@ -127,3 +139,76 @@ type KubeSchedulerLeaderElectionConfiguration struct { // LockObjectName defines the lock object name LockObjectName string `json:"lockObjectName"` } + +// Plugins include multiple extension points. When specified, the list of plugins for +// a particular extension point are the only ones enabled. If an extension point is +// omitted from the config, then the default set of plugins is used for that extension point. +// Enabled plugins are called in the order specified here, after default plugins. If they need to +// be invoked before default plugins, default plugins must be disabled and re-enabled here in desired order. +type Plugins struct { + // QueueSort is a list of plugins that should be invoked when sorting pods in the scheduling queue. + QueueSort *PluginSet `json:"queueSort,omitempty"` + + // PreFilter is a list of plugins that should be invoked at "PreFilter" extension point of the scheduling framework. + PreFilter *PluginSet `json:"preFilter,omitempty"` + + // Filter is a list of plugins that should be invoked when filtering out nodes that cannot run the Pod. + Filter *PluginSet `json:"filter,omitempty"` + + // PostFilter is a list of plugins that are invoked after filtering out infeasible nodes. + PostFilter *PluginSet `json:"postFilter,omitempty"` + + // Score is a list of plugins that should be invoked when ranking nodes that have passed the filtering phase. + Score *PluginSet `json:"score,omitempty"` + + // NormalizeScore is a list of plugins that should be invoked after the scoring phase to normalize scores. + NormalizeScore *PluginSet `json:"normalizeScore,omitempty"` + + // Reserve is a list of plugins invoked when reserving a node to run the pod. + Reserve *PluginSet `json:"reserve,omitempty"` + + // Permit is a list of plugins that control binding of a Pod. These plugins can prevent or delay binding of a Pod. + Permit *PluginSet `json:"permit,omitempty"` + + // PreBind is a list of plugins that should be invoked before a pod is bound. + PreBind *PluginSet `json:"preBind,omitempty"` + + // Bind is a list of plugins that should be invoked at "Bind" extension point of the scheduling framework. + // The scheduler call these plugins in order. Scheduler skips the rest of these plugins as soon as one returns success. + Bind *PluginSet `json:"bind,omitempty"` + + // PostBind is a list of plugins that should be invoked after a pod is successfully bound. + PostBind *PluginSet `json:"postBind,omitempty"` + + // Unreserve is a list of plugins invoked when a pod that was previously reserved is rejected in a later phase. + Unreserve *PluginSet `json:"unreserve,omitempty"` +} + +// PluginSet specifies enabled and disabled plugins for an extension point. +// If an array is empty, missing, or nil, default plugins at that extension point will be used. +type PluginSet struct { + // Enabled specifies plugins that should be enabled in addition to default plugins. + // These are called after default plugins and in the same order specified here. + Enabled []Plugin `json:"enabled,omitempty"` + // Disabled specifies default plugins that should be disabled. + // When all default plugins need to be disabled, an array containing only one "*" should be provided. + Disabled []Plugin `json:"disabled,omitempty"` +} + +// Plugin specifies a plugin name and its weight when applicable. Weight is used only for Score plugins. +type Plugin struct { + // Name defines the name of plugin + Name string `json:"name"` + // Weight defines the weight of plugin, only used for Score plugins. + Weight int32 `json:"weight,omitempty"` +} + +// PluginConfig specifies arguments that should be passed to a plugin at the time of initialization. +// A plugin that is invoked at multiple extension points is initialized once. Args can have arbitrary structure. +// It is up to the plugin to process these Args. +type PluginConfig struct { + // Name defines the name of plugin being configured + Name string `json:"name"` + // Args defines the arguments passed to the plugins at the time of initialization. Args can have arbitrary structure. + Args runtime.Unknown `json:"args,omitempty"` +} diff --git a/staging/src/k8s.io/kube-scheduler/config/v1alpha1/zz_generated.deepcopy.go b/staging/src/k8s.io/kube-scheduler/config/v1alpha1/zz_generated.deepcopy.go index c0bbb79a4f0..1779f5c67b7 100644 --- a/staging/src/k8s.io/kube-scheduler/config/v1alpha1/zz_generated.deepcopy.go +++ b/staging/src/k8s.io/kube-scheduler/config/v1alpha1/zz_generated.deepcopy.go @@ -37,6 +37,18 @@ func (in *KubeSchedulerConfiguration) DeepCopyInto(out *KubeSchedulerConfigurati *out = new(int64) **out = **in } + if in.Plugins != nil { + in, out := &in.Plugins, &out.Plugins + *out = new(Plugins) + (*in).DeepCopyInto(*out) + } + if in.PluginConfig != nil { + in, out := &in.PluginConfig, &out.PluginConfig + *out = make([]PluginConfig, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } return } @@ -75,6 +87,141 @@ func (in *KubeSchedulerLeaderElectionConfiguration) DeepCopy() *KubeSchedulerLea return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Plugin) DeepCopyInto(out *Plugin) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Plugin. +func (in *Plugin) DeepCopy() *Plugin { + if in == nil { + return nil + } + out := new(Plugin) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *PluginConfig) DeepCopyInto(out *PluginConfig) { + *out = *in + in.Args.DeepCopyInto(&out.Args) + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PluginConfig. +func (in *PluginConfig) DeepCopy() *PluginConfig { + if in == nil { + return nil + } + out := new(PluginConfig) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *PluginSet) DeepCopyInto(out *PluginSet) { + *out = *in + if in.Enabled != nil { + in, out := &in.Enabled, &out.Enabled + *out = make([]Plugin, len(*in)) + copy(*out, *in) + } + if in.Disabled != nil { + in, out := &in.Disabled, &out.Disabled + *out = make([]Plugin, len(*in)) + copy(*out, *in) + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PluginSet. +func (in *PluginSet) DeepCopy() *PluginSet { + if in == nil { + return nil + } + out := new(PluginSet) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Plugins) DeepCopyInto(out *Plugins) { + *out = *in + if in.QueueSort != nil { + in, out := &in.QueueSort, &out.QueueSort + *out = new(PluginSet) + (*in).DeepCopyInto(*out) + } + if in.PreFilter != nil { + in, out := &in.PreFilter, &out.PreFilter + *out = new(PluginSet) + (*in).DeepCopyInto(*out) + } + if in.Filter != nil { + in, out := &in.Filter, &out.Filter + *out = new(PluginSet) + (*in).DeepCopyInto(*out) + } + if in.PostFilter != nil { + in, out := &in.PostFilter, &out.PostFilter + *out = new(PluginSet) + (*in).DeepCopyInto(*out) + } + if in.Score != nil { + in, out := &in.Score, &out.Score + *out = new(PluginSet) + (*in).DeepCopyInto(*out) + } + if in.NormalizeScore != nil { + in, out := &in.NormalizeScore, &out.NormalizeScore + *out = new(PluginSet) + (*in).DeepCopyInto(*out) + } + if in.Reserve != nil { + in, out := &in.Reserve, &out.Reserve + *out = new(PluginSet) + (*in).DeepCopyInto(*out) + } + if in.Permit != nil { + in, out := &in.Permit, &out.Permit + *out = new(PluginSet) + (*in).DeepCopyInto(*out) + } + if in.PreBind != nil { + in, out := &in.PreBind, &out.PreBind + *out = new(PluginSet) + (*in).DeepCopyInto(*out) + } + if in.Bind != nil { + in, out := &in.Bind, &out.Bind + *out = new(PluginSet) + (*in).DeepCopyInto(*out) + } + if in.PostBind != nil { + in, out := &in.PostBind, &out.PostBind + *out = new(PluginSet) + (*in).DeepCopyInto(*out) + } + if in.Unreserve != nil { + in, out := &in.Unreserve, &out.Unreserve + *out = new(PluginSet) + (*in).DeepCopyInto(*out) + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Plugins. +func (in *Plugins) DeepCopy() *Plugins { + if in == nil { + return nil + } + out := new(Plugins) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *SchedulerAlgorithmSource) DeepCopyInto(out *SchedulerAlgorithmSource) { *out = *in diff --git a/test/integration/scheduler/BUILD b/test/integration/scheduler/BUILD index f93e6d83777..439f227fddb 100644 --- a/test/integration/scheduler/BUILD +++ b/test/integration/scheduler/BUILD @@ -95,6 +95,7 @@ go_library( "//pkg/scheduler:go_default_library", "//pkg/scheduler/algorithmprovider:go_default_library", "//pkg/scheduler/api:go_default_library", + "//pkg/scheduler/apis/config:go_default_library", "//pkg/scheduler/factory:go_default_library", "//pkg/scheduler/framework/v1alpha1:go_default_library", "//pkg/util/taints:go_default_library", diff --git a/test/integration/scheduler/framework_test.go b/test/integration/scheduler/framework_test.go index 81e07cb4ac6..2f31bd7d627 100644 --- a/test/integration/scheduler/framework_test.go +++ b/test/integration/scheduler/framework_test.go @@ -24,6 +24,7 @@ import ( "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/wait" + schedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config" framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" ) @@ -201,10 +202,23 @@ func TestReservePlugin(t *testing.T) { // Create a plugin registry for testing. Register only a reserve plugin. registry := framework.Registry{reservePluginName: NewReservePlugin} + // Setup initial reserve plugin for testing. + reservePlugin := &schedulerconfig.Plugins{ + Reserve: &schedulerconfig.PluginSet{ + Enabled: []schedulerconfig.Plugin{ + { + Name: reservePluginName, + }, + }, + }, + } + // Set empty plugin config for testing + emptyPluginConfig := []schedulerconfig.PluginConfig{} + // Create the master and the scheduler with the test plugin set. context := initTestSchedulerWithOptions(t, initTestMaster(t, "reserve-plugin", nil), - false, nil, registry, false, time.Second) + false, nil, registry, reservePlugin, emptyPluginConfig, false, time.Second) defer cleanupTest(t, context) cs := context.clientSet @@ -246,10 +260,27 @@ func TestPrebindPlugin(t *testing.T) { // Create a plugin registry for testing. Register only a prebind plugin. registry := framework.Registry{prebindPluginName: NewPrebindPlugin} + // Setup initial prebind plugin for testing. + preBindPlugin := &schedulerconfig.Plugins{ + PreBind: &schedulerconfig.PluginSet{ + Enabled: []schedulerconfig.Plugin{ + { + Name: prebindPluginName, + }, + }, + }, + } + // Set reserve prebind config for testing + preBindPluginConfig := []schedulerconfig.PluginConfig{ + { + Name: prebindPluginName, + Args: runtime.Unknown{}, + }, + } // Create the master and the scheduler with the test plugin set. context := initTestSchedulerWithOptions(t, initTestMaster(t, "prebind-plugin", nil), - false, nil, registry, false, time.Second) + false, nil, registry, preBindPlugin, preBindPluginConfig, false, time.Second) defer cleanupTest(t, context) cs := context.clientSet @@ -323,10 +354,39 @@ func TestUnreservePlugin(t *testing.T) { prebindPluginName: NewPrebindPlugin, } + // Setup initial unreserve and prebind plugin for testing. + plugins := &schedulerconfig.Plugins{ + Unreserve: &schedulerconfig.PluginSet{ + Enabled: []schedulerconfig.Plugin{ + { + Name: unreservePluginName, + }, + }, + }, + PreBind: &schedulerconfig.PluginSet{ + Enabled: []schedulerconfig.Plugin{ + { + Name: prebindPluginName, + }, + }, + }, + } + // Set unreserve and prebind plugin config for testing + pluginConfig := []schedulerconfig.PluginConfig{ + { + Name: unreservePluginName, + Args: runtime.Unknown{}, + }, + { + Name: prebindPluginName, + Args: runtime.Unknown{}, + }, + } + // Create the master and the scheduler with the test plugin set. context := initTestSchedulerWithOptions(t, initTestMaster(t, "unreserve-plugin", nil), - false, nil, registry, false, time.Second) + false, nil, registry, plugins, pluginConfig, false, time.Second) defer cleanupTest(t, context) cs := context.clientSet @@ -404,10 +464,28 @@ func TestPermitPlugin(t *testing.T) { // Create a plugin registry for testing. Register only a permit plugin. registry := framework.Registry{permitPluginName: NewPermitPlugin} + // Setup initial permit plugin for testing. + plugins := &schedulerconfig.Plugins{ + Permit: &schedulerconfig.PluginSet{ + Enabled: []schedulerconfig.Plugin{ + { + Name: permitPluginName, + }, + }, + }, + } + // Set permit plugin config for testing + pluginConfig := []schedulerconfig.PluginConfig{ + { + Name: permitPluginName, + Args: runtime.Unknown{}, + }, + } + // Create the master and the scheduler with the test plugin set. context := initTestSchedulerWithOptions(t, initTestMaster(t, "permit-plugin", nil), - false, nil, registry, false, time.Second) + false, nil, registry, plugins, pluginConfig, false, time.Second) defer cleanupTest(t, context) cs := context.clientSet @@ -495,10 +573,28 @@ func TestCoSchedulingWithPermitPlugin(t *testing.T) { // Create a plugin registry for testing. Register only a permit plugin. registry := framework.Registry{permitPluginName: NewPermitPlugin} + // Setup initial permit plugin for testing. + plugins := &schedulerconfig.Plugins{ + Permit: &schedulerconfig.PluginSet{ + Enabled: []schedulerconfig.Plugin{ + { + Name: permitPluginName, + }, + }, + }, + } + // Set permit plugin config for testing + pluginConfig := []schedulerconfig.PluginConfig{ + { + Name: permitPluginName, + Args: runtime.Unknown{}, + }, + } + // Create the master and the scheduler with the test plugin set. context := initTestSchedulerWithOptions(t, initTestMaster(t, "permit-plugin", nil), - false, nil, registry, false, time.Second) + false, nil, registry, plugins, pluginConfig, false, time.Second) defer cleanupTest(t, context) cs := context.clientSet diff --git a/test/integration/scheduler/scheduler_test.go b/test/integration/scheduler/scheduler_test.go index 038add1ed18..6f82103a237 100644 --- a/test/integration/scheduler/scheduler_test.go +++ b/test/integration/scheduler/scheduler_test.go @@ -265,6 +265,8 @@ priorities: [] }, nil, schedulerframework.NewRegistry(), + nil, + []kubeschedulerconfig.PluginConfig{}, scheduler.WithName(v1.DefaultSchedulerName), scheduler.WithHardPodAffinitySymmetricWeight(v1.DefaultHardPodAffinitySymmetricWeight), scheduler.WithBindTimeoutSeconds(defaultBindTimeout), @@ -334,6 +336,8 @@ func TestSchedulerCreationFromNonExistentConfigMap(t *testing.T) { }, nil, schedulerframework.NewRegistry(), + nil, + []kubeschedulerconfig.PluginConfig{}, scheduler.WithName(v1.DefaultSchedulerName), scheduler.WithHardPodAffinitySymmetricWeight(v1.DefaultHardPodAffinitySymmetricWeight), scheduler.WithBindTimeoutSeconds(defaultBindTimeout)) @@ -598,7 +602,8 @@ func TestMultiScheduler(t *testing.T) { stopCh := make(chan struct{}) defer close(stopCh) - schedulerConfigFactory2 := createConfiguratorWithPodInformer(fooScheduler, clientSet2, podInformer2, informerFactory2, schedulerframework.NewRegistry(), stopCh) + schedulerConfigFactory2 := createConfiguratorWithPodInformer(fooScheduler, clientSet2, podInformer2, informerFactory2, schedulerframework.NewRegistry(), + nil, []kubeschedulerconfig.PluginConfig{}, stopCh) schedulerConfig2, err := schedulerConfigFactory2.Create() if err != nil { t.Errorf("Couldn't create scheduler config: %v", err) diff --git a/test/integration/scheduler/util.go b/test/integration/scheduler/util.go index c33f7f180b6..9ba8d0b8f1e 100644 --- a/test/integration/scheduler/util.go +++ b/test/integration/scheduler/util.go @@ -45,6 +45,8 @@ import ( "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/disruption" "k8s.io/kubernetes/pkg/scheduler" + schedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config" + // Register defaults in pkg/scheduler/algorithmprovider. _ "k8s.io/kubernetes/pkg/scheduler/algorithmprovider" schedulerapi "k8s.io/kubernetes/pkg/scheduler/api" @@ -74,6 +76,8 @@ func createConfiguratorWithPodInformer( podInformer coreinformers.PodInformer, informerFactory informers.SharedInformerFactory, pluginRegistry schedulerframework.Registry, + plugins *schedulerconfig.Plugins, + pluginConfig []schedulerconfig.PluginConfig, stopCh <-chan struct{}, ) factory.Configurator { return factory.NewConfigFactory(&factory.ConfigFactoryArgs{ @@ -90,6 +94,8 @@ func createConfiguratorWithPodInformer( PdbInformer: informerFactory.Policy().V1beta1().PodDisruptionBudgets(), StorageClassInformer: informerFactory.Storage().V1().StorageClasses(), Registry: pluginRegistry, + Plugins: plugins, + PluginConfig: pluginConfig, HardPodAffinitySymmetricWeight: v1.DefaultHardPodAffinitySymmetricWeight, DisablePreemption: false, PercentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore, @@ -148,7 +154,8 @@ func initTestScheduler( ) *testContext { // Pod preemption is enabled by default scheduler configuration, but preemption only happens when PodPriority // feature gate is enabled at the same time. - return initTestSchedulerWithOptions(t, context, setPodInformer, policy, schedulerframework.NewRegistry(), false, time.Second) + return initTestSchedulerWithOptions(t, context, setPodInformer, policy, schedulerframework.NewRegistry(), + nil, []schedulerconfig.PluginConfig{}, false, time.Second) } // initTestSchedulerWithOptions initializes a test environment and creates a scheduler with default @@ -159,6 +166,8 @@ func initTestSchedulerWithOptions( setPodInformer bool, policy *schedulerapi.Policy, pluginRegistry schedulerframework.Registry, + plugins *schedulerconfig.Plugins, + pluginConfig []schedulerconfig.PluginConfig, disablePreemption bool, resyncPeriod time.Duration, ) *testContext { @@ -175,7 +184,8 @@ func initTestSchedulerWithOptions( } context.schedulerConfigFactory = createConfiguratorWithPodInformer( - v1.DefaultSchedulerName, context.clientSet, podInformer, context.informerFactory, pluginRegistry, context.stopCh) + v1.DefaultSchedulerName, context.clientSet, podInformer, context.informerFactory, pluginRegistry, plugins, + pluginConfig, context.stopCh) var err error @@ -257,7 +267,8 @@ func initTest(t *testing.T, nsPrefix string) *testContext { func initTestDisablePreemption(t *testing.T, nsPrefix string) *testContext { return initTestSchedulerWithOptions( t, initTestMaster(t, nsPrefix, nil), true, nil, - schedulerframework.NewRegistry(), true, time.Second) + schedulerframework.NewRegistry(), nil, []schedulerconfig.PluginConfig{}, + true, time.Second) } // cleanupTest deletes the scheduler and the test namespace. It should be called diff --git a/test/integration/scheduler/volume_binding_test.go b/test/integration/scheduler/volume_binding_test.go index da53eecf00a..f1ecdc90b3c 100644 --- a/test/integration/scheduler/volume_binding_test.go +++ b/test/integration/scheduler/volume_binding_test.go @@ -43,6 +43,7 @@ import ( persistentvolumeoptions "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/options" "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" + schedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/volume" volumetest "k8s.io/kubernetes/pkg/volume/testing" imageutils "k8s.io/kubernetes/test/utils/image" @@ -882,7 +883,8 @@ func TestRescheduleProvisioning(t *testing.T) { } func setupCluster(t *testing.T, nsName string, numberOfNodes int, resyncPeriod time.Duration, provisionDelaySeconds int) *testConfig { - context := initTestSchedulerWithOptions(t, initTestMaster(t, nsName, nil), false, nil, nil, false, resyncPeriod) + context := initTestSchedulerWithOptions(t, initTestMaster(t, nsName, nil), false, nil, nil, + nil, []schedulerconfig.PluginConfig{}, false, resyncPeriod) clientset := context.clientSet ns := context.ns.Name