made scheduler cache and volume binder available when instantiating factories for default plugins

This commit is contained in:
Abdullah Gharaibeh 2019-10-10 16:21:29 -04:00
parent 0a98ccbcaf
commit 456df97745
11 changed files with 119 additions and 58 deletions

View File

@ -16,7 +16,6 @@ go_library(
"//pkg/scheduler:go_default_library",
"//pkg/scheduler/algorithmprovider:go_default_library",
"//pkg/scheduler/apis/config:go_default_library",
"//pkg/scheduler/framework/plugins:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library",
"//pkg/scheduler/metrics:go_default_library",
"//pkg/util/configz:go_default_library",

View File

@ -51,7 +51,6 @@ import (
"k8s.io/kubernetes/pkg/scheduler"
"k8s.io/kubernetes/pkg/scheduler/algorithmprovider"
kubeschedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
plugins "k8s.io/kubernetes/pkg/scheduler/framework/plugins"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
"k8s.io/kubernetes/pkg/scheduler/metrics"
"k8s.io/kubernetes/pkg/util/configz"
@ -159,9 +158,9 @@ func Run(cc schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}, regis
// To help debugging, immediately log version
klog.V(1).Infof("Starting Kubernetes Scheduler version %+v", version.Get())
registry := plugins.NewDefaultRegistry()
outOfTreeRegistry := make(framework.Registry)
for _, option := range registryOptions {
if err := option(registry); err != nil {
if err := option(outOfTreeRegistry); err != nil {
return err
}
}
@ -187,7 +186,7 @@ func Run(cc schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}, regis
scheduler.WithPreemptionDisabled(cc.ComponentConfig.DisablePreemption),
scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore),
scheduler.WithBindTimeoutSeconds(*cc.ComponentConfig.BindTimeoutSeconds),
scheduler.WithFrameworkRegistry(registry),
scheduler.WithFrameworkOutOfTreeRegistry(outOfTreeRegistry),
scheduler.WithFrameworkPlugins(cc.ComponentConfig.Plugins),
scheduler.WithFrameworkPluginConfig(cc.ComponentConfig.PluginConfig),
scheduler.WithPodMaxBackoffSeconds(*cc.ComponentConfig.PodMaxBackoffSeconds),

View File

@ -207,6 +207,8 @@ type ConfigFactoryArgs struct {
PdbInformer policyinformers.PodDisruptionBudgetInformer
StorageClassInformer storageinformersv1.StorageClassInformer
CSINodeInformer storageinformersv1beta1.CSINodeInformer
VolumeBinder *volumebinder.VolumeBinder
SchedulerCache internalcache.Cache
HardPodAffinitySymmetricWeight int32
DisablePreemption bool
PercentageOfNodesToScore int32
@ -227,7 +229,6 @@ func NewConfigFactory(args *ConfigFactoryArgs) *Configurator {
if stopEverything == nil {
stopEverything = wait.NeverStop
}
schedulerCache := internalcache.New(30*time.Second, stopEverything)
// storageClassInformer is only enabled through VolumeScheduling feature gate
var storageClassLister storagelistersv1.StorageClassLister
@ -253,7 +254,8 @@ func NewConfigFactory(args *ConfigFactoryArgs) *Configurator {
podLister: args.PodInformer.Lister(),
storageClassLister: storageClassLister,
csiNodeLister: csiNodeLister,
schedulerCache: schedulerCache,
volumeBinder: args.VolumeBinder,
schedulerCache: args.SchedulerCache,
StopEverything: stopEverything,
hardPodAffinitySymmetricWeight: args.HardPodAffinitySymmetricWeight,
disablePreemption: args.DisablePreemption,
@ -267,8 +269,6 @@ func NewConfigFactory(args *ConfigFactoryArgs) *Configurator {
pluginConfig: args.PluginConfig,
pluginConfigProducerRegistry: args.PluginConfigProducerRegistry,
}
// Setup volume binder
c.volumeBinder = volumebinder.NewVolumeBinder(args.Client, args.NodeInformer, args.PvcInformer, args.PvInformer, args.StorageClassInformer, time.Duration(args.BindTimeoutSeconds)*time.Second)
c.scheduledPodsHasSynced = args.PodInformer.Informer().HasSynced
return c

View File

@ -480,36 +480,36 @@ func newConfigFactoryWithFrameworkRegistry(
registry framework.Registry, pluginConfigProducerRegistry *frameworkplugins.ConfigProducerRegistry) *Configurator {
informerFactory := informers.NewSharedInformerFactory(client, 0)
return NewConfigFactory(&ConfigFactoryArgs{
client,
informerFactory.Core().V1().Nodes(),
informerFactory.Core().V1().Pods(),
informerFactory.Core().V1().PersistentVolumes(),
informerFactory.Core().V1().PersistentVolumeClaims(),
informerFactory.Core().V1().ReplicationControllers(),
informerFactory.Apps().V1().ReplicaSets(),
informerFactory.Apps().V1().StatefulSets(),
informerFactory.Core().V1().Services(),
informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
informerFactory.Storage().V1().StorageClasses(),
informerFactory.Storage().V1beta1().CSINodes(),
hardPodAffinitySymmetricWeight,
disablePodPreemption,
schedulerapi.DefaultPercentageOfNodesToScore,
bindTimeoutSeconds,
podMaxBackoffDurationSeconds,
podInitialBackoffDurationSeconds,
stopCh,
registry,
nil,
[]config.PluginConfig{},
pluginConfigProducerRegistry,
Client: client,
NodeInformer: informerFactory.Core().V1().Nodes(),
PodInformer: informerFactory.Core().V1().Pods(),
PvInformer: informerFactory.Core().V1().PersistentVolumes(),
PvcInformer: informerFactory.Core().V1().PersistentVolumeClaims(),
ReplicationControllerInformer: informerFactory.Core().V1().ReplicationControllers(),
ReplicaSetInformer: informerFactory.Apps().V1().ReplicaSets(),
StatefulSetInformer: informerFactory.Apps().V1().StatefulSets(),
ServiceInformer: informerFactory.Core().V1().Services(),
PdbInformer: informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
StorageClassInformer: informerFactory.Storage().V1().StorageClasses(),
CSINodeInformer: informerFactory.Storage().V1beta1().CSINodes(),
HardPodAffinitySymmetricWeight: hardPodAffinitySymmetricWeight,
DisablePreemption: disablePodPreemption,
PercentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore,
BindTimeoutSeconds: bindTimeoutSeconds,
PodInitialBackoffSeconds: podInitialBackoffDurationSeconds,
PodMaxBackoffSeconds: podMaxBackoffDurationSeconds,
StopCh: stopCh,
Registry: registry,
Plugins: nil,
PluginConfig: []config.PluginConfig{},
PluginConfigProducerRegistry: pluginConfigProducerRegistry,
})
}
func newConfigFactory(
client clientset.Interface, hardPodAffinitySymmetricWeight int32, stopCh <-chan struct{}) *Configurator {
return newConfigFactoryWithFrameworkRegistry(client, hardPodAffinitySymmetricWeight, stopCh,
frameworkplugins.NewDefaultRegistry(), frameworkplugins.NewDefaultConfigProducerRegistry())
frameworkplugins.NewDefaultRegistry(&frameworkplugins.RegistryArgs{}), frameworkplugins.NewDefaultConfigProducerRegistry())
}
type fakeExtender struct {

View File

@ -6,10 +6,15 @@ go_library(
importpath = "k8s.io/kubernetes/pkg/scheduler/framework/plugins",
visibility = ["//visibility:public"],
deps = [
"//pkg/scheduler/algorithm:go_default_library",
"//pkg/scheduler/algorithm/predicates:go_default_library",
"//pkg/scheduler/apis/config:go_default_library",
"//pkg/scheduler/framework/plugins/tainttoleration:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library",
"//pkg/scheduler/internal/cache:go_default_library",
"//pkg/scheduler/volumebinder:go_default_library",
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/listers/storage/v1:go_default_library",
],
)

View File

@ -19,16 +19,35 @@ package plugins
import (
"fmt"
corelisters "k8s.io/client-go/listers/core/v1"
storagelistersv1 "k8s.io/client-go/listers/storage/v1"
"k8s.io/kubernetes/pkg/scheduler/algorithm"
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/tainttoleration"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
"k8s.io/kubernetes/pkg/scheduler/volumebinder"
)
// RegistryArgs arguments needed to create default plugin factories.
type RegistryArgs struct {
SchedulerCache internalcache.Cache
ServiceLister algorithm.ServiceLister
ControllerLister algorithm.ControllerLister
ReplicaSetLister algorithm.ReplicaSetLister
StatefulSetLister algorithm.StatefulSetLister
PDBLister algorithm.PDBLister
PVLister corelisters.PersistentVolumeLister
PVCLister corelisters.PersistentVolumeClaimLister
StorageClassLister storagelistersv1.StorageClassLister
VolumeBinder *volumebinder.VolumeBinder
}
// NewDefaultRegistry builds a default registry with all the default plugins.
// This is the registry that Kubernetes default scheduler uses. A scheduler that
// runs custom plugins, can pass a different Registry when initializing the scheduler.
func NewDefaultRegistry() framework.Registry {
func NewDefaultRegistry(args *RegistryArgs) framework.Registry {
return framework.Registry{
tainttoleration.Name: tainttoleration.New,
}

View File

@ -68,3 +68,13 @@ func (r Registry) Unregister(name string) error {
delete(r, name)
return nil
}
// Merge merges the provided registry to the current one.
func (r Registry) Merge(in Registry) error {
for name, factory := range in {
if err := r.Register(name, factory); err != nil {
return err
}
}
return nil
}

View File

@ -124,7 +124,8 @@ type schedulerOptions struct {
bindTimeoutSeconds int64
podInitialBackoffSeconds int64
podMaxBackoffSeconds int64
frameworkRegistry framework.Registry
frameworkDefaultRegistry framework.Registry
frameworkOutOfTreeRegistry framework.Registry
frameworkConfigProducerRegistry *frameworkplugins.ConfigProducerRegistry
frameworkPlugins *kubeschedulerconfig.Plugins
frameworkPluginConfig []kubeschedulerconfig.PluginConfig
@ -168,10 +169,18 @@ func WithBindTimeoutSeconds(bindTimeoutSeconds int64) Option {
}
}
// WithFrameworkRegistry sets the framework registry.
func WithFrameworkRegistry(registry framework.Registry) Option {
// WithFrameworkDefaultRegistry sets the framework's default registry.
func WithFrameworkDefaultRegistry(registry framework.Registry) Option {
return func(o *schedulerOptions) {
o.frameworkRegistry = registry
o.frameworkDefaultRegistry = registry
}
}
// WithFrameworkOutOfTreeRegistry sets the registry for out-of-tree plugins. Those plugins
// will be appended to the default registry.
func WithFrameworkOutOfTreeRegistry(registry framework.Registry) Option {
return func(o *schedulerOptions) {
o.frameworkOutOfTreeRegistry = registry
}
}
@ -218,7 +227,6 @@ var defaultSchedulerOptions = schedulerOptions{
bindTimeoutSeconds: BindTimeoutSeconds,
podInitialBackoffSeconds: int64(internalqueue.DefaultPodInitialBackoffDuration.Seconds()),
podMaxBackoffSeconds: int64(internalqueue.DefaultPodMaxBackoffDuration.Seconds()),
frameworkRegistry: frameworkplugins.NewDefaultRegistry(),
frameworkConfigProducerRegistry: frameworkplugins.NewDefaultConfigProducerRegistry(),
// The plugins and pluginConfig options are currently nil because we currently don't have
// "default" plugins. All plugins that we run through the framework currently come from two
@ -253,6 +261,28 @@ func New(client clientset.Interface,
for _, opt := range opts {
opt(&options)
}
schedulerCache := internalcache.New(30*time.Second, stopCh)
volumeBinder := volumebinder.NewVolumeBinder(client, nodeInformer, pvcInformer, pvInformer, storageClassInformer,
time.Duration(options.bindTimeoutSeconds)*time.Second)
registry := options.frameworkDefaultRegistry
if registry == nil {
registry = frameworkplugins.NewDefaultRegistry(&frameworkplugins.RegistryArgs{
SchedulerCache: schedulerCache,
ServiceLister: serviceInformer.Lister(),
ControllerLister: replicationControllerInformer.Lister(),
ReplicaSetLister: replicaSetInformer.Lister(),
StatefulSetLister: statefulSetInformer.Lister(),
PDBLister: pdbInformer.Lister(),
PVLister: pvInformer.Lister(),
PVCLister: pvcInformer.Lister(),
StorageClassLister: storageClassInformer.Lister(),
VolumeBinder: volumeBinder,
})
}
registry.Merge(options.frameworkOutOfTreeRegistry)
// Set up the configurator which can create schedulers from configs.
configurator := factory.NewConfigFactory(&factory.ConfigFactoryArgs{
Client: client,
@ -267,13 +297,15 @@ func New(client clientset.Interface,
PdbInformer: pdbInformer,
StorageClassInformer: storageClassInformer,
CSINodeInformer: csiNodeInformer,
VolumeBinder: volumeBinder,
SchedulerCache: schedulerCache,
HardPodAffinitySymmetricWeight: options.hardPodAffinitySymmetricWeight,
DisablePreemption: options.disablePreemption,
PercentageOfNodesToScore: options.percentageOfNodesToScore,
BindTimeoutSeconds: options.bindTimeoutSeconds,
PodInitialBackoffSeconds: options.podInitialBackoffSeconds,
PodMaxBackoffSeconds: options.podMaxBackoffSeconds,
Registry: options.frameworkRegistry,
Registry: registry,
PluginConfigProducerRegistry: options.frameworkConfigProducerRegistry,
Plugins: options.frameworkPlugins,
PluginConfig: options.frameworkPluginConfig,

View File

@ -34,7 +34,6 @@ go_test(
"//pkg/scheduler/apis/config:go_default_library",
"//pkg/scheduler/apis/extender/v1:go_default_library",
"//pkg/scheduler/factory:go_default_library",
"//pkg/scheduler/framework/plugins:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library",
"//pkg/scheduler/nodeinfo:go_default_library",
"//pkg/scheduler/testing:go_default_library",

View File

@ -461,7 +461,7 @@ func TestPreFilterPlugin(t *testing.T) {
// Create the master and the scheduler with the test plugin set.
context := initTestSchedulerForFrameworkTest(t, initTestMaster(t, "prefilter-plugin", nil), 2,
scheduler.WithFrameworkPlugins(plugins),
scheduler.WithFrameworkRegistry(registry))
scheduler.WithFrameworkDefaultRegistry(registry))
defer cleanupTest(t, context)
tests := []struct {
@ -532,7 +532,7 @@ func TestScorePlugin(t *testing.T) {
context := initTestSchedulerForFrameworkTest(t, initTestMaster(t, "score-plugin", nil), 10,
scheduler.WithFrameworkPlugins(plugins),
scheduler.WithFrameworkRegistry(registry))
scheduler.WithFrameworkDefaultRegistry(registry))
defer cleanupTest(t, context)
for i, fail := range []bool{false, true} {
@ -590,7 +590,7 @@ func TestNormalizeScorePlugin(t *testing.T) {
}
context := initTestSchedulerForFrameworkTest(t, initTestMaster(t, "score-plugin", nil), 10,
scheduler.WithFrameworkPlugins(plugins),
scheduler.WithFrameworkRegistry(registry))
scheduler.WithFrameworkDefaultRegistry(registry))
defer cleanupTest(t, context)
@ -635,7 +635,7 @@ func TestReservePlugin(t *testing.T) {
// Create the master and the scheduler with the test plugin set.
context := initTestSchedulerForFrameworkTest(t, initTestMaster(t, "reserve-plugin", nil), 2,
scheduler.WithFrameworkPlugins(plugins),
scheduler.WithFrameworkRegistry(registry))
scheduler.WithFrameworkDefaultRegistry(registry))
defer cleanupTest(t, context)
for _, fail := range []bool{false, true} {
@ -694,7 +694,7 @@ func TestPrebindPlugin(t *testing.T) {
context := initTestSchedulerForFrameworkTest(t, initTestMaster(t, "prebind-plugin", nil), 2,
scheduler.WithFrameworkPlugins(plugins),
scheduler.WithFrameworkPluginConfig(preBindPluginConfig),
scheduler.WithFrameworkRegistry(registry))
scheduler.WithFrameworkDefaultRegistry(registry))
defer cleanupTest(t, context)
tests := []struct {
@ -789,7 +789,7 @@ func TestUnreservePlugin(t *testing.T) {
context := initTestSchedulerForFrameworkTest(t, initTestMaster(t, "unreserve-plugin", nil), 2,
scheduler.WithFrameworkPlugins(plugins),
scheduler.WithFrameworkPluginConfig(pluginConfig),
scheduler.WithFrameworkRegistry(registry))
scheduler.WithFrameworkDefaultRegistry(registry))
defer cleanupTest(t, context)
tests := []struct {
@ -899,7 +899,7 @@ func TestBindPlugin(t *testing.T) {
context := initTestSchedulerWithOptions(t, testContext, false, nil, time.Second,
scheduler.WithFrameworkPlugins(plugins),
scheduler.WithFrameworkPluginConfig(pluginConfig),
scheduler.WithFrameworkRegistry(registry),
scheduler.WithFrameworkDefaultRegistry(registry),
scheduler.WithFrameworkConfigProducerRegistry(nil))
defer cleanupTest(t, context)
@ -1072,7 +1072,7 @@ func TestPostBindPlugin(t *testing.T) {
context := initTestSchedulerForFrameworkTest(t, initTestMaster(t, "postbind-plugin", nil), 2,
scheduler.WithFrameworkPlugins(plugins),
scheduler.WithFrameworkPluginConfig(pluginConfig),
scheduler.WithFrameworkRegistry(registry))
scheduler.WithFrameworkDefaultRegistry(registry))
defer cleanupTest(t, context)
tests := []struct {
@ -1147,7 +1147,7 @@ func TestPermitPlugin(t *testing.T) {
context := initTestSchedulerForFrameworkTest(t, initTestMaster(t, "permit-plugin", nil), 2,
scheduler.WithFrameworkPlugins(plugins),
scheduler.WithFrameworkPluginConfig(pluginConfig),
scheduler.WithFrameworkRegistry(registry))
scheduler.WithFrameworkDefaultRegistry(registry))
defer cleanupTest(t, context)
tests := []struct {
@ -1253,7 +1253,7 @@ func TestCoSchedulingWithPermitPlugin(t *testing.T) {
context := initTestSchedulerForFrameworkTest(t, initTestMaster(t, "permit-plugin", nil), 2,
scheduler.WithFrameworkPlugins(plugins),
scheduler.WithFrameworkPluginConfig(pluginConfig),
scheduler.WithFrameworkRegistry(registry))
scheduler.WithFrameworkDefaultRegistry(registry))
defer cleanupTest(t, context)
tests := []struct {
@ -1334,7 +1334,7 @@ func TestFilterPlugin(t *testing.T) {
// Create the master and the scheduler with the test plugin set.
context := initTestSchedulerForFrameworkTest(t, initTestMaster(t, "filter-plugin", nil), 2,
scheduler.WithFrameworkPlugins(plugins),
scheduler.WithFrameworkRegistry(registry))
scheduler.WithFrameworkDefaultRegistry(registry))
defer cleanupTest(t, context)
for _, fail := range []bool{false, true} {
@ -1385,7 +1385,7 @@ func TestPostFilterPlugin(t *testing.T) {
// Create the master and the scheduler with the test plugin set.
context := initTestSchedulerForFrameworkTest(t, initTestMaster(t, "post-filter-plugin", nil), 2,
scheduler.WithFrameworkPlugins(plugins),
scheduler.WithFrameworkRegistry(registry))
scheduler.WithFrameworkDefaultRegistry(registry))
defer cleanupTest(t, context)
for _, fail := range []bool{false, true} {
@ -1444,7 +1444,7 @@ func TestPreemptWithPermitPlugin(t *testing.T) {
context := initTestSchedulerForFrameworkTest(t, initTestMaster(t, "preempt-with-permit-plugin", nil), 0,
scheduler.WithFrameworkPlugins(plugins),
scheduler.WithFrameworkPluginConfig(pluginConfig),
scheduler.WithFrameworkRegistry(registry))
scheduler.WithFrameworkDefaultRegistry(registry))
defer cleanupTest(t, context)
// Add one node.

View File

@ -40,7 +40,6 @@ import (
"k8s.io/kubernetes/pkg/apis/scheduling"
"k8s.io/kubernetes/pkg/scheduler"
schedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
frameworkplugins "k8s.io/kubernetes/pkg/scheduler/framework/plugins"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
"k8s.io/kubernetes/plugin/pkg/admission/priority"
@ -124,8 +123,7 @@ var _ = framework.FilterPlugin(&tokenFilter{})
func TestPreemption(t *testing.T) {
// Initialize scheduler with a filter plugin.
var filter tokenFilter
registry := frameworkplugins.NewDefaultRegistry()
registry := make(framework.Registry)
registry.Register(filterPluginName, func(_ *runtime.Unknown, fh framework.FrameworkHandle) (framework.Plugin, error) {
return &filter, nil
})
@ -149,7 +147,7 @@ func TestPreemption(t *testing.T) {
initTestMaster(t, "preemptiom", nil),
false, nil, time.Second,
scheduler.WithFrameworkPlugins(plugins),
scheduler.WithFrameworkRegistry(registry))
scheduler.WithFrameworkOutOfTreeRegistry(registry))
defer cleanupTest(t, context)
cs := context.clientSet