Merge pull request #84297 from liu-cong/custompred

Convert NodeLabelPresence custom predicate to filter plugin.
This commit is contained in:
Kubernetes Prow Robot 2019-10-26 09:57:23 -07:00 committed by GitHub
commit 41730db3f0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 336 additions and 66 deletions

View File

@ -157,12 +157,12 @@ func runCommand(cmd *cobra.Command, args []string, opts *options.Options, regist
} }
// Run executes the scheduler based on the given configuration. It only returns on error or when context is done. // Run executes the scheduler based on the given configuration. It only returns on error or when context is done.
func Run(ctx context.Context, cc schedulerserverconfig.CompletedConfig, registryOptions ...Option) error { func Run(ctx context.Context, cc schedulerserverconfig.CompletedConfig, outOfTreeRegistryOptions ...Option) error {
// To help debugging, immediately log version // To help debugging, immediately log version
klog.V(1).Infof("Starting Kubernetes Scheduler version %+v", version.Get()) klog.V(1).Infof("Starting Kubernetes Scheduler version %+v", version.Get())
outOfTreeRegistry := make(framework.Registry) outOfTreeRegistry := make(framework.Registry)
for _, option := range registryOptions { for _, option := range outOfTreeRegistryOptions {
if err := option(outOfTreeRegistry); err != nil { if err := option(outOfTreeRegistry); err != nil {
return err return err
} }

View File

@ -22,6 +22,7 @@ go_library(
"//pkg/scheduler/apis/config:go_default_library", "//pkg/scheduler/apis/config:go_default_library",
"//pkg/scheduler/core:go_default_library", "//pkg/scheduler/core:go_default_library",
"//pkg/scheduler/framework/plugins:go_default_library", "//pkg/scheduler/framework/plugins:go_default_library",
"//pkg/scheduler/framework/plugins/nodelabel:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library", "//pkg/scheduler/framework/v1alpha1:go_default_library",
"//pkg/scheduler/internal/cache:go_default_library", "//pkg/scheduler/internal/cache:go_default_library",
"//pkg/scheduler/internal/cache/debugger:go_default_library", "//pkg/scheduler/internal/cache/debugger:go_default_library",
@ -80,6 +81,7 @@ go_test(
"//pkg/scheduler/apis/extender/v1:go_default_library", "//pkg/scheduler/apis/extender/v1:go_default_library",
"//pkg/scheduler/core:go_default_library", "//pkg/scheduler/core:go_default_library",
"//pkg/scheduler/framework/plugins:go_default_library", "//pkg/scheduler/framework/plugins:go_default_library",
"//pkg/scheduler/framework/plugins/nodelabel:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library", "//pkg/scheduler/framework/v1alpha1:go_default_library",
"//pkg/scheduler/internal/cache:go_default_library", "//pkg/scheduler/internal/cache:go_default_library",
"//pkg/scheduler/internal/cache/fake:go_default_library", "//pkg/scheduler/internal/cache/fake:go_default_library",

View File

@ -32,6 +32,8 @@ import (
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
"k8s.io/kubernetes/pkg/scheduler/algorithm/priorities" "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api" schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodelabel"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers" schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers"
"k8s.io/kubernetes/pkg/scheduler/volumebinder" "k8s.io/kubernetes/pkg/scheduler/volumebinder"
@ -259,9 +261,10 @@ func RegisterFitPredicateFactory(name string, predicateFactory FitPredicateFacto
// RegisterCustomFitPredicate registers a custom fit predicate with the algorithm registry. // RegisterCustomFitPredicate registers a custom fit predicate with the algorithm registry.
// Returns the name, with which the predicate was registered. // Returns the name, with which the predicate was registered.
func RegisterCustomFitPredicate(policy schedulerapi.PredicatePolicy) string { func RegisterCustomFitPredicate(policy schedulerapi.PredicatePolicy, args *plugins.ConfigProducerArgs) string {
var predicateFactory FitPredicateFactory var predicateFactory FitPredicateFactory
var ok bool var ok bool
name := policy.Name
validatePredicateOrDie(policy) validatePredicateOrDie(policy)
@ -281,24 +284,31 @@ func RegisterCustomFitPredicate(policy schedulerapi.PredicatePolicy) string {
return predicate return predicate
} }
} else if policy.Argument.LabelsPresence != nil { } else if policy.Argument.LabelsPresence != nil {
// map LabelPresence policy to ConfigProducerArgs that's used to configure the NodeLabel plugin.
args.NodeLabelArgs = &nodelabel.Args{
Labels: policy.Argument.LabelsPresence.Labels,
Presence: policy.Argument.LabelsPresence.Presence,
}
predicateFactory = func(args PluginFactoryArgs) predicates.FitPredicate { predicateFactory = func(args PluginFactoryArgs) predicates.FitPredicate {
return predicates.NewNodeLabelPredicate( return predicates.NewNodeLabelPredicate(
policy.Argument.LabelsPresence.Labels, policy.Argument.LabelsPresence.Labels,
policy.Argument.LabelsPresence.Presence, policy.Argument.LabelsPresence.Presence,
) )
} }
// We do not allow specifying the name for custom plugins, see #83472
name = nodelabel.Name
} }
} else if predicateFactory, ok = fitPredicateMap[policy.Name]; ok { } else if predicateFactory, ok = fitPredicateMap[policy.Name]; ok {
// checking to see if a pre-defined predicate is requested // checking to see if a pre-defined predicate is requested
klog.V(2).Infof("Predicate type %s already registered, reusing.", policy.Name) klog.V(2).Infof("Predicate type %s already registered, reusing.", policy.Name)
return policy.Name return name
} }
if predicateFactory == nil { if predicateFactory == nil {
klog.Fatalf("Invalid configuration: Predicate type not found for %s", policy.Name) klog.Fatalf("Invalid configuration: Predicate type not found for %s", policy.Name)
} }
return RegisterFitPredicateFactory(policy.Name, predicateFactory) return RegisterFitPredicateFactory(name, predicateFactory)
} }
// IsFitPredicateRegistered is useful for testing providers. // IsFitPredicateRegistered is useful for testing providers.

View File

@ -93,7 +93,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
wantPredicates: sets.NewString( wantPredicates: sets.NewString(
"PodFitsPorts", "PodFitsPorts",
"TestServiceAffinity", "TestServiceAffinity",
"TestLabelsPresence",
), ),
wantPrioritizers: sets.NewString( wantPrioritizers: sets.NewString(
"ServiceSpreadingPriority", "ServiceSpreadingPriority",
@ -107,6 +106,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
{Name: "NodeResourcesFit"}, {Name: "NodeResourcesFit"},
{Name: "VolumeRestrictions"}, {Name: "VolumeRestrictions"},
{Name: "TaintToleration"}, {Name: "TaintToleration"},
{Name: "NodeLabel"},
}, },
"ScorePlugin": { "ScorePlugin": {
{Name: "NodeResourcesLeastAllocated", Weight: 1}, {Name: "NodeResourcesLeastAllocated", Weight: 1},
@ -139,7 +139,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
}`, }`,
wantPredicates: sets.NewString( wantPredicates: sets.NewString(
"TestServiceAffinity", "TestServiceAffinity",
"TestLabelsPresence",
), ),
wantPrioritizers: sets.NewString( wantPrioritizers: sets.NewString(
"EqualPriority", "EqualPriority",
@ -156,6 +155,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
{Name: "NodeResourcesFit"}, {Name: "NodeResourcesFit"},
{Name: "VolumeRestrictions"}, {Name: "VolumeRestrictions"},
{Name: "TaintToleration"}, {Name: "TaintToleration"},
{Name: "NodeLabel"},
}, },
"ScorePlugin": { "ScorePlugin": {
{Name: "NodeResourcesBalancedAllocation", Weight: 2}, {Name: "NodeResourcesBalancedAllocation", Weight: 2},
@ -195,7 +195,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
}`, }`,
wantPredicates: sets.NewString( wantPredicates: sets.NewString(
"TestServiceAffinity", "TestServiceAffinity",
"TestLabelsPresence",
), ),
wantPrioritizers: sets.NewString( wantPrioritizers: sets.NewString(
"EqualPriority", "EqualPriority",
@ -216,6 +215,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
{Name: "GCEPDLimits"}, {Name: "GCEPDLimits"},
{Name: "AzureDiskLimits"}, {Name: "AzureDiskLimits"},
{Name: "VolumeZone"}, {Name: "VolumeZone"},
{Name: "NodeLabel"},
}, },
"ScorePlugin": { "ScorePlugin": {
{Name: "NodeResourcesBalancedAllocation", Weight: 2}, {Name: "NodeResourcesBalancedAllocation", Weight: 2},
@ -259,7 +259,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
}`, }`,
wantPredicates: sets.NewString( wantPredicates: sets.NewString(
"TestServiceAffinity", "TestServiceAffinity",
"TestLabelsPresence",
), ),
wantPrioritizers: sets.NewString( wantPrioritizers: sets.NewString(
"EqualPriority", "EqualPriority",
@ -280,6 +279,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
{Name: "AzureDiskLimits"}, {Name: "AzureDiskLimits"},
{Name: "VolumeZone"}, {Name: "VolumeZone"},
{Name: "InterPodAffinity"}, {Name: "InterPodAffinity"},
{Name: "NodeLabel"},
}, },
"ScorePlugin": { "ScorePlugin": {
{Name: "NodeResourcesBalancedAllocation", Weight: 2}, {Name: "NodeResourcesBalancedAllocation", Weight: 2},
@ -326,7 +326,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
}`, }`,
wantPredicates: sets.NewString( wantPredicates: sets.NewString(
"TestServiceAffinity", "TestServiceAffinity",
"TestLabelsPresence",
), ),
wantPrioritizers: sets.NewString( wantPrioritizers: sets.NewString(
"EqualPriority", "EqualPriority",
@ -347,6 +346,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
{Name: "AzureDiskLimits"}, {Name: "AzureDiskLimits"},
{Name: "VolumeZone"}, {Name: "VolumeZone"},
{Name: "InterPodAffinity"}, {Name: "InterPodAffinity"},
{Name: "NodeLabel"},
}, },
"ScorePlugin": { "ScorePlugin": {
{Name: "NodeResourcesBalancedAllocation", Weight: 2}, {Name: "NodeResourcesBalancedAllocation", Weight: 2},
@ -404,7 +404,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
}`, }`,
wantPredicates: sets.NewString( wantPredicates: sets.NewString(
"TestServiceAffinity", "TestServiceAffinity",
"TestLabelsPresence",
), ),
wantPrioritizers: sets.NewString( wantPrioritizers: sets.NewString(
"EqualPriority", "EqualPriority",
@ -425,6 +424,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
{Name: "AzureDiskLimits"}, {Name: "AzureDiskLimits"},
{Name: "VolumeZone"}, {Name: "VolumeZone"},
{Name: "InterPodAffinity"}, {Name: "InterPodAffinity"},
{Name: "NodeLabel"},
}, },
"ScorePlugin": { "ScorePlugin": {
{Name: "NodeResourcesBalancedAllocation", Weight: 2}, {Name: "NodeResourcesBalancedAllocation", Weight: 2},
@ -493,7 +493,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
}`, }`,
wantPredicates: sets.NewString( wantPredicates: sets.NewString(
"TestServiceAffinity", "TestServiceAffinity",
"TestLabelsPresence",
), ),
wantPrioritizers: sets.NewString( wantPrioritizers: sets.NewString(
"EqualPriority", "EqualPriority",
@ -514,6 +513,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
{Name: "AzureDiskLimits"}, {Name: "AzureDiskLimits"},
{Name: "VolumeZone"}, {Name: "VolumeZone"},
{Name: "InterPodAffinity"}, {Name: "InterPodAffinity"},
{Name: "NodeLabel"},
}, },
"ScorePlugin": { "ScorePlugin": {
{Name: "NodeResourcesBalancedAllocation", Weight: 2}, {Name: "NodeResourcesBalancedAllocation", Weight: 2},
@ -583,7 +583,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
}`, }`,
wantPredicates: sets.NewString( wantPredicates: sets.NewString(
"TestServiceAffinity", "TestServiceAffinity",
"TestLabelsPresence",
), ),
wantPrioritizers: sets.NewString( wantPrioritizers: sets.NewString(
"EqualPriority", "EqualPriority",
@ -605,6 +604,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
{Name: "VolumeBinding"}, {Name: "VolumeBinding"},
{Name: "VolumeZone"}, {Name: "VolumeZone"},
{Name: "InterPodAffinity"}, {Name: "InterPodAffinity"},
{Name: "NodeLabel"},
}, },
"ScorePlugin": { "ScorePlugin": {
{Name: "NodeResourcesBalancedAllocation", Weight: 2}, {Name: "NodeResourcesBalancedAllocation", Weight: 2},
@ -677,7 +677,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
}`, }`,
wantPredicates: sets.NewString( wantPredicates: sets.NewString(
"TestServiceAffinity", "TestServiceAffinity",
"TestLabelsPresence",
), ),
wantPrioritizers: sets.NewString( wantPrioritizers: sets.NewString(
"EqualPriority", "EqualPriority",
@ -699,6 +698,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
{Name: "VolumeBinding"}, {Name: "VolumeBinding"},
{Name: "VolumeZone"}, {Name: "VolumeZone"},
{Name: "InterPodAffinity"}, {Name: "InterPodAffinity"},
{Name: "NodeLabel"},
}, },
"ScorePlugin": { "ScorePlugin": {
{Name: "NodeResourcesBalancedAllocation", Weight: 2}, {Name: "NodeResourcesBalancedAllocation", Weight: 2},
@ -783,7 +783,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
}`, }`,
wantPredicates: sets.NewString( wantPredicates: sets.NewString(
"TestServiceAffinity", "TestServiceAffinity",
"TestLabelsPresence",
), ),
wantPrioritizers: sets.NewString( wantPrioritizers: sets.NewString(
"EqualPriority", "EqualPriority",
@ -806,6 +805,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
{Name: "VolumeBinding"}, {Name: "VolumeBinding"},
{Name: "VolumeZone"}, {Name: "VolumeZone"},
{Name: "InterPodAffinity"}, {Name: "InterPodAffinity"},
{Name: "NodeLabel"},
}, },
"ScorePlugin": { "ScorePlugin": {
{Name: "NodeResourcesBalancedAllocation", Weight: 2}, {Name: "NodeResourcesBalancedAllocation", Weight: 2},
@ -891,7 +891,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
}`, }`,
wantPredicates: sets.NewString( wantPredicates: sets.NewString(
"TestServiceAffinity", "TestServiceAffinity",
"TestLabelsPresence",
), ),
wantPrioritizers: sets.NewString( wantPrioritizers: sets.NewString(
"EqualPriority", "EqualPriority",
@ -915,6 +914,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
{Name: "VolumeBinding"}, {Name: "VolumeBinding"},
{Name: "VolumeZone"}, {Name: "VolumeZone"},
{Name: "InterPodAffinity"}, {Name: "InterPodAffinity"},
{Name: "NodeLabel"},
}, },
"ScorePlugin": { "ScorePlugin": {
{Name: "NodeResourcesBalancedAllocation", Weight: 2}, {Name: "NodeResourcesBalancedAllocation", Weight: 2},
@ -999,7 +999,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
}`, }`,
wantPredicates: sets.NewString( wantPredicates: sets.NewString(
"TestServiceAffinity", "TestServiceAffinity",
"TestLabelsPresence",
), ),
wantPrioritizers: sets.NewString( wantPrioritizers: sets.NewString(
"EqualPriority", "EqualPriority",
@ -1024,6 +1023,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
{Name: "VolumeBinding"}, {Name: "VolumeBinding"},
{Name: "VolumeZone"}, {Name: "VolumeZone"},
{Name: "InterPodAffinity"}, {Name: "InterPodAffinity"},
{Name: "NodeLabel"},
}, },
"ScorePlugin": { "ScorePlugin": {
{Name: "NodeResourcesBalancedAllocation", Weight: 2}, {Name: "NodeResourcesBalancedAllocation", Weight: 2},
@ -1112,7 +1112,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
}`, }`,
wantPredicates: sets.NewString( wantPredicates: sets.NewString(
"TestServiceAffinity", "TestServiceAffinity",
"TestLabelsPresence",
), ),
wantPrioritizers: sets.NewString( wantPrioritizers: sets.NewString(
"EqualPriority", "EqualPriority",
@ -1137,6 +1136,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
{Name: "VolumeBinding"}, {Name: "VolumeBinding"},
{Name: "VolumeZone"}, {Name: "VolumeZone"},
{Name: "InterPodAffinity"}, {Name: "InterPodAffinity"},
{Name: "NodeLabel"},
}, },
"ScorePlugin": { "ScorePlugin": {
{Name: "NodeResourcesBalancedAllocation", Weight: 2}, {Name: "NodeResourcesBalancedAllocation", Weight: 2},

View File

@ -177,6 +177,9 @@ type Configurator struct {
pluginConfig []config.PluginConfig pluginConfig []config.PluginConfig
pluginConfigProducerRegistry *plugins.ConfigProducerRegistry pluginConfigProducerRegistry *plugins.ConfigProducerRegistry
nodeInfoSnapshot *nodeinfosnapshot.Snapshot nodeInfoSnapshot *nodeinfosnapshot.Snapshot
factoryArgs *PluginFactoryArgs
configProducerArgs *plugins.ConfigProducerArgs
} }
// ConfigFactoryArgs is a set arguments passed to NewConfigFactory. // ConfigFactoryArgs is a set arguments passed to NewConfigFactory.
@ -263,6 +266,22 @@ func NewConfigFactory(args *ConfigFactoryArgs) *Configurator {
pluginConfigProducerRegistry: args.PluginConfigProducerRegistry, pluginConfigProducerRegistry: args.PluginConfigProducerRegistry,
nodeInfoSnapshot: nodeinfosnapshot.NewEmptySnapshot(), nodeInfoSnapshot: nodeinfosnapshot.NewEmptySnapshot(),
} }
c.factoryArgs = &PluginFactoryArgs{
NodeInfoLister: c.nodeInfoSnapshot.NodeInfos(),
PodLister: c.nodeInfoSnapshot.Pods(),
ServiceLister: c.serviceLister,
ControllerLister: c.controllerLister,
ReplicaSetLister: c.replicaSetLister,
StatefulSetLister: c.statefulSetLister,
PDBLister: c.pdbLister,
CSINodeLister: c.csiNodeLister,
PVLister: c.pVLister,
PVCLister: c.pVCLister,
StorageClassLister: c.storageClassLister,
VolumeBinder: c.volumeBinder,
HardPodAffinitySymmetricWeight: c.hardPodAffinitySymmetricWeight,
}
c.configProducerArgs = &plugins.ConfigProducerArgs{}
return c return c
} }
@ -307,7 +326,7 @@ func (c *Configurator) CreateFromConfig(policy schedulerapi.Policy) (*Config, er
} else { } else {
for _, predicate := range policy.Predicates { for _, predicate := range policy.Predicates {
klog.V(2).Infof("Registering predicate: %s", predicate.Name) klog.V(2).Infof("Registering predicate: %s", predicate.Name)
predicateKeys.Insert(RegisterCustomFitPredicate(predicate)) predicateKeys.Insert(RegisterCustomFitPredicate(predicate, c.configProducerArgs))
} }
} }
@ -383,7 +402,7 @@ func (c *Configurator) CreateFromKeys(predicateKeys, priorityKeys sets.String, e
return nil, err return nil, err
} }
priorityMetaProducer, err := c.getPriorityMetadataProducer() priorityMetaProducer, err := getPriorityMetadataProducer(*c.factoryArgs)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -487,9 +506,7 @@ func getBinderFunc(client clientset.Interface, extenders []algorithm.SchedulerEx
// as framework plugins. Specifically, a priority will run as a framework plugin if a plugin config producer was // as framework plugins. Specifically, a priority will run as a framework plugin if a plugin config producer was
// registered for that priority. // registered for that priority.
func (c *Configurator) getPriorityConfigs(priorityKeys sets.String) ([]priorities.PriorityConfig, *config.Plugins, []config.PluginConfig, error) { func (c *Configurator) getPriorityConfigs(priorityKeys sets.String) ([]priorities.PriorityConfig, *config.Plugins, []config.PluginConfig, error) {
algorithmArgs, configProducerArgs := c.getAlgorithmArgs() allPriorityConfigs, err := getPriorityFunctionConfigs(priorityKeys, *c.factoryArgs)
allPriorityConfigs, err := getPriorityFunctionConfigs(priorityKeys, *algorithmArgs)
if err != nil { if err != nil {
return nil, nil, nil, err return nil, nil, nil, err
} }
@ -504,7 +521,7 @@ func (c *Configurator) getPriorityConfigs(priorityKeys sets.String) ([]prioritie
frameworkConfigProducers := c.pluginConfigProducerRegistry.PriorityToConfigProducer frameworkConfigProducers := c.pluginConfigProducerRegistry.PriorityToConfigProducer
for _, p := range allPriorityConfigs { for _, p := range allPriorityConfigs {
if producer, exist := frameworkConfigProducers[p.Name]; exist { if producer, exist := frameworkConfigProducers[p.Name]; exist {
args := *configProducerArgs args := *c.configProducerArgs
args.Weight = int32(p.Weight) args.Weight = int32(p.Weight)
pl, pc := producer(args) pl, pc := producer(args)
plugins.Append(&pl) plugins.Append(&pl)
@ -516,12 +533,6 @@ func (c *Configurator) getPriorityConfigs(priorityKeys sets.String) ([]prioritie
return priorityConfigs, &plugins, pluginConfig, nil return priorityConfigs, &plugins, pluginConfig, nil
} }
func (c *Configurator) getPriorityMetadataProducer() (priorities.PriorityMetadataProducer, error) {
algorithmArgs, _ := c.getAlgorithmArgs()
return getPriorityMetadataProducer(*algorithmArgs)
}
// GetPredicateMetadataProducer returns a function to build Predicate Metadata. // GetPredicateMetadataProducer returns a function to build Predicate Metadata.
// It is used by the scheduler and other components, such as k8s.io/autoscaler/cluster-autoscaler. // It is used by the scheduler and other components, such as k8s.io/autoscaler/cluster-autoscaler.
func (c *Configurator) GetPredicateMetadataProducer() (predicates.PredicateMetadataProducer, error) { func (c *Configurator) GetPredicateMetadataProducer() (predicates.PredicateMetadataProducer, error) {
@ -534,9 +545,7 @@ func (c *Configurator) GetPredicateMetadataProducer() (predicates.PredicateMetad
// Note that the framework executes plugins according to their order in the Plugins list, and so predicates run as plugins // Note that the framework executes plugins according to their order in the Plugins list, and so predicates run as plugins
// are added to the Plugins list according to the order specified in predicates.Ordering(). // are added to the Plugins list according to the order specified in predicates.Ordering().
func (c *Configurator) getPredicateConfigs(predicateKeys sets.String) (map[string]predicates.FitPredicate, *config.Plugins, []config.PluginConfig, error) { func (c *Configurator) getPredicateConfigs(predicateKeys sets.String) (map[string]predicates.FitPredicate, *config.Plugins, []config.PluginConfig, error) {
algorithmArgs, configProducerArgs := c.getAlgorithmArgs() allFitPredicates, err := getFitPredicateFunctions(predicateKeys, *c.factoryArgs)
allFitPredicates, err := getFitPredicateFunctions(predicateKeys, *algorithmArgs)
if err != nil { if err != nil {
return nil, nil, nil, err return nil, nil, nil, err
} }
@ -563,10 +572,11 @@ func (c *Configurator) getPredicateConfigs(predicateKeys sets.String) (map[strin
// that the corresponding predicates were supposed to run. // that the corresponding predicates were supposed to run.
var plugins config.Plugins var plugins config.Plugins
var pluginConfig []config.PluginConfig var pluginConfig []config.PluginConfig
for _, predicateKey := range predicates.Ordering() { for _, predicateKey := range predicates.Ordering() {
if asPlugins.Has(predicateKey) { if asPlugins.Has(predicateKey) {
producer := frameworkConfigProducers[predicateKey] producer := frameworkConfigProducers[predicateKey]
p, pc := producer(*configProducerArgs) p, pc := producer(*c.configProducerArgs)
plugins.Append(&p) plugins.Append(&p)
pluginConfig = append(pluginConfig, pc...) pluginConfig = append(pluginConfig, pc...)
asPlugins.Delete(predicateKey) asPlugins.Delete(predicateKey)
@ -576,7 +586,7 @@ func (c *Configurator) getPredicateConfigs(predicateKeys sets.String) (map[strin
// Third, add the rest in no specific order. // Third, add the rest in no specific order.
for predicateKey := range asPlugins { for predicateKey := range asPlugins {
producer := frameworkConfigProducers[predicateKey] producer := frameworkConfigProducers[predicateKey]
p, pc := producer(*configProducerArgs) p, pc := producer(*c.configProducerArgs)
plugins.Append(&p) plugins.Append(&p)
pluginConfig = append(pluginConfig, pc...) pluginConfig = append(pluginConfig, pc...)
} }
@ -584,24 +594,6 @@ func (c *Configurator) getPredicateConfigs(predicateKeys sets.String) (map[strin
return asFitPredicates, &plugins, pluginConfig, nil return asFitPredicates, &plugins, pluginConfig, nil
} }
func (c *Configurator) getAlgorithmArgs() (*PluginFactoryArgs, *plugins.ConfigProducerArgs) {
return &PluginFactoryArgs{
NodeInfoLister: c.nodeInfoSnapshot.NodeInfos(),
PodLister: c.nodeInfoSnapshot.Pods(),
ServiceLister: c.serviceLister,
ControllerLister: c.controllerLister,
ReplicaSetLister: c.replicaSetLister,
StatefulSetLister: c.statefulSetLister,
PDBLister: c.pdbLister,
CSINodeLister: c.csiNodeLister,
PVLister: c.pVLister,
PVCLister: c.pVCLister,
StorageClassLister: c.storageClassLister,
VolumeBinder: c.volumeBinder,
HardPodAffinitySymmetricWeight: c.hardPodAffinitySymmetricWeight,
}, &plugins.ConfigProducerArgs{}
}
type podInformer struct { type podInformer struct {
informer cache.SharedIndexInformer informer cache.SharedIndexInformer
} }

View File

@ -18,6 +18,7 @@ package scheduler
import ( import (
"context" "context"
"encoding/json"
"errors" "errors"
"fmt" "fmt"
"reflect" "reflect"
@ -46,6 +47,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/apis/config"
extenderv1 "k8s.io/kubernetes/pkg/scheduler/apis/extender/v1" extenderv1 "k8s.io/kubernetes/pkg/scheduler/apis/extender/v1"
frameworkplugins "k8s.io/kubernetes/pkg/scheduler/framework/plugins" frameworkplugins "k8s.io/kubernetes/pkg/scheduler/framework/plugins"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodelabel"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache" internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue" internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
@ -102,11 +104,47 @@ func TestCreateFromConfig(t *testing.T) {
t.Errorf("Invalid configuration: %v", err) t.Errorf("Invalid configuration: %v", err)
} }
factory.CreateFromConfig(policy) conf, err := factory.CreateFromConfig(policy)
if err != nil {
t.Fatalf("CreateFromConfig failed: %v", err)
}
hpa := factory.GetHardPodAffinitySymmetricWeight() hpa := factory.GetHardPodAffinitySymmetricWeight()
if hpa != v1.DefaultHardPodAffinitySymmetricWeight { if hpa != v1.DefaultHardPodAffinitySymmetricWeight {
t.Errorf("Wrong hardPodAffinitySymmetricWeight, ecpected: %d, got: %d", v1.DefaultHardPodAffinitySymmetricWeight, hpa) t.Errorf("Wrong hardPodAffinitySymmetricWeight, ecpected: %d, got: %d", v1.DefaultHardPodAffinitySymmetricWeight, hpa)
} }
// Verify that custom predicates are converted to framework plugins.
if !pluginExists(nodelabel.Name, "FilterPlugin", conf) {
t.Error("NodeLabel plugin not exist in framework.")
}
// Verify that the policy config is converted to plugin config for custom predicates.
nodeLabelConfig := findPluginConfig(nodelabel.Name, conf)
encoding, err := json.Marshal(nodeLabelConfig)
if err != nil {
t.Errorf("Failed to marshal %+v: %v", nodeLabelConfig, err)
}
want := `{"Name":"NodeLabel","Args":{"labels":["zone"],"presence":true}}`
if string(encoding) != want {
t.Errorf("Config for NodeLabel plugin mismatch. got: %v, want: %v", string(encoding), want)
}
}
func pluginExists(name, extensionPoint string, schedConf *Config) bool {
for _, pl := range schedConf.Framework.ListPlugins()[extensionPoint] {
if pl.Name == name {
return true
}
}
return false
}
func findPluginConfig(name string, schedConf *Config) config.PluginConfig {
for _, c := range schedConf.PluginConfig {
if c.Name == name {
return c
}
}
return config.PluginConfig{}
} }
func TestCreateFromConfigWithHardPodAffinitySymmetricWeight(t *testing.T) { func TestCreateFromConfigWithHardPodAffinitySymmetricWeight(t *testing.T) {

View File

@ -12,6 +12,7 @@ go_library(
"//pkg/scheduler/framework/plugins/imagelocality:go_default_library", "//pkg/scheduler/framework/plugins/imagelocality:go_default_library",
"//pkg/scheduler/framework/plugins/interpodaffinity:go_default_library", "//pkg/scheduler/framework/plugins/interpodaffinity:go_default_library",
"//pkg/scheduler/framework/plugins/nodeaffinity:go_default_library", "//pkg/scheduler/framework/plugins/nodeaffinity:go_default_library",
"//pkg/scheduler/framework/plugins/nodelabel:go_default_library",
"//pkg/scheduler/framework/plugins/nodename:go_default_library", "//pkg/scheduler/framework/plugins/nodename:go_default_library",
"//pkg/scheduler/framework/plugins/nodeports:go_default_library", "//pkg/scheduler/framework/plugins/nodeports:go_default_library",
"//pkg/scheduler/framework/plugins/nodepreferavoidpods:go_default_library", "//pkg/scheduler/framework/plugins/nodepreferavoidpods:go_default_library",
@ -26,6 +27,7 @@ go_library(
"//pkg/scheduler/framework/v1alpha1:go_default_library", "//pkg/scheduler/framework/v1alpha1:go_default_library",
"//pkg/scheduler/volumebinder:go_default_library", "//pkg/scheduler/volumebinder:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
], ],
) )
@ -45,6 +47,7 @@ filegroup(
"//pkg/scheduler/framework/plugins/interpodaffinity:all-srcs", "//pkg/scheduler/framework/plugins/interpodaffinity:all-srcs",
"//pkg/scheduler/framework/plugins/migration:all-srcs", "//pkg/scheduler/framework/plugins/migration:all-srcs",
"//pkg/scheduler/framework/plugins/nodeaffinity:all-srcs", "//pkg/scheduler/framework/plugins/nodeaffinity:all-srcs",
"//pkg/scheduler/framework/plugins/nodelabel:all-srcs",
"//pkg/scheduler/framework/plugins/nodename:all-srcs", "//pkg/scheduler/framework/plugins/nodename:all-srcs",
"//pkg/scheduler/framework/plugins/nodeports:all-srcs", "//pkg/scheduler/framework/plugins/nodeports:all-srcs",
"//pkg/scheduler/framework/plugins/nodepreferavoidpods:all-srcs", "//pkg/scheduler/framework/plugins/nodepreferavoidpods:all-srcs",

View File

@ -17,15 +17,18 @@ limitations under the License.
package plugins package plugins
import ( import (
"encoding/json"
"fmt" "fmt"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
"k8s.io/kubernetes/pkg/scheduler/algorithm/priorities" "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities"
"k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/imagelocality" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/imagelocality"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/interpodaffinity" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/interpodaffinity"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeaffinity" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeaffinity"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodelabel"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodename" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodename"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeports" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeports"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodepreferavoidpods" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodepreferavoidpods"
@ -46,9 +49,9 @@ type RegistryArgs struct {
VolumeBinder *volumebinder.VolumeBinder VolumeBinder *volumebinder.VolumeBinder
} }
// NewDefaultRegistry builds a default registry with all the default plugins. // NewDefaultRegistry builds the default registry with all the in-tree plugins.
// This is the registry that Kubernetes default scheduler uses. A scheduler that // This is the registry that Kubernetes default scheduler uses. A scheduler that runs out of tree
// runs custom plugins, can pass a different Registry when initializing the scheduler. // plugins can register additional plugins through the WithFrameworkOutOfTreeRegistry option.
func NewDefaultRegistry(args *RegistryArgs) framework.Registry { func NewDefaultRegistry(args *RegistryArgs) framework.Registry {
return framework.Registry{ return framework.Registry{
imagelocality.Name: imagelocality.New, imagelocality.Name: imagelocality.New,
@ -74,6 +77,7 @@ func NewDefaultRegistry(args *RegistryArgs) framework.Registry {
nodevolumelimits.AzureDiskName: nodevolumelimits.NewAzureDisk, nodevolumelimits.AzureDiskName: nodevolumelimits.NewAzureDisk,
nodevolumelimits.CinderName: nodevolumelimits.NewCinder, nodevolumelimits.CinderName: nodevolumelimits.NewCinder,
interpodaffinity.Name: interpodaffinity.New, interpodaffinity.Name: interpodaffinity.New,
nodelabel.Name: nodelabel.New,
} }
} }
@ -83,6 +87,8 @@ func NewDefaultRegistry(args *RegistryArgs) framework.Registry {
type ConfigProducerArgs struct { type ConfigProducerArgs struct {
// Weight used for priority functions. // Weight used for priority functions.
Weight int32 Weight int32
// NodeLabelArgs is the args for the NodeLabel plugin.
NodeLabelArgs *nodelabel.Args
} }
// ConfigProducer produces a framework's configuration. // ConfigProducer produces a framework's configuration.
@ -191,6 +197,22 @@ func NewDefaultConfigProducerRegistry() *ConfigProducerRegistry {
plugins.Filter = appendToPluginSet(plugins.Filter, podtopologyspread.Name, nil) plugins.Filter = appendToPluginSet(plugins.Filter, podtopologyspread.Name, nil)
return return
}) })
registry.RegisterPredicate(nodelabel.Name,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Filter = appendToPluginSet(plugins.Filter, nodelabel.Name, nil)
encoding, err := json.Marshal(args.NodeLabelArgs)
if err != nil {
klog.Fatalf("Failed to marshal %+v", args.NodeLabelArgs)
return
}
config := config.PluginConfig{
Name: nodelabel.Name,
Args: runtime.Unknown{Raw: encoding},
}
pluginConfig = append(pluginConfig, config)
return
})
// Register Priorities. // Register Priorities.
registry.RegisterPriority(priorities.TaintTolerationPriority, registry.RegisterPriority(priorities.TaintTolerationPriority,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) { func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {

View File

@ -0,0 +1,43 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = ["node_label.go"],
importpath = "k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodelabel",
visibility = ["//visibility:public"],
deps = [
"//pkg/scheduler/algorithm/predicates:go_default_library",
"//pkg/scheduler/framework/plugins/migration:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library",
"//pkg/scheduler/nodeinfo:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = ["node_label_test.go"],
embed = [":go_default_library"],
deps = [
"//pkg/scheduler/framework/v1alpha1:go_default_library",
"//pkg/scheduler/nodeinfo:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@ -0,0 +1,70 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package nodelabel
import (
"context"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/migration"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
"k8s.io/kubernetes/pkg/scheduler/nodeinfo"
)
// Name of this plugin.
const Name = "NodeLabel"
// Args holds the args that are used to configure the plugin.
type Args struct {
// The list of labels that identify node "groups"
// All of the labels should be either present (or absent) for the node to be considered a fit for hosting the pod
Labels []string `json:"labels,omitempty"`
// The boolean flag that indicates whether the labels should be present or absent from the node
Presence bool `json:"presence,omitempty"`
}
// New initializes a new plugin and returns it.
func New(plArgs *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) {
args := &Args{}
if err := framework.DecodeInto(plArgs, args); err != nil {
return nil, err
}
return &NodeLabel{
predicate: predicates.NewNodeLabelPredicate(args.Labels, args.Presence),
}, nil
}
// NodeLabel checks whether a pod can fit based on the node labels which match a filter that it requests.
type NodeLabel struct {
predicate predicates.FitPredicate
}
var _ framework.FilterPlugin = &NodeLabel{}
// Name returns name of the plugin. It is used in logs, etc.
func (pl *NodeLabel) Name() string {
return Name
}
// Filter invoked at the filter extension point.
func (pl *NodeLabel) Filter(ctx context.Context, _ *framework.CycleState, pod *v1.Pod, nodeInfo *nodeinfo.NodeInfo) *framework.Status {
// Note that NodeLabelPredicate doesn't use predicate metadata, hence passing nil here.
_, reasons, err := pl.predicate(pod, nil, nodeInfo)
return migration.PredicateResultToFrameworkStatus(reasons, err)
}

View File

@ -0,0 +1,88 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package nodelabel
import (
"context"
"testing"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
)
func TestNodeLabelPresence(t *testing.T) {
label := map[string]string{"foo": "bar", "bar": "foo"}
tests := []struct {
name string
pod *v1.Pod
rawArgs string
res framework.Code
}{
{
name: "label does not match, presence true",
rawArgs: `{"labels" : ["baz"], "presence" : true}`,
res: framework.UnschedulableAndUnresolvable,
},
{
name: "label does not match, presence false",
rawArgs: `{"labels" : ["baz"], "presence" : false}`,
res: framework.Success,
},
{
name: "one label matches, presence true",
rawArgs: `{"labels" : ["foo", "baz"], "presence" : true}`,
res: framework.UnschedulableAndUnresolvable,
},
{
name: "one label matches, presence false",
rawArgs: `{"labels" : ["foo", "baz"], "presence" : false}`,
res: framework.UnschedulableAndUnresolvable,
},
{
name: "all labels match, presence true",
rawArgs: `{"labels" : ["foo", "bar"], "presence" : true}`,
res: framework.Success,
},
{
name: "all labels match, presence false",
rawArgs: `{"labels" : ["foo", "bar"], "presence" : false}`,
res: framework.UnschedulableAndUnresolvable,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
node := v1.Node{ObjectMeta: metav1.ObjectMeta{Labels: label}}
nodeInfo := schedulernodeinfo.NewNodeInfo()
nodeInfo.SetNode(&node)
args := &runtime.Unknown{Raw: []byte(test.rawArgs)}
p, err := New(args, nil)
if err != nil {
t.Fatalf("Failed to create plugin: %v", err)
}
status := p.(framework.FilterPlugin).Filter(context.TODO(), nil, test.pod, nodeInfo)
if status.Code() != test.res {
t.Errorf("Status mismatch. got: %v, want: %v", status.Code(), test.res)
}
})
}
}

View File

@ -128,14 +128,16 @@ func (sched *Scheduler) Cache() internalcache.Cache {
} }
type schedulerOptions struct { type schedulerOptions struct {
schedulerName string schedulerName string
hardPodAffinitySymmetricWeight int32 hardPodAffinitySymmetricWeight int32
disablePreemption bool disablePreemption bool
percentageOfNodesToScore int32 percentageOfNodesToScore int32
bindTimeoutSeconds int64 bindTimeoutSeconds int64
podInitialBackoffSeconds int64 podInitialBackoffSeconds int64
podMaxBackoffSeconds int64 podMaxBackoffSeconds int64
frameworkDefaultRegistry framework.Registry // Default registry contains all in-tree plugins
frameworkDefaultRegistry framework.Registry
// This registry contains out of tree plugins to be merged with default registry.
frameworkOutOfTreeRegistry framework.Registry frameworkOutOfTreeRegistry framework.Registry
frameworkConfigProducerRegistry *frameworkplugins.ConfigProducerRegistry frameworkConfigProducerRegistry *frameworkplugins.ConfigProducerRegistry
frameworkPlugins *kubeschedulerconfig.Plugins frameworkPlugins *kubeschedulerconfig.Plugins
@ -180,7 +182,7 @@ func WithBindTimeoutSeconds(bindTimeoutSeconds int64) Option {
} }
} }
// WithFrameworkDefaultRegistry sets the framework's default registry. // WithFrameworkDefaultRegistry sets the framework's default registry. This is only used in integration tests.
func WithFrameworkDefaultRegistry(registry framework.Registry) Option { func WithFrameworkDefaultRegistry(registry framework.Registry) Option {
return func(o *schedulerOptions) { return func(o *schedulerOptions) {
o.frameworkDefaultRegistry = registry o.frameworkDefaultRegistry = registry