From 571cea0519eef38d77524f4074b3bd31bb7f5091 Mon Sep 17 00:00:00 2001 From: Cong Liu Date: Thu, 24 Oct 2019 10:12:00 -0400 Subject: [PATCH] Convert NodeLabelPresence custom predicate to filter plugin. --- cmd/kube-scheduler/app/server.go | 4 +- pkg/scheduler/BUILD | 2 + pkg/scheduler/algorithm_factory.go | 16 +++- .../api/compatibility/compatibility_test.go | 26 +++--- pkg/scheduler/factory.go | 62 ++++++------- pkg/scheduler/factory_test.go | 40 ++++++++- pkg/scheduler/framework/plugins/BUILD | 3 + .../framework/plugins/default_registry.go | 28 +++++- .../framework/plugins/nodelabel/BUILD | 43 +++++++++ .../framework/plugins/nodelabel/node_label.go | 70 +++++++++++++++ .../plugins/nodelabel/node_label_test.go | 88 +++++++++++++++++++ pkg/scheduler/scheduler.go | 20 +++-- 12 files changed, 336 insertions(+), 66 deletions(-) create mode 100644 pkg/scheduler/framework/plugins/nodelabel/BUILD create mode 100644 pkg/scheduler/framework/plugins/nodelabel/node_label.go create mode 100644 pkg/scheduler/framework/plugins/nodelabel/node_label_test.go diff --git a/cmd/kube-scheduler/app/server.go b/cmd/kube-scheduler/app/server.go index 90d067dd0b1..1e9ca160225 100644 --- a/cmd/kube-scheduler/app/server.go +++ b/cmd/kube-scheduler/app/server.go @@ -157,12 +157,12 @@ func runCommand(cmd *cobra.Command, args []string, opts *options.Options, regist } // Run executes the scheduler based on the given configuration. It only returns on error or when context is done. -func Run(ctx context.Context, cc schedulerserverconfig.CompletedConfig, registryOptions ...Option) error { +func Run(ctx context.Context, cc schedulerserverconfig.CompletedConfig, outOfTreeRegistryOptions ...Option) error { // To help debugging, immediately log version klog.V(1).Infof("Starting Kubernetes Scheduler version %+v", version.Get()) outOfTreeRegistry := make(framework.Registry) - for _, option := range registryOptions { + for _, option := range outOfTreeRegistryOptions { if err := option(outOfTreeRegistry); err != nil { return err } diff --git a/pkg/scheduler/BUILD b/pkg/scheduler/BUILD index 86f1857b325..e0a45e9ce16 100644 --- a/pkg/scheduler/BUILD +++ b/pkg/scheduler/BUILD @@ -22,6 +22,7 @@ go_library( "//pkg/scheduler/apis/config:go_default_library", "//pkg/scheduler/core:go_default_library", "//pkg/scheduler/framework/plugins:go_default_library", + "//pkg/scheduler/framework/plugins/nodelabel:go_default_library", "//pkg/scheduler/framework/v1alpha1:go_default_library", "//pkg/scheduler/internal/cache:go_default_library", "//pkg/scheduler/internal/cache/debugger:go_default_library", @@ -80,6 +81,7 @@ go_test( "//pkg/scheduler/apis/extender/v1:go_default_library", "//pkg/scheduler/core:go_default_library", "//pkg/scheduler/framework/plugins:go_default_library", + "//pkg/scheduler/framework/plugins/nodelabel:go_default_library", "//pkg/scheduler/framework/v1alpha1:go_default_library", "//pkg/scheduler/internal/cache:go_default_library", "//pkg/scheduler/internal/cache/fake:go_default_library", diff --git a/pkg/scheduler/algorithm_factory.go b/pkg/scheduler/algorithm_factory.go index 532eacca807..8666a631d24 100644 --- a/pkg/scheduler/algorithm_factory.go +++ b/pkg/scheduler/algorithm_factory.go @@ -32,6 +32,8 @@ import ( "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities" schedulerapi "k8s.io/kubernetes/pkg/scheduler/api" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodelabel" framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers" "k8s.io/kubernetes/pkg/scheduler/volumebinder" @@ -259,9 +261,10 @@ func RegisterFitPredicateFactory(name string, predicateFactory FitPredicateFacto // RegisterCustomFitPredicate registers a custom fit predicate with the algorithm registry. // Returns the name, with which the predicate was registered. -func RegisterCustomFitPredicate(policy schedulerapi.PredicatePolicy) string { +func RegisterCustomFitPredicate(policy schedulerapi.PredicatePolicy, args *plugins.ConfigProducerArgs) string { var predicateFactory FitPredicateFactory var ok bool + name := policy.Name validatePredicateOrDie(policy) @@ -281,24 +284,31 @@ func RegisterCustomFitPredicate(policy schedulerapi.PredicatePolicy) string { return predicate } } else if policy.Argument.LabelsPresence != nil { + // map LabelPresence policy to ConfigProducerArgs that's used to configure the NodeLabel plugin. + args.NodeLabelArgs = &nodelabel.Args{ + Labels: policy.Argument.LabelsPresence.Labels, + Presence: policy.Argument.LabelsPresence.Presence, + } predicateFactory = func(args PluginFactoryArgs) predicates.FitPredicate { return predicates.NewNodeLabelPredicate( policy.Argument.LabelsPresence.Labels, policy.Argument.LabelsPresence.Presence, ) } + // We do not allow specifying the name for custom plugins, see #83472 + name = nodelabel.Name } } else if predicateFactory, ok = fitPredicateMap[policy.Name]; ok { // checking to see if a pre-defined predicate is requested klog.V(2).Infof("Predicate type %s already registered, reusing.", policy.Name) - return policy.Name + return name } if predicateFactory == nil { klog.Fatalf("Invalid configuration: Predicate type not found for %s", policy.Name) } - return RegisterFitPredicateFactory(policy.Name, predicateFactory) + return RegisterFitPredicateFactory(name, predicateFactory) } // IsFitPredicateRegistered is useful for testing providers. diff --git a/pkg/scheduler/api/compatibility/compatibility_test.go b/pkg/scheduler/api/compatibility/compatibility_test.go index 0803980a381..09614a8170f 100644 --- a/pkg/scheduler/api/compatibility/compatibility_test.go +++ b/pkg/scheduler/api/compatibility/compatibility_test.go @@ -93,7 +93,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { wantPredicates: sets.NewString( "PodFitsPorts", "TestServiceAffinity", - "TestLabelsPresence", ), wantPrioritizers: sets.NewString( "ServiceSpreadingPriority", @@ -107,6 +106,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { {Name: "NodeResourcesFit"}, {Name: "VolumeRestrictions"}, {Name: "TaintToleration"}, + {Name: "NodeLabel"}, }, "ScorePlugin": { {Name: "NodeResourcesLeastAllocated", Weight: 1}, @@ -139,7 +139,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { }`, wantPredicates: sets.NewString( "TestServiceAffinity", - "TestLabelsPresence", ), wantPrioritizers: sets.NewString( "EqualPriority", @@ -156,6 +155,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { {Name: "NodeResourcesFit"}, {Name: "VolumeRestrictions"}, {Name: "TaintToleration"}, + {Name: "NodeLabel"}, }, "ScorePlugin": { {Name: "NodeResourcesBalancedAllocation", Weight: 2}, @@ -195,7 +195,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { }`, wantPredicates: sets.NewString( "TestServiceAffinity", - "TestLabelsPresence", ), wantPrioritizers: sets.NewString( "EqualPriority", @@ -216,6 +215,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { {Name: "GCEPDLimits"}, {Name: "AzureDiskLimits"}, {Name: "VolumeZone"}, + {Name: "NodeLabel"}, }, "ScorePlugin": { {Name: "NodeResourcesBalancedAllocation", Weight: 2}, @@ -259,7 +259,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { }`, wantPredicates: sets.NewString( "TestServiceAffinity", - "TestLabelsPresence", ), wantPrioritizers: sets.NewString( "EqualPriority", @@ -280,6 +279,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { {Name: "AzureDiskLimits"}, {Name: "VolumeZone"}, {Name: "InterPodAffinity"}, + {Name: "NodeLabel"}, }, "ScorePlugin": { {Name: "NodeResourcesBalancedAllocation", Weight: 2}, @@ -326,7 +326,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { }`, wantPredicates: sets.NewString( "TestServiceAffinity", - "TestLabelsPresence", ), wantPrioritizers: sets.NewString( "EqualPriority", @@ -347,6 +346,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { {Name: "AzureDiskLimits"}, {Name: "VolumeZone"}, {Name: "InterPodAffinity"}, + {Name: "NodeLabel"}, }, "ScorePlugin": { {Name: "NodeResourcesBalancedAllocation", Weight: 2}, @@ -404,7 +404,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { }`, wantPredicates: sets.NewString( "TestServiceAffinity", - "TestLabelsPresence", ), wantPrioritizers: sets.NewString( "EqualPriority", @@ -425,6 +424,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { {Name: "AzureDiskLimits"}, {Name: "VolumeZone"}, {Name: "InterPodAffinity"}, + {Name: "NodeLabel"}, }, "ScorePlugin": { {Name: "NodeResourcesBalancedAllocation", Weight: 2}, @@ -493,7 +493,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { }`, wantPredicates: sets.NewString( "TestServiceAffinity", - "TestLabelsPresence", ), wantPrioritizers: sets.NewString( "EqualPriority", @@ -514,6 +513,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { {Name: "AzureDiskLimits"}, {Name: "VolumeZone"}, {Name: "InterPodAffinity"}, + {Name: "NodeLabel"}, }, "ScorePlugin": { {Name: "NodeResourcesBalancedAllocation", Weight: 2}, @@ -583,7 +583,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { }`, wantPredicates: sets.NewString( "TestServiceAffinity", - "TestLabelsPresence", ), wantPrioritizers: sets.NewString( "EqualPriority", @@ -605,6 +604,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { {Name: "VolumeBinding"}, {Name: "VolumeZone"}, {Name: "InterPodAffinity"}, + {Name: "NodeLabel"}, }, "ScorePlugin": { {Name: "NodeResourcesBalancedAllocation", Weight: 2}, @@ -677,7 +677,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { }`, wantPredicates: sets.NewString( "TestServiceAffinity", - "TestLabelsPresence", ), wantPrioritizers: sets.NewString( "EqualPriority", @@ -699,6 +698,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { {Name: "VolumeBinding"}, {Name: "VolumeZone"}, {Name: "InterPodAffinity"}, + {Name: "NodeLabel"}, }, "ScorePlugin": { {Name: "NodeResourcesBalancedAllocation", Weight: 2}, @@ -783,7 +783,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { }`, wantPredicates: sets.NewString( "TestServiceAffinity", - "TestLabelsPresence", ), wantPrioritizers: sets.NewString( "EqualPriority", @@ -806,6 +805,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { {Name: "VolumeBinding"}, {Name: "VolumeZone"}, {Name: "InterPodAffinity"}, + {Name: "NodeLabel"}, }, "ScorePlugin": { {Name: "NodeResourcesBalancedAllocation", Weight: 2}, @@ -891,7 +891,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { }`, wantPredicates: sets.NewString( "TestServiceAffinity", - "TestLabelsPresence", ), wantPrioritizers: sets.NewString( "EqualPriority", @@ -915,6 +914,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { {Name: "VolumeBinding"}, {Name: "VolumeZone"}, {Name: "InterPodAffinity"}, + {Name: "NodeLabel"}, }, "ScorePlugin": { {Name: "NodeResourcesBalancedAllocation", Weight: 2}, @@ -999,7 +999,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { }`, wantPredicates: sets.NewString( "TestServiceAffinity", - "TestLabelsPresence", ), wantPrioritizers: sets.NewString( "EqualPriority", @@ -1024,6 +1023,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { {Name: "VolumeBinding"}, {Name: "VolumeZone"}, {Name: "InterPodAffinity"}, + {Name: "NodeLabel"}, }, "ScorePlugin": { {Name: "NodeResourcesBalancedAllocation", Weight: 2}, @@ -1112,7 +1112,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { }`, wantPredicates: sets.NewString( "TestServiceAffinity", - "TestLabelsPresence", ), wantPrioritizers: sets.NewString( "EqualPriority", @@ -1137,6 +1136,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { {Name: "VolumeBinding"}, {Name: "VolumeZone"}, {Name: "InterPodAffinity"}, + {Name: "NodeLabel"}, }, "ScorePlugin": { {Name: "NodeResourcesBalancedAllocation", Weight: 2}, diff --git a/pkg/scheduler/factory.go b/pkg/scheduler/factory.go index b9fe4056f5d..c28f4abb3bf 100644 --- a/pkg/scheduler/factory.go +++ b/pkg/scheduler/factory.go @@ -177,6 +177,9 @@ type Configurator struct { pluginConfig []config.PluginConfig pluginConfigProducerRegistry *plugins.ConfigProducerRegistry nodeInfoSnapshot *nodeinfosnapshot.Snapshot + + factoryArgs *PluginFactoryArgs + configProducerArgs *plugins.ConfigProducerArgs } // ConfigFactoryArgs is a set arguments passed to NewConfigFactory. @@ -263,6 +266,22 @@ func NewConfigFactory(args *ConfigFactoryArgs) *Configurator { pluginConfigProducerRegistry: args.PluginConfigProducerRegistry, nodeInfoSnapshot: nodeinfosnapshot.NewEmptySnapshot(), } + c.factoryArgs = &PluginFactoryArgs{ + NodeInfoLister: c.nodeInfoSnapshot.NodeInfos(), + PodLister: c.nodeInfoSnapshot.Pods(), + ServiceLister: c.serviceLister, + ControllerLister: c.controllerLister, + ReplicaSetLister: c.replicaSetLister, + StatefulSetLister: c.statefulSetLister, + PDBLister: c.pdbLister, + CSINodeLister: c.csiNodeLister, + PVLister: c.pVLister, + PVCLister: c.pVCLister, + StorageClassLister: c.storageClassLister, + VolumeBinder: c.volumeBinder, + HardPodAffinitySymmetricWeight: c.hardPodAffinitySymmetricWeight, + } + c.configProducerArgs = &plugins.ConfigProducerArgs{} return c } @@ -307,7 +326,7 @@ func (c *Configurator) CreateFromConfig(policy schedulerapi.Policy) (*Config, er } else { for _, predicate := range policy.Predicates { klog.V(2).Infof("Registering predicate: %s", predicate.Name) - predicateKeys.Insert(RegisterCustomFitPredicate(predicate)) + predicateKeys.Insert(RegisterCustomFitPredicate(predicate, c.configProducerArgs)) } } @@ -383,7 +402,7 @@ func (c *Configurator) CreateFromKeys(predicateKeys, priorityKeys sets.String, e return nil, err } - priorityMetaProducer, err := c.getPriorityMetadataProducer() + priorityMetaProducer, err := getPriorityMetadataProducer(*c.factoryArgs) if err != nil { return nil, err } @@ -487,9 +506,7 @@ func getBinderFunc(client clientset.Interface, extenders []algorithm.SchedulerEx // as framework plugins. Specifically, a priority will run as a framework plugin if a plugin config producer was // registered for that priority. func (c *Configurator) getPriorityConfigs(priorityKeys sets.String) ([]priorities.PriorityConfig, *config.Plugins, []config.PluginConfig, error) { - algorithmArgs, configProducerArgs := c.getAlgorithmArgs() - - allPriorityConfigs, err := getPriorityFunctionConfigs(priorityKeys, *algorithmArgs) + allPriorityConfigs, err := getPriorityFunctionConfigs(priorityKeys, *c.factoryArgs) if err != nil { return nil, nil, nil, err } @@ -504,7 +521,7 @@ func (c *Configurator) getPriorityConfigs(priorityKeys sets.String) ([]prioritie frameworkConfigProducers := c.pluginConfigProducerRegistry.PriorityToConfigProducer for _, p := range allPriorityConfigs { if producer, exist := frameworkConfigProducers[p.Name]; exist { - args := *configProducerArgs + args := *c.configProducerArgs args.Weight = int32(p.Weight) pl, pc := producer(args) plugins.Append(&pl) @@ -516,12 +533,6 @@ func (c *Configurator) getPriorityConfigs(priorityKeys sets.String) ([]prioritie return priorityConfigs, &plugins, pluginConfig, nil } -func (c *Configurator) getPriorityMetadataProducer() (priorities.PriorityMetadataProducer, error) { - algorithmArgs, _ := c.getAlgorithmArgs() - - return getPriorityMetadataProducer(*algorithmArgs) -} - // GetPredicateMetadataProducer returns a function to build Predicate Metadata. // It is used by the scheduler and other components, such as k8s.io/autoscaler/cluster-autoscaler. func (c *Configurator) GetPredicateMetadataProducer() (predicates.PredicateMetadataProducer, error) { @@ -534,9 +545,7 @@ func (c *Configurator) GetPredicateMetadataProducer() (predicates.PredicateMetad // Note that the framework executes plugins according to their order in the Plugins list, and so predicates run as plugins // are added to the Plugins list according to the order specified in predicates.Ordering(). func (c *Configurator) getPredicateConfigs(predicateKeys sets.String) (map[string]predicates.FitPredicate, *config.Plugins, []config.PluginConfig, error) { - algorithmArgs, configProducerArgs := c.getAlgorithmArgs() - - allFitPredicates, err := getFitPredicateFunctions(predicateKeys, *algorithmArgs) + allFitPredicates, err := getFitPredicateFunctions(predicateKeys, *c.factoryArgs) if err != nil { return nil, nil, nil, err } @@ -563,10 +572,11 @@ func (c *Configurator) getPredicateConfigs(predicateKeys sets.String) (map[strin // that the corresponding predicates were supposed to run. var plugins config.Plugins var pluginConfig []config.PluginConfig + for _, predicateKey := range predicates.Ordering() { if asPlugins.Has(predicateKey) { producer := frameworkConfigProducers[predicateKey] - p, pc := producer(*configProducerArgs) + p, pc := producer(*c.configProducerArgs) plugins.Append(&p) pluginConfig = append(pluginConfig, pc...) asPlugins.Delete(predicateKey) @@ -576,7 +586,7 @@ func (c *Configurator) getPredicateConfigs(predicateKeys sets.String) (map[strin // Third, add the rest in no specific order. for predicateKey := range asPlugins { producer := frameworkConfigProducers[predicateKey] - p, pc := producer(*configProducerArgs) + p, pc := producer(*c.configProducerArgs) plugins.Append(&p) pluginConfig = append(pluginConfig, pc...) } @@ -584,24 +594,6 @@ func (c *Configurator) getPredicateConfigs(predicateKeys sets.String) (map[strin return asFitPredicates, &plugins, pluginConfig, nil } -func (c *Configurator) getAlgorithmArgs() (*PluginFactoryArgs, *plugins.ConfigProducerArgs) { - return &PluginFactoryArgs{ - NodeInfoLister: c.nodeInfoSnapshot.NodeInfos(), - PodLister: c.nodeInfoSnapshot.Pods(), - ServiceLister: c.serviceLister, - ControllerLister: c.controllerLister, - ReplicaSetLister: c.replicaSetLister, - StatefulSetLister: c.statefulSetLister, - PDBLister: c.pdbLister, - CSINodeLister: c.csiNodeLister, - PVLister: c.pVLister, - PVCLister: c.pVCLister, - StorageClassLister: c.storageClassLister, - VolumeBinder: c.volumeBinder, - HardPodAffinitySymmetricWeight: c.hardPodAffinitySymmetricWeight, - }, &plugins.ConfigProducerArgs{} -} - type podInformer struct { informer cache.SharedIndexInformer } diff --git a/pkg/scheduler/factory_test.go b/pkg/scheduler/factory_test.go index 2aa9433f818..7f9a5371dd1 100644 --- a/pkg/scheduler/factory_test.go +++ b/pkg/scheduler/factory_test.go @@ -18,6 +18,7 @@ package scheduler import ( "context" + "encoding/json" "errors" "fmt" "reflect" @@ -46,6 +47,7 @@ import ( "k8s.io/kubernetes/pkg/scheduler/apis/config" extenderv1 "k8s.io/kubernetes/pkg/scheduler/apis/extender/v1" frameworkplugins "k8s.io/kubernetes/pkg/scheduler/framework/plugins" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodelabel" framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache" internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue" @@ -102,11 +104,47 @@ func TestCreateFromConfig(t *testing.T) { t.Errorf("Invalid configuration: %v", err) } - factory.CreateFromConfig(policy) + conf, err := factory.CreateFromConfig(policy) + if err != nil { + t.Fatalf("CreateFromConfig failed: %v", err) + } hpa := factory.GetHardPodAffinitySymmetricWeight() if hpa != v1.DefaultHardPodAffinitySymmetricWeight { t.Errorf("Wrong hardPodAffinitySymmetricWeight, ecpected: %d, got: %d", v1.DefaultHardPodAffinitySymmetricWeight, hpa) } + + // Verify that custom predicates are converted to framework plugins. + if !pluginExists(nodelabel.Name, "FilterPlugin", conf) { + t.Error("NodeLabel plugin not exist in framework.") + } + // Verify that the policy config is converted to plugin config for custom predicates. + nodeLabelConfig := findPluginConfig(nodelabel.Name, conf) + encoding, err := json.Marshal(nodeLabelConfig) + if err != nil { + t.Errorf("Failed to marshal %+v: %v", nodeLabelConfig, err) + } + want := `{"Name":"NodeLabel","Args":{"labels":["zone"],"presence":true}}` + if string(encoding) != want { + t.Errorf("Config for NodeLabel plugin mismatch. got: %v, want: %v", string(encoding), want) + } +} + +func pluginExists(name, extensionPoint string, schedConf *Config) bool { + for _, pl := range schedConf.Framework.ListPlugins()[extensionPoint] { + if pl.Name == name { + return true + } + } + return false +} + +func findPluginConfig(name string, schedConf *Config) config.PluginConfig { + for _, c := range schedConf.PluginConfig { + if c.Name == name { + return c + } + } + return config.PluginConfig{} } func TestCreateFromConfigWithHardPodAffinitySymmetricWeight(t *testing.T) { diff --git a/pkg/scheduler/framework/plugins/BUILD b/pkg/scheduler/framework/plugins/BUILD index 644ac79c3f3..9a30d1535e1 100644 --- a/pkg/scheduler/framework/plugins/BUILD +++ b/pkg/scheduler/framework/plugins/BUILD @@ -12,6 +12,7 @@ go_library( "//pkg/scheduler/framework/plugins/imagelocality:go_default_library", "//pkg/scheduler/framework/plugins/interpodaffinity:go_default_library", "//pkg/scheduler/framework/plugins/nodeaffinity:go_default_library", + "//pkg/scheduler/framework/plugins/nodelabel:go_default_library", "//pkg/scheduler/framework/plugins/nodename:go_default_library", "//pkg/scheduler/framework/plugins/nodeports:go_default_library", "//pkg/scheduler/framework/plugins/nodepreferavoidpods:go_default_library", @@ -26,6 +27,7 @@ go_library( "//pkg/scheduler/framework/v1alpha1:go_default_library", "//pkg/scheduler/volumebinder:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", + "//vendor/k8s.io/klog:go_default_library", ], ) @@ -45,6 +47,7 @@ filegroup( "//pkg/scheduler/framework/plugins/interpodaffinity:all-srcs", "//pkg/scheduler/framework/plugins/migration:all-srcs", "//pkg/scheduler/framework/plugins/nodeaffinity:all-srcs", + "//pkg/scheduler/framework/plugins/nodelabel:all-srcs", "//pkg/scheduler/framework/plugins/nodename:all-srcs", "//pkg/scheduler/framework/plugins/nodeports:all-srcs", "//pkg/scheduler/framework/plugins/nodepreferavoidpods:all-srcs", diff --git a/pkg/scheduler/framework/plugins/default_registry.go b/pkg/scheduler/framework/plugins/default_registry.go index cc51737b1bd..8066afb4e2e 100644 --- a/pkg/scheduler/framework/plugins/default_registry.go +++ b/pkg/scheduler/framework/plugins/default_registry.go @@ -17,15 +17,18 @@ limitations under the License. package plugins import ( + "encoding/json" "fmt" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/klog" "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities" "k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/imagelocality" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/interpodaffinity" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeaffinity" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodelabel" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodename" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeports" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodepreferavoidpods" @@ -46,9 +49,9 @@ type RegistryArgs struct { VolumeBinder *volumebinder.VolumeBinder } -// NewDefaultRegistry builds a default registry with all the default plugins. -// This is the registry that Kubernetes default scheduler uses. A scheduler that -// runs custom plugins, can pass a different Registry when initializing the scheduler. +// NewDefaultRegistry builds the default registry with all the in-tree plugins. +// This is the registry that Kubernetes default scheduler uses. A scheduler that runs out of tree +// plugins can register additional plugins through the WithFrameworkOutOfTreeRegistry option. func NewDefaultRegistry(args *RegistryArgs) framework.Registry { return framework.Registry{ imagelocality.Name: imagelocality.New, @@ -74,6 +77,7 @@ func NewDefaultRegistry(args *RegistryArgs) framework.Registry { nodevolumelimits.AzureDiskName: nodevolumelimits.NewAzureDisk, nodevolumelimits.CinderName: nodevolumelimits.NewCinder, interpodaffinity.Name: interpodaffinity.New, + nodelabel.Name: nodelabel.New, } } @@ -83,6 +87,8 @@ func NewDefaultRegistry(args *RegistryArgs) framework.Registry { type ConfigProducerArgs struct { // Weight used for priority functions. Weight int32 + // NodeLabelArgs is the args for the NodeLabel plugin. + NodeLabelArgs *nodelabel.Args } // ConfigProducer produces a framework's configuration. @@ -191,6 +197,22 @@ func NewDefaultConfigProducerRegistry() *ConfigProducerRegistry { plugins.Filter = appendToPluginSet(plugins.Filter, podtopologyspread.Name, nil) return }) + registry.RegisterPredicate(nodelabel.Name, + func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) { + plugins.Filter = appendToPluginSet(plugins.Filter, nodelabel.Name, nil) + encoding, err := json.Marshal(args.NodeLabelArgs) + if err != nil { + klog.Fatalf("Failed to marshal %+v", args.NodeLabelArgs) + return + } + config := config.PluginConfig{ + Name: nodelabel.Name, + Args: runtime.Unknown{Raw: encoding}, + } + pluginConfig = append(pluginConfig, config) + return + }) + // Register Priorities. registry.RegisterPriority(priorities.TaintTolerationPriority, func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) { diff --git a/pkg/scheduler/framework/plugins/nodelabel/BUILD b/pkg/scheduler/framework/plugins/nodelabel/BUILD new file mode 100644 index 00000000000..7f68b26586f --- /dev/null +++ b/pkg/scheduler/framework/plugins/nodelabel/BUILD @@ -0,0 +1,43 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "go_default_library", + srcs = ["node_label.go"], + importpath = "k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodelabel", + visibility = ["//visibility:public"], + deps = [ + "//pkg/scheduler/algorithm/predicates:go_default_library", + "//pkg/scheduler/framework/plugins/migration:go_default_library", + "//pkg/scheduler/framework/v1alpha1:go_default_library", + "//pkg/scheduler/nodeinfo:go_default_library", + "//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", + ], +) + +go_test( + name = "go_default_test", + srcs = ["node_label_test.go"], + embed = [":go_default_library"], + deps = [ + "//pkg/scheduler/framework/v1alpha1:go_default_library", + "//pkg/scheduler/nodeinfo:go_default_library", + "//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], + visibility = ["//visibility:public"], +) diff --git a/pkg/scheduler/framework/plugins/nodelabel/node_label.go b/pkg/scheduler/framework/plugins/nodelabel/node_label.go new file mode 100644 index 00000000000..909ffcab393 --- /dev/null +++ b/pkg/scheduler/framework/plugins/nodelabel/node_label.go @@ -0,0 +1,70 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package nodelabel + +import ( + "context" + + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/migration" + framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" + "k8s.io/kubernetes/pkg/scheduler/nodeinfo" +) + +// Name of this plugin. +const Name = "NodeLabel" + +// Args holds the args that are used to configure the plugin. +type Args struct { + // The list of labels that identify node "groups" + // All of the labels should be either present (or absent) for the node to be considered a fit for hosting the pod + Labels []string `json:"labels,omitempty"` + // The boolean flag that indicates whether the labels should be present or absent from the node + Presence bool `json:"presence,omitempty"` +} + +// New initializes a new plugin and returns it. +func New(plArgs *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) { + args := &Args{} + if err := framework.DecodeInto(plArgs, args); err != nil { + return nil, err + } + return &NodeLabel{ + predicate: predicates.NewNodeLabelPredicate(args.Labels, args.Presence), + }, nil +} + +// NodeLabel checks whether a pod can fit based on the node labels which match a filter that it requests. +type NodeLabel struct { + predicate predicates.FitPredicate +} + +var _ framework.FilterPlugin = &NodeLabel{} + +// Name returns name of the plugin. It is used in logs, etc. +func (pl *NodeLabel) Name() string { + return Name +} + +// Filter invoked at the filter extension point. +func (pl *NodeLabel) Filter(ctx context.Context, _ *framework.CycleState, pod *v1.Pod, nodeInfo *nodeinfo.NodeInfo) *framework.Status { + // Note that NodeLabelPredicate doesn't use predicate metadata, hence passing nil here. + _, reasons, err := pl.predicate(pod, nil, nodeInfo) + return migration.PredicateResultToFrameworkStatus(reasons, err) +} diff --git a/pkg/scheduler/framework/plugins/nodelabel/node_label_test.go b/pkg/scheduler/framework/plugins/nodelabel/node_label_test.go new file mode 100644 index 00000000000..2b057fffeb9 --- /dev/null +++ b/pkg/scheduler/framework/plugins/nodelabel/node_label_test.go @@ -0,0 +1,88 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package nodelabel + +import ( + "context" + "testing" + + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" + schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo" +) + +func TestNodeLabelPresence(t *testing.T) { + label := map[string]string{"foo": "bar", "bar": "foo"} + tests := []struct { + name string + pod *v1.Pod + rawArgs string + res framework.Code + }{ + { + name: "label does not match, presence true", + rawArgs: `{"labels" : ["baz"], "presence" : true}`, + res: framework.UnschedulableAndUnresolvable, + }, + { + name: "label does not match, presence false", + rawArgs: `{"labels" : ["baz"], "presence" : false}`, + res: framework.Success, + }, + { + name: "one label matches, presence true", + rawArgs: `{"labels" : ["foo", "baz"], "presence" : true}`, + res: framework.UnschedulableAndUnresolvable, + }, + { + name: "one label matches, presence false", + rawArgs: `{"labels" : ["foo", "baz"], "presence" : false}`, + res: framework.UnschedulableAndUnresolvable, + }, + { + name: "all labels match, presence true", + rawArgs: `{"labels" : ["foo", "bar"], "presence" : true}`, + res: framework.Success, + }, + { + name: "all labels match, presence false", + rawArgs: `{"labels" : ["foo", "bar"], "presence" : false}`, + res: framework.UnschedulableAndUnresolvable, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + node := v1.Node{ObjectMeta: metav1.ObjectMeta{Labels: label}} + nodeInfo := schedulernodeinfo.NewNodeInfo() + nodeInfo.SetNode(&node) + + args := &runtime.Unknown{Raw: []byte(test.rawArgs)} + p, err := New(args, nil) + if err != nil { + t.Fatalf("Failed to create plugin: %v", err) + } + + status := p.(framework.FilterPlugin).Filter(context.TODO(), nil, test.pod, nodeInfo) + if status.Code() != test.res { + t.Errorf("Status mismatch. got: %v, want: %v", status.Code(), test.res) + } + }) + } +} diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 73f02c90318..cf92a9a68a4 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -128,14 +128,16 @@ func (sched *Scheduler) Cache() internalcache.Cache { } type schedulerOptions struct { - schedulerName string - hardPodAffinitySymmetricWeight int32 - disablePreemption bool - percentageOfNodesToScore int32 - bindTimeoutSeconds int64 - podInitialBackoffSeconds int64 - podMaxBackoffSeconds int64 - frameworkDefaultRegistry framework.Registry + schedulerName string + hardPodAffinitySymmetricWeight int32 + disablePreemption bool + percentageOfNodesToScore int32 + bindTimeoutSeconds int64 + podInitialBackoffSeconds int64 + podMaxBackoffSeconds int64 + // Default registry contains all in-tree plugins + frameworkDefaultRegistry framework.Registry + // This registry contains out of tree plugins to be merged with default registry. frameworkOutOfTreeRegistry framework.Registry frameworkConfigProducerRegistry *frameworkplugins.ConfigProducerRegistry frameworkPlugins *kubeschedulerconfig.Plugins @@ -180,7 +182,7 @@ func WithBindTimeoutSeconds(bindTimeoutSeconds int64) Option { } } -// WithFrameworkDefaultRegistry sets the framework's default registry. +// WithFrameworkDefaultRegistry sets the framework's default registry. This is only used in integration tests. func WithFrameworkDefaultRegistry(registry framework.Registry) Option { return func(o *schedulerOptions) { o.frameworkDefaultRegistry = registry