diff --git a/cmd/kube-scheduler/app/server.go b/cmd/kube-scheduler/app/server.go index 2339c1755df..306571053a3 100644 --- a/cmd/kube-scheduler/app/server.go +++ b/cmd/kube-scheduler/app/server.go @@ -187,6 +187,7 @@ func Run(ctx context.Context, cc schedulerserverconfig.CompletedConfig, outOfTre scheduler.WithFrameworkOutOfTreeRegistry(outOfTreeRegistry), scheduler.WithPodMaxBackoffSeconds(cc.ComponentConfig.PodMaxBackoffSeconds), scheduler.WithPodInitialBackoffSeconds(cc.ComponentConfig.PodInitialBackoffSeconds), + scheduler.WithExtenders(cc.ComponentConfig.Extenders...), ) if err != nil { return err diff --git a/pkg/scheduler/apis/config/types.go b/pkg/scheduler/apis/config/types.go index d781539dd91..a21d16fd2ff 100644 --- a/pkg/scheduler/apis/config/types.go +++ b/pkg/scheduler/apis/config/types.go @@ -101,6 +101,10 @@ type KubeSchedulerConfiguration struct { // scheduler name. Pods that don't specify any scheduler name are scheduled // with the "default-scheduler" profile, if present here. Profiles []KubeSchedulerProfile + + // Extenders are the list of scheduler extenders, each holding the values of how to communicate + // with the extender. These extenders are shared by all scheduler profiles. + Extenders []Extender } // KubeSchedulerProfile is a scheduling profile. diff --git a/pkg/scheduler/apis/config/v1alpha1/zz_generated.conversion.go b/pkg/scheduler/apis/config/v1alpha1/zz_generated.conversion.go index fe1595e11ae..1bd3b1a3efb 100644 --- a/pkg/scheduler/apis/config/v1alpha1/zz_generated.conversion.go +++ b/pkg/scheduler/apis/config/v1alpha1/zz_generated.conversion.go @@ -217,6 +217,7 @@ func autoConvert_config_KubeSchedulerConfiguration_To_v1alpha1_KubeSchedulerConf return err } // WARNING: in.Profiles requires manual conversion: does not exist in peer-type + // WARNING: in.Extenders requires manual conversion: does not exist in peer-type return nil } diff --git a/pkg/scheduler/apis/config/v1alpha2/BUILD b/pkg/scheduler/apis/config/v1alpha2/BUILD index 00126accdd3..c58b6c35ca6 100644 --- a/pkg/scheduler/apis/config/v1alpha2/BUILD +++ b/pkg/scheduler/apis/config/v1alpha2/BUILD @@ -21,6 +21,7 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/conversion:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/component-base/config/v1alpha1:go_default_library", + "//staging/src/k8s.io/kube-scheduler/config/v1:go_default_library", "//staging/src/k8s.io/kube-scheduler/config/v1alpha2:go_default_library", "//vendor/k8s.io/utils/pointer:go_default_library", ], diff --git a/pkg/scheduler/apis/config/v1alpha2/zz_generated.conversion.go b/pkg/scheduler/apis/config/v1alpha2/zz_generated.conversion.go index 383efda967b..d5c4ef58133 100644 --- a/pkg/scheduler/apis/config/v1alpha2/zz_generated.conversion.go +++ b/pkg/scheduler/apis/config/v1alpha2/zz_generated.conversion.go @@ -27,6 +27,7 @@ import ( conversion "k8s.io/apimachinery/pkg/conversion" runtime "k8s.io/apimachinery/pkg/runtime" v1alpha1 "k8s.io/component-base/config/v1alpha1" + configv1 "k8s.io/kube-scheduler/config/v1" v1alpha2 "k8s.io/kube-scheduler/config/v1alpha2" config "k8s.io/kubernetes/pkg/scheduler/apis/config" ) @@ -153,6 +154,7 @@ func autoConvert_v1alpha2_KubeSchedulerConfiguration_To_config_KubeSchedulerConf } else { out.Profiles = nil } + out.Extenders = *(*[]config.Extender)(unsafe.Pointer(&in.Extenders)) return nil } @@ -199,6 +201,7 @@ func autoConvert_config_KubeSchedulerConfiguration_To_v1alpha2_KubeSchedulerConf } else { out.Profiles = nil } + out.Extenders = *(*[]configv1.Extender)(unsafe.Pointer(&in.Extenders)) return nil } diff --git a/pkg/scheduler/apis/config/validation/validation.go b/pkg/scheduler/apis/config/validation/validation.go index c5f4f224a0b..35107339307 100644 --- a/pkg/scheduler/apis/config/validation/validation.go +++ b/pkg/scheduler/apis/config/validation/validation.go @@ -71,6 +71,8 @@ func ValidateKubeSchedulerConfiguration(cc *config.KubeSchedulerConfiguration) f allErrs = append(allErrs, field.Invalid(field.NewPath("podMaxBackoffSeconds"), cc.PodMaxBackoffSeconds, "must be greater than or equal to PodInitialBackoffSeconds")) } + + allErrs = append(allErrs, validateExtenders(field.NewPath("extenders"), cc.Extenders)...) return allErrs } @@ -123,28 +125,8 @@ func ValidatePolicy(policy config.Policy) error { validationErrors = append(validationErrors, validateCustomPriorities(priorities, priority)) } - binders := 0 - extenderManagedResources := sets.NewString() - for _, extender := range policy.Extenders { - if len(extender.PrioritizeVerb) > 0 && extender.Weight <= 0 { - validationErrors = append(validationErrors, fmt.Errorf("Priority for extender %s should have a positive weight applied to it", extender.URLPrefix)) - } - if extender.BindVerb != "" { - binders++ - } - for _, resource := range extender.ManagedResources { - errs := validateExtendedResourceName(v1.ResourceName(resource.Name)) - if len(errs) != 0 { - validationErrors = append(validationErrors, errs...) - } - if extenderManagedResources.Has(resource.Name) { - validationErrors = append(validationErrors, fmt.Errorf("Duplicate extender managed resource name %s", string(resource.Name))) - } - extenderManagedResources.Insert(resource.Name) - } - } - if binders > 1 { - validationErrors = append(validationErrors, fmt.Errorf("Only one extender can implement bind, found %v", binders)) + if extenderErrs := validateExtenders(field.NewPath("extenders"), policy.Extenders); len(extenderErrs) > 0 { + validationErrors = append(validationErrors, extenderErrs.ToAggregate().Errors()...) } if policy.HardPodAffinitySymmetricWeight < 0 || policy.HardPodAffinitySymmetricWeight > 100 { @@ -153,6 +135,40 @@ func ValidatePolicy(policy config.Policy) error { return utilerrors.NewAggregate(validationErrors) } +// validateExtenders validates the configured extenders for the Scheduler +func validateExtenders(fldPath *field.Path, extenders []config.Extender) field.ErrorList { + allErrs := field.ErrorList{} + binders := 0 + extenderManagedResources := sets.NewString() + for i, extender := range extenders { + path := fldPath.Index(i) + if len(extender.PrioritizeVerb) > 0 && extender.Weight <= 0 { + allErrs = append(allErrs, field.Invalid(path.Child("weight"), + extender.Weight, "must have a positive weight applied to it")) + } + if extender.BindVerb != "" { + binders++ + } + for j, resource := range extender.ManagedResources { + managedResourcesPath := path.Child("managedResources").Index(j) + errs := validateExtendedResourceName(v1.ResourceName(resource.Name)) + for _, err := range errs { + allErrs = append(allErrs, field.Invalid(managedResourcesPath.Child("name"), + resource.Name, fmt.Sprintf("%+v", err))) + } + if extenderManagedResources.Has(resource.Name) { + allErrs = append(allErrs, field.Invalid(managedResourcesPath.Child("name"), + resource.Name, "duplicate extender managed resource name")) + } + extenderManagedResources.Insert(resource.Name) + } + } + if binders > 1 { + allErrs = append(allErrs, field.Invalid(fldPath, fmt.Sprintf("found %d extenders implementing bind", binders), "only one extender can implement bind")) + } + return allErrs +} + // validateCustomPriorities validates that: // 1. RequestedToCapacityRatioRedeclared custom priority cannot be declared multiple times, // 2. LabelPreference/ServiceAntiAffinity custom priorities can be declared multiple times, diff --git a/pkg/scheduler/apis/config/validation/validation_test.go b/pkg/scheduler/apis/config/validation/validation_test.go index fff4c98891a..afa496d831e 100644 --- a/pkg/scheduler/apis/config/validation/validation_test.go +++ b/pkg/scheduler/apis/config/validation/validation_test.go @@ -87,6 +87,12 @@ func TestValidateKubeSchedulerConfiguration(t *testing.T) { }, }, }, + Extenders: []config.Extender{ + { + PrioritizeVerb: "prioritize", + Weight: 1, + }, + }, } resourceNameNotSet := validConfig.DeepCopy() @@ -126,6 +132,22 @@ func TestValidateKubeSchedulerConfiguration(t *testing.T) { oneEmptyQueueSort := validConfig.DeepCopy() oneEmptyQueueSort.Profiles[0].Plugins = nil + extenderNegativeWeight := validConfig.DeepCopy() + extenderNegativeWeight.Extenders[0].Weight = -1 + + extenderDuplicateManagedResource := validConfig.DeepCopy() + extenderDuplicateManagedResource.Extenders[0].ManagedResources = []config.ExtenderManagedResource{ + {Name: "foo", IgnoredByScheduler: false}, + {Name: "foo", IgnoredByScheduler: false}, + } + + extenderDuplicateBind := validConfig.DeepCopy() + extenderDuplicateBind.Extenders[0].BindVerb = "foo" + extenderDuplicateBind.Extenders = append(extenderDuplicateBind.Extenders, config.Extender{ + PrioritizeVerb: "prioritize", + BindVerb: "bar", + }) + scenarios := map[string]struct { expectedToFail bool config *config.KubeSchedulerConfiguration @@ -178,6 +200,18 @@ func TestValidateKubeSchedulerConfiguration(t *testing.T) { expectedToFail: true, config: oneEmptyQueueSort, }, + "extender-negative-weight": { + expectedToFail: true, + config: extenderNegativeWeight, + }, + "extender-duplicate-managed-resources": { + expectedToFail: true, + config: extenderDuplicateManagedResource, + }, + "extender-duplicate-bind": { + expectedToFail: true, + config: extenderDuplicateBind, + }, } for name, scenario := range scenarios { @@ -232,7 +266,7 @@ func TestValidatePolicy(t *testing.T) { { name: "invalid negative weight in policy extender config", policy: config.Policy{Extenders: []config.Extender{{URLPrefix: "http://127.0.0.1:8081/extender", PrioritizeVerb: "prioritize", Weight: -2}}}, - expected: errors.New("Priority for extender http://127.0.0.1:8081/extender should have a positive weight applied to it"), + expected: errors.New("extenders[0].weight: Invalid value: -2: must have a positive weight applied to it"), }, { name: "valid filter verb and url prefix", @@ -251,7 +285,7 @@ func TestValidatePolicy(t *testing.T) { {URLPrefix: "http://127.0.0.1:8081/extender", BindVerb: "bind"}, {URLPrefix: "http://127.0.0.1:8082/extender", BindVerb: "bind"}, }}, - expected: errors.New("Only one extender can implement bind, found 2"), + expected: errors.New("extenders: Invalid value: \"found 2 extenders implementing bind\": only one extender can implement bind"), }, { name: "invalid duplicate extender resource name", @@ -260,7 +294,7 @@ func TestValidatePolicy(t *testing.T) { {URLPrefix: "http://127.0.0.1:8081/extender", ManagedResources: []config.ExtenderManagedResource{{Name: "foo.com/bar"}}}, {URLPrefix: "http://127.0.0.1:8082/extender", BindVerb: "bind", ManagedResources: []config.ExtenderManagedResource{{Name: "foo.com/bar"}}}, }}, - expected: errors.New("Duplicate extender managed resource name foo.com/bar"), + expected: errors.New("extenders[1].managedResources[0].name: Invalid value: \"foo.com/bar\": duplicate extender managed resource name"), }, { name: "invalid extended resource name", @@ -268,7 +302,7 @@ func TestValidatePolicy(t *testing.T) { Extenders: []config.Extender{ {URLPrefix: "http://127.0.0.1:8081/extender", ManagedResources: []config.ExtenderManagedResource{{Name: "kubernetes.io/foo"}}}, }}, - expected: errors.New("kubernetes.io/foo is an invalid extended resource name"), + expected: errors.New("extenders[0].managedResources[0].name: Invalid value: \"kubernetes.io/foo\": kubernetes.io/foo is an invalid extended resource name"), }, { name: "invalid redeclared RequestedToCapacityRatio custom priority", diff --git a/pkg/scheduler/apis/config/zz_generated.deepcopy.go b/pkg/scheduler/apis/config/zz_generated.deepcopy.go index e6d8e104e71..4b7c92369ca 100644 --- a/pkg/scheduler/apis/config/zz_generated.deepcopy.go +++ b/pkg/scheduler/apis/config/zz_generated.deepcopy.go @@ -112,6 +112,13 @@ func (in *KubeSchedulerConfiguration) DeepCopyInto(out *KubeSchedulerConfigurati (*in)[i].DeepCopyInto(&(*out)[i]) } } + if in.Extenders != nil { + in, out := &in.Extenders, &out.Extenders + *out = make([]Extender, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } return } diff --git a/pkg/scheduler/factory.go b/pkg/scheduler/factory.go index eb99eacbec7..31370232676 100644 --- a/pkg/scheduler/factory.go +++ b/pkg/scheduler/factory.go @@ -105,6 +105,7 @@ type Configurator struct { profiles []schedulerapi.KubeSchedulerProfile registry framework.Registry nodeInfoSnapshot *internalcache.Snapshot + extenders []schedulerapi.Extender } func (c *Configurator) buildFramework(p schedulerapi.KubeSchedulerProfile) (framework.Framework, error) { @@ -121,7 +122,49 @@ func (c *Configurator) buildFramework(p schedulerapi.KubeSchedulerProfile) (fram } // create a scheduler from a set of registered plugins. -func (c *Configurator) create(extenders []core.SchedulerExtender) (*Scheduler, error) { +func (c *Configurator) create() (*Scheduler, error) { + var extenders []core.SchedulerExtender + var ignoredExtendedResources []string + if len(c.extenders) != 0 { + var ignorableExtenders []core.SchedulerExtender + for ii := range c.extenders { + klog.V(2).Infof("Creating extender with config %+v", c.extenders[ii]) + extender, err := core.NewHTTPExtender(&c.extenders[ii]) + if err != nil { + return nil, err + } + if !extender.IsIgnorable() { + extenders = append(extenders, extender) + } else { + ignorableExtenders = append(ignorableExtenders, extender) + } + for _, r := range c.extenders[ii].ManagedResources { + if r.IgnoredByScheduler { + ignoredExtendedResources = append(ignoredExtendedResources, r.Name) + } + } + } + // place ignorable extenders to the tail of extenders + extenders = append(extenders, ignorableExtenders...) + } + + // If there are any extended resources found from the Extenders, append them to the pluginConfig for each profile. + // This should only have an effect on ComponentConfig v1alpha2, where it is possible to configure Extenders and + // plugin args (and in which case the extender ignored resources take precedence). + // For earlier versions, using both policy and custom plugin config is disallowed, so this should be the only + // plugin config for this plugin. + if len(ignoredExtendedResources) > 0 { + for i := range c.profiles { + prof := &c.profiles[i] + prof.PluginConfig = append(prof.PluginConfig, + frameworkplugins.NewPluginConfig( + noderesources.FitName, + noderesources.FitArgs{IgnoredResources: ignoredExtendedResources}, + ), + ) + } + } + profiles, err := profile.NewMap(c.profiles, c.buildFramework, c.recorderFactory) if err != nil { return nil, fmt.Errorf("initializing profiles: %v", err) @@ -186,11 +229,11 @@ func (c *Configurator) createFromProvider(providerName string) (*Scheduler, erro plugins.Apply(prof.Plugins) prof.Plugins = plugins } - - return c.create([]core.SchedulerExtender{}) + return c.create() } // createFromConfig creates a scheduler from the configuration file +// Only reachable when using v1alpha1 component config func (c *Configurator) createFromConfig(policy schedulerapi.Policy) (*Scheduler, error) { lr := frameworkplugins.NewLegacyRegistry() args := &frameworkplugins.ConfigProducerArgs{} @@ -228,33 +271,6 @@ func (c *Configurator) createFromConfig(policy schedulerapi.Policy) (*Scheduler, } } - var extenders []core.SchedulerExtender - if len(policy.Extenders) != 0 { - var ignorableExtenders []core.SchedulerExtender - var ignoredExtendedResources []string - for ii := range policy.Extenders { - klog.V(2).Infof("Creating extender with config %+v", policy.Extenders[ii]) - extender, err := core.NewHTTPExtender(&policy.Extenders[ii]) - if err != nil { - return nil, err - } - if !extender.IsIgnorable() { - extenders = append(extenders, extender) - } else { - ignorableExtenders = append(ignorableExtenders, extender) - } - for _, r := range policy.Extenders[ii].ManagedResources { - if r.IgnoredByScheduler { - ignoredExtendedResources = append(ignoredExtendedResources, r.Name) - } - } - } - args.NodeResourcesFitArgs = &noderesources.FitArgs{ - IgnoredResources: ignoredExtendedResources, - } - // place ignorable extenders to the tail of extenders - extenders = append(extenders, ignorableExtenders...) - } // HardPodAffinitySymmetricWeight in the policy config takes precedence over // CLI configuration. if policy.HardPodAffinitySymmetricWeight != 0 { @@ -312,7 +328,7 @@ func (c *Configurator) createFromConfig(policy schedulerapi.Policy) (*Scheduler, prof.PluginConfig = pluginConfig } - return c.create(extenders) + return c.create() } // getPriorityConfigs returns priorities configuration: ones that will run as priorities and ones that will run diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index bc5792aaaa6..9a35a05b8fc 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -133,6 +133,7 @@ type schedulerOptions struct { // Contains out-of-tree plugins to be merged with the in-tree registry. frameworkOutOfTreeRegistry framework.Registry profiles []schedulerapi.KubeSchedulerProfile + extenders []schedulerapi.Extender } // Option configures a Scheduler @@ -196,6 +197,13 @@ func WithPodMaxBackoffSeconds(podMaxBackoffSeconds int64) Option { } } +// WithExtenders sets extenders for the Scheduler +func WithExtenders(e ...schedulerapi.Extender) Option { + return func(o *schedulerOptions) { + o.extenders = e + } +} + var defaultSchedulerOptions = schedulerOptions{ profiles: []schedulerapi.KubeSchedulerProfile{ // Profiles' default plugins are set from the algorithm provider. @@ -264,6 +272,7 @@ func New(client clientset.Interface, profiles: append([]schedulerapi.KubeSchedulerProfile(nil), options.profiles...), registry: registry, nodeInfoSnapshot: snapshot, + extenders: options.extenders, } metrics.Register() @@ -291,6 +300,10 @@ func New(client clientset.Interface, return nil, err } } + // Set extenders on the configurator now that we've decoded the policy + // In this case, c.extenders should be nil since we're using a policy (and therefore not componentconfig, + // which would have set extenders in the above instantiation of Configurator from CC options) + configurator.extenders = policy.Extenders sc, err := configurator.createFromConfig(*policy) if err != nil { return nil, fmt.Errorf("couldn't create scheduler from policy: %v", err) diff --git a/staging/publishing/import-restrictions.yaml b/staging/publishing/import-restrictions.yaml index 4422f5f06bb..b5aa157c428 100644 --- a/staging/publishing/import-restrictions.yaml +++ b/staging/publishing/import-restrictions.yaml @@ -198,6 +198,7 @@ - k8s.io/apimachinery - k8s.io/component-base - k8s.io/klog + - k8s.io/kube-scheduler - k8s.io/utils - baseImportPath: "./vendor/k8s.io/kubelet/" diff --git a/staging/src/k8s.io/kube-scheduler/config/v1alpha2/BUILD b/staging/src/k8s.io/kube-scheduler/config/v1alpha2/BUILD index a456e558eaa..40f30aec12a 100644 --- a/staging/src/k8s.io/kube-scheduler/config/v1alpha2/BUILD +++ b/staging/src/k8s.io/kube-scheduler/config/v1alpha2/BUILD @@ -16,6 +16,7 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", "//staging/src/k8s.io/component-base/config/v1alpha1:go_default_library", + "//staging/src/k8s.io/kube-scheduler/config/v1:go_default_library", ], ) diff --git a/staging/src/k8s.io/kube-scheduler/config/v1alpha2/types.go b/staging/src/k8s.io/kube-scheduler/config/v1alpha2/types.go index 686c44aba1b..2474b716ad1 100644 --- a/staging/src/k8s.io/kube-scheduler/config/v1alpha2/types.go +++ b/staging/src/k8s.io/kube-scheduler/config/v1alpha2/types.go @@ -20,6 +20,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" componentbaseconfigv1alpha1 "k8s.io/component-base/config/v1alpha1" + v1 "k8s.io/kube-scheduler/config/v1" ) const ( @@ -91,6 +92,11 @@ type KubeSchedulerConfiguration struct { // +listType=map // +listMapKey=schedulerName Profiles []KubeSchedulerProfile `json:"profiles"` + + // Extenders are the list of scheduler extenders, each holding the values of how to communicate + // with the extender. These extenders are shared by all scheduler profiles. + // +listType=set + Extenders []v1.Extender `json:"extenders"` } // KubeSchedulerProfile is a scheduling profile. diff --git a/staging/src/k8s.io/kube-scheduler/config/v1alpha2/zz_generated.deepcopy.go b/staging/src/k8s.io/kube-scheduler/config/v1alpha2/zz_generated.deepcopy.go index feab03c2ffe..afd67b9d390 100644 --- a/staging/src/k8s.io/kube-scheduler/config/v1alpha2/zz_generated.deepcopy.go +++ b/staging/src/k8s.io/kube-scheduler/config/v1alpha2/zz_generated.deepcopy.go @@ -22,6 +22,7 @@ package v1alpha2 import ( runtime "k8s.io/apimachinery/pkg/runtime" + v1 "k8s.io/kube-scheduler/config/v1" ) // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. @@ -73,6 +74,13 @@ func (in *KubeSchedulerConfiguration) DeepCopyInto(out *KubeSchedulerConfigurati (*in)[i].DeepCopyInto(&(*out)[i]) } } + if in.Extenders != nil { + in, out := &in.Extenders, &out.Extenders + *out = make([]v1.Extender, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } return }