Add Extenders to scheduler v1alpha2 component config

This commit is contained in:
Mike Dame 2020-03-05 09:35:16 -05:00
parent 06b798781a
commit 1d7006c38d
14 changed files with 169 additions and 57 deletions

View File

@ -187,6 +187,7 @@ func Run(ctx context.Context, cc schedulerserverconfig.CompletedConfig, outOfTre
scheduler.WithFrameworkOutOfTreeRegistry(outOfTreeRegistry),
scheduler.WithPodMaxBackoffSeconds(cc.ComponentConfig.PodMaxBackoffSeconds),
scheduler.WithPodInitialBackoffSeconds(cc.ComponentConfig.PodInitialBackoffSeconds),
scheduler.WithExtenders(cc.ComponentConfig.Extenders...),
)
if err != nil {
return err

View File

@ -101,6 +101,10 @@ type KubeSchedulerConfiguration struct {
// scheduler name. Pods that don't specify any scheduler name are scheduled
// with the "default-scheduler" profile, if present here.
Profiles []KubeSchedulerProfile
// Extenders are the list of scheduler extenders, each holding the values of how to communicate
// with the extender. These extenders are shared by all scheduler profiles.
Extenders []Extender
}
// KubeSchedulerProfile is a scheduling profile.

View File

@ -217,6 +217,7 @@ func autoConvert_config_KubeSchedulerConfiguration_To_v1alpha1_KubeSchedulerConf
return err
}
// WARNING: in.Profiles requires manual conversion: does not exist in peer-type
// WARNING: in.Extenders requires manual conversion: does not exist in peer-type
return nil
}

View File

@ -21,6 +21,7 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/conversion:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/component-base/config/v1alpha1:go_default_library",
"//staging/src/k8s.io/kube-scheduler/config/v1:go_default_library",
"//staging/src/k8s.io/kube-scheduler/config/v1alpha2:go_default_library",
"//vendor/k8s.io/utils/pointer:go_default_library",
],

View File

@ -27,6 +27,7 @@ import (
conversion "k8s.io/apimachinery/pkg/conversion"
runtime "k8s.io/apimachinery/pkg/runtime"
v1alpha1 "k8s.io/component-base/config/v1alpha1"
configv1 "k8s.io/kube-scheduler/config/v1"
v1alpha2 "k8s.io/kube-scheduler/config/v1alpha2"
config "k8s.io/kubernetes/pkg/scheduler/apis/config"
)
@ -153,6 +154,7 @@ func autoConvert_v1alpha2_KubeSchedulerConfiguration_To_config_KubeSchedulerConf
} else {
out.Profiles = nil
}
out.Extenders = *(*[]config.Extender)(unsafe.Pointer(&in.Extenders))
return nil
}
@ -199,6 +201,7 @@ func autoConvert_config_KubeSchedulerConfiguration_To_v1alpha2_KubeSchedulerConf
} else {
out.Profiles = nil
}
out.Extenders = *(*[]configv1.Extender)(unsafe.Pointer(&in.Extenders))
return nil
}

View File

@ -71,6 +71,8 @@ func ValidateKubeSchedulerConfiguration(cc *config.KubeSchedulerConfiguration) f
allErrs = append(allErrs, field.Invalid(field.NewPath("podMaxBackoffSeconds"),
cc.PodMaxBackoffSeconds, "must be greater than or equal to PodInitialBackoffSeconds"))
}
allErrs = append(allErrs, validateExtenders(field.NewPath("extenders"), cc.Extenders)...)
return allErrs
}
@ -123,28 +125,8 @@ func ValidatePolicy(policy config.Policy) error {
validationErrors = append(validationErrors, validateCustomPriorities(priorities, priority))
}
binders := 0
extenderManagedResources := sets.NewString()
for _, extender := range policy.Extenders {
if len(extender.PrioritizeVerb) > 0 && extender.Weight <= 0 {
validationErrors = append(validationErrors, fmt.Errorf("Priority for extender %s should have a positive weight applied to it", extender.URLPrefix))
}
if extender.BindVerb != "" {
binders++
}
for _, resource := range extender.ManagedResources {
errs := validateExtendedResourceName(v1.ResourceName(resource.Name))
if len(errs) != 0 {
validationErrors = append(validationErrors, errs...)
}
if extenderManagedResources.Has(resource.Name) {
validationErrors = append(validationErrors, fmt.Errorf("Duplicate extender managed resource name %s", string(resource.Name)))
}
extenderManagedResources.Insert(resource.Name)
}
}
if binders > 1 {
validationErrors = append(validationErrors, fmt.Errorf("Only one extender can implement bind, found %v", binders))
if extenderErrs := validateExtenders(field.NewPath("extenders"), policy.Extenders); len(extenderErrs) > 0 {
validationErrors = append(validationErrors, extenderErrs.ToAggregate().Errors()...)
}
if policy.HardPodAffinitySymmetricWeight < 0 || policy.HardPodAffinitySymmetricWeight > 100 {
@ -153,6 +135,40 @@ func ValidatePolicy(policy config.Policy) error {
return utilerrors.NewAggregate(validationErrors)
}
// validateExtenders validates the configured extenders for the Scheduler
func validateExtenders(fldPath *field.Path, extenders []config.Extender) field.ErrorList {
allErrs := field.ErrorList{}
binders := 0
extenderManagedResources := sets.NewString()
for i, extender := range extenders {
path := fldPath.Index(i)
if len(extender.PrioritizeVerb) > 0 && extender.Weight <= 0 {
allErrs = append(allErrs, field.Invalid(path.Child("weight"),
extender.Weight, "must have a positive weight applied to it"))
}
if extender.BindVerb != "" {
binders++
}
for j, resource := range extender.ManagedResources {
managedResourcesPath := path.Child("managedResources").Index(j)
errs := validateExtendedResourceName(v1.ResourceName(resource.Name))
for _, err := range errs {
allErrs = append(allErrs, field.Invalid(managedResourcesPath.Child("name"),
resource.Name, fmt.Sprintf("%+v", err)))
}
if extenderManagedResources.Has(resource.Name) {
allErrs = append(allErrs, field.Invalid(managedResourcesPath.Child("name"),
resource.Name, "duplicate extender managed resource name"))
}
extenderManagedResources.Insert(resource.Name)
}
}
if binders > 1 {
allErrs = append(allErrs, field.Invalid(fldPath, fmt.Sprintf("found %d extenders implementing bind", binders), "only one extender can implement bind"))
}
return allErrs
}
// validateCustomPriorities validates that:
// 1. RequestedToCapacityRatioRedeclared custom priority cannot be declared multiple times,
// 2. LabelPreference/ServiceAntiAffinity custom priorities can be declared multiple times,

View File

@ -87,6 +87,12 @@ func TestValidateKubeSchedulerConfiguration(t *testing.T) {
},
},
},
Extenders: []config.Extender{
{
PrioritizeVerb: "prioritize",
Weight: 1,
},
},
}
resourceNameNotSet := validConfig.DeepCopy()
@ -126,6 +132,22 @@ func TestValidateKubeSchedulerConfiguration(t *testing.T) {
oneEmptyQueueSort := validConfig.DeepCopy()
oneEmptyQueueSort.Profiles[0].Plugins = nil
extenderNegativeWeight := validConfig.DeepCopy()
extenderNegativeWeight.Extenders[0].Weight = -1
extenderDuplicateManagedResource := validConfig.DeepCopy()
extenderDuplicateManagedResource.Extenders[0].ManagedResources = []config.ExtenderManagedResource{
{Name: "foo", IgnoredByScheduler: false},
{Name: "foo", IgnoredByScheduler: false},
}
extenderDuplicateBind := validConfig.DeepCopy()
extenderDuplicateBind.Extenders[0].BindVerb = "foo"
extenderDuplicateBind.Extenders = append(extenderDuplicateBind.Extenders, config.Extender{
PrioritizeVerb: "prioritize",
BindVerb: "bar",
})
scenarios := map[string]struct {
expectedToFail bool
config *config.KubeSchedulerConfiguration
@ -178,6 +200,18 @@ func TestValidateKubeSchedulerConfiguration(t *testing.T) {
expectedToFail: true,
config: oneEmptyQueueSort,
},
"extender-negative-weight": {
expectedToFail: true,
config: extenderNegativeWeight,
},
"extender-duplicate-managed-resources": {
expectedToFail: true,
config: extenderDuplicateManagedResource,
},
"extender-duplicate-bind": {
expectedToFail: true,
config: extenderDuplicateBind,
},
}
for name, scenario := range scenarios {
@ -232,7 +266,7 @@ func TestValidatePolicy(t *testing.T) {
{
name: "invalid negative weight in policy extender config",
policy: config.Policy{Extenders: []config.Extender{{URLPrefix: "http://127.0.0.1:8081/extender", PrioritizeVerb: "prioritize", Weight: -2}}},
expected: errors.New("Priority for extender http://127.0.0.1:8081/extender should have a positive weight applied to it"),
expected: errors.New("extenders[0].weight: Invalid value: -2: must have a positive weight applied to it"),
},
{
name: "valid filter verb and url prefix",
@ -251,7 +285,7 @@ func TestValidatePolicy(t *testing.T) {
{URLPrefix: "http://127.0.0.1:8081/extender", BindVerb: "bind"},
{URLPrefix: "http://127.0.0.1:8082/extender", BindVerb: "bind"},
}},
expected: errors.New("Only one extender can implement bind, found 2"),
expected: errors.New("extenders: Invalid value: \"found 2 extenders implementing bind\": only one extender can implement bind"),
},
{
name: "invalid duplicate extender resource name",
@ -260,7 +294,7 @@ func TestValidatePolicy(t *testing.T) {
{URLPrefix: "http://127.0.0.1:8081/extender", ManagedResources: []config.ExtenderManagedResource{{Name: "foo.com/bar"}}},
{URLPrefix: "http://127.0.0.1:8082/extender", BindVerb: "bind", ManagedResources: []config.ExtenderManagedResource{{Name: "foo.com/bar"}}},
}},
expected: errors.New("Duplicate extender managed resource name foo.com/bar"),
expected: errors.New("extenders[1].managedResources[0].name: Invalid value: \"foo.com/bar\": duplicate extender managed resource name"),
},
{
name: "invalid extended resource name",
@ -268,7 +302,7 @@ func TestValidatePolicy(t *testing.T) {
Extenders: []config.Extender{
{URLPrefix: "http://127.0.0.1:8081/extender", ManagedResources: []config.ExtenderManagedResource{{Name: "kubernetes.io/foo"}}},
}},
expected: errors.New("kubernetes.io/foo is an invalid extended resource name"),
expected: errors.New("extenders[0].managedResources[0].name: Invalid value: \"kubernetes.io/foo\": kubernetes.io/foo is an invalid extended resource name"),
},
{
name: "invalid redeclared RequestedToCapacityRatio custom priority",

View File

@ -112,6 +112,13 @@ func (in *KubeSchedulerConfiguration) DeepCopyInto(out *KubeSchedulerConfigurati
(*in)[i].DeepCopyInto(&(*out)[i])
}
}
if in.Extenders != nil {
in, out := &in.Extenders, &out.Extenders
*out = make([]Extender, len(*in))
for i := range *in {
(*in)[i].DeepCopyInto(&(*out)[i])
}
}
return
}

View File

@ -105,6 +105,7 @@ type Configurator struct {
profiles []schedulerapi.KubeSchedulerProfile
registry framework.Registry
nodeInfoSnapshot *internalcache.Snapshot
extenders []schedulerapi.Extender
}
func (c *Configurator) buildFramework(p schedulerapi.KubeSchedulerProfile) (framework.Framework, error) {
@ -121,7 +122,49 @@ func (c *Configurator) buildFramework(p schedulerapi.KubeSchedulerProfile) (fram
}
// create a scheduler from a set of registered plugins.
func (c *Configurator) create(extenders []core.SchedulerExtender) (*Scheduler, error) {
func (c *Configurator) create() (*Scheduler, error) {
var extenders []core.SchedulerExtender
var ignoredExtendedResources []string
if len(c.extenders) != 0 {
var ignorableExtenders []core.SchedulerExtender
for ii := range c.extenders {
klog.V(2).Infof("Creating extender with config %+v", c.extenders[ii])
extender, err := core.NewHTTPExtender(&c.extenders[ii])
if err != nil {
return nil, err
}
if !extender.IsIgnorable() {
extenders = append(extenders, extender)
} else {
ignorableExtenders = append(ignorableExtenders, extender)
}
for _, r := range c.extenders[ii].ManagedResources {
if r.IgnoredByScheduler {
ignoredExtendedResources = append(ignoredExtendedResources, r.Name)
}
}
}
// place ignorable extenders to the tail of extenders
extenders = append(extenders, ignorableExtenders...)
}
// If there are any extended resources found from the Extenders, append them to the pluginConfig for each profile.
// This should only have an effect on ComponentConfig v1alpha2, where it is possible to configure Extenders and
// plugin args (and in which case the extender ignored resources take precedence).
// For earlier versions, using both policy and custom plugin config is disallowed, so this should be the only
// plugin config for this plugin.
if len(ignoredExtendedResources) > 0 {
for i := range c.profiles {
prof := &c.profiles[i]
prof.PluginConfig = append(prof.PluginConfig,
frameworkplugins.NewPluginConfig(
noderesources.FitName,
noderesources.FitArgs{IgnoredResources: ignoredExtendedResources},
),
)
}
}
profiles, err := profile.NewMap(c.profiles, c.buildFramework, c.recorderFactory)
if err != nil {
return nil, fmt.Errorf("initializing profiles: %v", err)
@ -186,11 +229,11 @@ func (c *Configurator) createFromProvider(providerName string) (*Scheduler, erro
plugins.Apply(prof.Plugins)
prof.Plugins = plugins
}
return c.create([]core.SchedulerExtender{})
return c.create()
}
// createFromConfig creates a scheduler from the configuration file
// Only reachable when using v1alpha1 component config
func (c *Configurator) createFromConfig(policy schedulerapi.Policy) (*Scheduler, error) {
lr := frameworkplugins.NewLegacyRegistry()
args := &frameworkplugins.ConfigProducerArgs{}
@ -228,33 +271,6 @@ func (c *Configurator) createFromConfig(policy schedulerapi.Policy) (*Scheduler,
}
}
var extenders []core.SchedulerExtender
if len(policy.Extenders) != 0 {
var ignorableExtenders []core.SchedulerExtender
var ignoredExtendedResources []string
for ii := range policy.Extenders {
klog.V(2).Infof("Creating extender with config %+v", policy.Extenders[ii])
extender, err := core.NewHTTPExtender(&policy.Extenders[ii])
if err != nil {
return nil, err
}
if !extender.IsIgnorable() {
extenders = append(extenders, extender)
} else {
ignorableExtenders = append(ignorableExtenders, extender)
}
for _, r := range policy.Extenders[ii].ManagedResources {
if r.IgnoredByScheduler {
ignoredExtendedResources = append(ignoredExtendedResources, r.Name)
}
}
}
args.NodeResourcesFitArgs = &noderesources.FitArgs{
IgnoredResources: ignoredExtendedResources,
}
// place ignorable extenders to the tail of extenders
extenders = append(extenders, ignorableExtenders...)
}
// HardPodAffinitySymmetricWeight in the policy config takes precedence over
// CLI configuration.
if policy.HardPodAffinitySymmetricWeight != 0 {
@ -312,7 +328,7 @@ func (c *Configurator) createFromConfig(policy schedulerapi.Policy) (*Scheduler,
prof.PluginConfig = pluginConfig
}
return c.create(extenders)
return c.create()
}
// getPriorityConfigs returns priorities configuration: ones that will run as priorities and ones that will run

View File

@ -133,6 +133,7 @@ type schedulerOptions struct {
// Contains out-of-tree plugins to be merged with the in-tree registry.
frameworkOutOfTreeRegistry framework.Registry
profiles []schedulerapi.KubeSchedulerProfile
extenders []schedulerapi.Extender
}
// Option configures a Scheduler
@ -196,6 +197,13 @@ func WithPodMaxBackoffSeconds(podMaxBackoffSeconds int64) Option {
}
}
// WithExtenders sets extenders for the Scheduler
func WithExtenders(e ...schedulerapi.Extender) Option {
return func(o *schedulerOptions) {
o.extenders = e
}
}
var defaultSchedulerOptions = schedulerOptions{
profiles: []schedulerapi.KubeSchedulerProfile{
// Profiles' default plugins are set from the algorithm provider.
@ -264,6 +272,7 @@ func New(client clientset.Interface,
profiles: append([]schedulerapi.KubeSchedulerProfile(nil), options.profiles...),
registry: registry,
nodeInfoSnapshot: snapshot,
extenders: options.extenders,
}
metrics.Register()
@ -291,6 +300,10 @@ func New(client clientset.Interface,
return nil, err
}
}
// Set extenders on the configurator now that we've decoded the policy
// In this case, c.extenders should be nil since we're using a policy (and therefore not componentconfig,
// which would have set extenders in the above instantiation of Configurator from CC options)
configurator.extenders = policy.Extenders
sc, err := configurator.createFromConfig(*policy)
if err != nil {
return nil, fmt.Errorf("couldn't create scheduler from policy: %v", err)

View File

@ -198,6 +198,7 @@
- k8s.io/apimachinery
- k8s.io/component-base
- k8s.io/klog
- k8s.io/kube-scheduler
- k8s.io/utils
- baseImportPath: "./vendor/k8s.io/kubelet/"

View File

@ -16,6 +16,7 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//staging/src/k8s.io/component-base/config/v1alpha1:go_default_library",
"//staging/src/k8s.io/kube-scheduler/config/v1:go_default_library",
],
)

View File

@ -20,6 +20,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
componentbaseconfigv1alpha1 "k8s.io/component-base/config/v1alpha1"
v1 "k8s.io/kube-scheduler/config/v1"
)
const (
@ -91,6 +92,11 @@ type KubeSchedulerConfiguration struct {
// +listType=map
// +listMapKey=schedulerName
Profiles []KubeSchedulerProfile `json:"profiles"`
// Extenders are the list of scheduler extenders, each holding the values of how to communicate
// with the extender. These extenders are shared by all scheduler profiles.
// +listType=set
Extenders []v1.Extender `json:"extenders"`
}
// KubeSchedulerProfile is a scheduling profile.

View File

@ -22,6 +22,7 @@ package v1alpha2
import (
runtime "k8s.io/apimachinery/pkg/runtime"
v1 "k8s.io/kube-scheduler/config/v1"
)
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
@ -73,6 +74,13 @@ func (in *KubeSchedulerConfiguration) DeepCopyInto(out *KubeSchedulerConfigurati
(*in)[i].DeepCopyInto(&(*out)[i])
}
}
if in.Extenders != nil {
in, out := &in.Extenders, &out.Extenders
*out = make([]v1.Extender, len(*in))
for i := range *in {
(*in)[i].DeepCopyInto(&(*out)[i])
}
}
return
}