feat(scheduler): expose SharedInformerFactory to the framework handle

This commit is contained in:
draveness 2019-10-09 14:49:56 +08:00
parent 5e0f48acf8
commit ee4dec65b5
19 changed files with 87 additions and 152 deletions

View File

@ -167,17 +167,8 @@ func Run(cc schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}, regis
// Create the scheduler. // Create the scheduler.
sched, err := scheduler.New(cc.Client, sched, err := scheduler.New(cc.Client,
cc.InformerFactory.Core().V1().Nodes(), cc.InformerFactory,
cc.PodInformer, cc.PodInformer,
cc.InformerFactory.Core().V1().PersistentVolumes(),
cc.InformerFactory.Core().V1().PersistentVolumeClaims(),
cc.InformerFactory.Core().V1().ReplicationControllers(),
cc.InformerFactory.Apps().V1().ReplicaSets(),
cc.InformerFactory.Apps().V1().StatefulSets(),
cc.InformerFactory.Core().V1().Services(),
cc.InformerFactory.Policy().V1beta1().PodDisruptionBudgets(),
cc.InformerFactory.Storage().V1().StorageClasses(),
cc.InformerFactory.Storage().V1beta1().CSINodes(),
cc.Recorder, cc.Recorder,
cc.ComponentConfig.AlgorithmSource, cc.ComponentConfig.AlgorithmSource,
stopCh, stopCh,

View File

@ -30,11 +30,8 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/informers/apps/v1:go_default_library", "//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/informers/core/v1:go_default_library", "//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/informers/policy/v1beta1:go_default_library",
"//staging/src/k8s.io/client-go/informers/storage/v1:go_default_library",
"//staging/src/k8s.io/client-go/informers/storage/v1beta1:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library", "//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//staging/src/k8s.io/client-go/tools/events:go_default_library", "//staging/src/k8s.io/client-go/tools/events:go_default_library",

View File

@ -1172,17 +1172,8 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
sched, err := scheduler.New( sched, err := scheduler.New(
client, client,
informerFactory.Core().V1().Nodes(), informerFactory,
informerFactory.Core().V1().Pods(), informerFactory.Core().V1().Pods(),
informerFactory.Core().V1().PersistentVolumes(),
informerFactory.Core().V1().PersistentVolumeClaims(),
informerFactory.Core().V1().ReplicationControllers(),
informerFactory.Apps().V1().ReplicaSets(),
informerFactory.Apps().V1().StatefulSets(),
informerFactory.Core().V1().Services(),
informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
informerFactory.Storage().V1().StorageClasses(),
informerFactory.Storage().V1beta1().CSINodes(),
nil, nil,
algorithmSrc, algorithmSrc,
make(chan struct{}), make(chan struct{}),

View File

@ -27,9 +27,8 @@ import (
storagev1beta1 "k8s.io/api/storage/v1beta1" storagev1beta1 "k8s.io/api/storage/v1beta1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime"
utilfeature "k8s.io/apiserver/pkg/util/feature" utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers/core/v1" coreinformers "k8s.io/client-go/informers/core/v1"
storageinformersv1 "k8s.io/client-go/informers/storage/v1"
storageinformersv1beta1 "k8s.io/client-go/informers/storage/v1beta1"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
"k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/features"
) )
@ -381,13 +380,8 @@ func (sched *Scheduler) skipPodUpdate(pod *v1.Pod) bool {
func AddAllEventHandlers( func AddAllEventHandlers(
sched *Scheduler, sched *Scheduler,
schedulerName string, schedulerName string,
nodeInformer coreinformers.NodeInformer, informerFactory informers.SharedInformerFactory,
podInformer coreinformers.PodInformer, podInformer coreinformers.PodInformer,
pvInformer coreinformers.PersistentVolumeInformer,
pvcInformer coreinformers.PersistentVolumeClaimInformer,
serviceInformer coreinformers.ServiceInformer,
storageClassInformer storageinformersv1.StorageClassInformer,
csiNodeInformer storageinformersv1beta1.CSINodeInformer,
) { ) {
// scheduled pod cache // scheduled pod cache
podInformer.Informer().AddEventHandler( podInformer.Informer().AddEventHandler(
@ -440,7 +434,7 @@ func AddAllEventHandlers(
}, },
) )
nodeInformer.Informer().AddEventHandler( informerFactory.Core().V1().Nodes().Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{ cache.ResourceEventHandlerFuncs{
AddFunc: sched.addNodeToCache, AddFunc: sched.addNodeToCache,
UpdateFunc: sched.updateNodeInCache, UpdateFunc: sched.updateNodeInCache,
@ -449,7 +443,7 @@ func AddAllEventHandlers(
) )
if utilfeature.DefaultFeatureGate.Enabled(features.CSINodeInfo) { if utilfeature.DefaultFeatureGate.Enabled(features.CSINodeInfo) {
csiNodeInformer.Informer().AddEventHandler( informerFactory.Storage().V1beta1().CSINodes().Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{ cache.ResourceEventHandlerFuncs{
AddFunc: sched.onCSINodeAdd, AddFunc: sched.onCSINodeAdd,
UpdateFunc: sched.onCSINodeUpdate, UpdateFunc: sched.onCSINodeUpdate,
@ -460,7 +454,7 @@ func AddAllEventHandlers(
// On add and delete of PVs, it will affect equivalence cache items // On add and delete of PVs, it will affect equivalence cache items
// related to persistent volume // related to persistent volume
pvInformer.Informer().AddEventHandler( informerFactory.Core().V1().PersistentVolumes().Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{ cache.ResourceEventHandlerFuncs{
// MaxPDVolumeCountPredicate: since it relies on the counts of PV. // MaxPDVolumeCountPredicate: since it relies on the counts of PV.
AddFunc: sched.onPvAdd, AddFunc: sched.onPvAdd,
@ -469,7 +463,7 @@ func AddAllEventHandlers(
) )
// This is for MaxPDVolumeCountPredicate: add/delete PVC will affect counts of PV when it is bound. // This is for MaxPDVolumeCountPredicate: add/delete PVC will affect counts of PV when it is bound.
pvcInformer.Informer().AddEventHandler( informerFactory.Core().V1().PersistentVolumeClaims().Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{ cache.ResourceEventHandlerFuncs{
AddFunc: sched.onPvcAdd, AddFunc: sched.onPvcAdd,
UpdateFunc: sched.onPvcUpdate, UpdateFunc: sched.onPvcUpdate,
@ -479,7 +473,7 @@ func AddAllEventHandlers(
// This is for ServiceAffinity: affected by the selector of the service is updated. // This is for ServiceAffinity: affected by the selector of the service is updated.
// Also, if new service is added, equivalence cache will also become invalid since // Also, if new service is added, equivalence cache will also become invalid since
// existing pods may be "captured" by this service and change this predicate result. // existing pods may be "captured" by this service and change this predicate result.
serviceInformer.Informer().AddEventHandler( informerFactory.Core().V1().Services().Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{ cache.ResourceEventHandlerFuncs{
AddFunc: sched.onServiceAdd, AddFunc: sched.onServiceAdd,
UpdateFunc: sched.onServiceUpdate, UpdateFunc: sched.onServiceUpdate,
@ -487,7 +481,7 @@ func AddAllEventHandlers(
}, },
) )
storageClassInformer.Informer().AddEventHandler( informerFactory.Storage().V1().StorageClasses().Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{ cache.ResourceEventHandlerFuncs{
AddFunc: sched.onStorageClassAdd, AddFunc: sched.onStorageClassAdd,
}, },

View File

@ -32,6 +32,7 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/informers/apps/v1:go_default_library", "//staging/src/k8s.io/client-go/informers/apps/v1:go_default_library",
"//staging/src/k8s.io/client-go/informers/core/v1:go_default_library", "//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/informers/policy/v1beta1:go_default_library", "//staging/src/k8s.io/client-go/informers/policy/v1beta1:go_default_library",

View File

@ -31,6 +31,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature" utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers"
appsinformers "k8s.io/client-go/informers/apps/v1" appsinformers "k8s.io/client-go/informers/apps/v1"
coreinformers "k8s.io/client-go/informers/core/v1" coreinformers "k8s.io/client-go/informers/core/v1"
policyinformers "k8s.io/client-go/informers/policy/v1beta1" policyinformers "k8s.io/client-go/informers/policy/v1beta1"
@ -131,6 +132,9 @@ type PodPreemptor interface {
// construct a new scheduler. // construct a new scheduler.
type Configurator struct { type Configurator struct {
client clientset.Interface client clientset.Interface
informerFactory informers.SharedInformerFactory
// a means to list all PersistentVolumes // a means to list all PersistentVolumes
pVLister corelisters.PersistentVolumeLister pVLister corelisters.PersistentVolumeLister
// a means to list all PersistentVolumeClaims // a means to list all PersistentVolumeClaims
@ -196,6 +200,7 @@ type Configurator struct {
// ConfigFactoryArgs is a set arguments passed to NewConfigFactory. // ConfigFactoryArgs is a set arguments passed to NewConfigFactory.
type ConfigFactoryArgs struct { type ConfigFactoryArgs struct {
Client clientset.Interface Client clientset.Interface
InformerFactory informers.SharedInformerFactory
NodeInformer coreinformers.NodeInformer NodeInformer coreinformers.NodeInformer
PodInformer coreinformers.PodInformer PodInformer coreinformers.PodInformer
PvInformer coreinformers.PersistentVolumeInformer PvInformer coreinformers.PersistentVolumeInformer
@ -243,6 +248,7 @@ func NewConfigFactory(args *ConfigFactoryArgs) *Configurator {
c := &Configurator{ c := &Configurator{
client: args.Client, client: args.Client,
informerFactory: args.InformerFactory,
pVLister: args.PvInformer.Lister(), pVLister: args.PvInformer.Lister(),
pVCLister: args.PvcInformer.Lister(), pVCLister: args.PvcInformer.Lister(),
serviceLister: args.ServiceInformer.Lister(), serviceLister: args.ServiceInformer.Lister(),
@ -416,6 +422,7 @@ func (c *Configurator) CreateFromKeys(predicateKeys, priorityKeys sets.String, e
&plugins, &plugins,
pluginConfig, pluginConfig,
framework.WithClientSet(c.client), framework.WithClientSet(c.client),
framework.WithInformerFactory(c.informerFactory),
) )
if err != nil { if err != nil {
klog.Fatalf("error initializing the scheduling framework: %v", err) klog.Fatalf("error initializing the scheduling framework: %v", err)

View File

@ -482,6 +482,7 @@ func newConfigFactoryWithFrameworkRegistry(
informerFactory := informers.NewSharedInformerFactory(client, 0) informerFactory := informers.NewSharedInformerFactory(client, 0)
return NewConfigFactory(&ConfigFactoryArgs{ return NewConfigFactory(&ConfigFactoryArgs{
Client: client, Client: client,
InformerFactory: informerFactory,
NodeInformer: informerFactory.Core().V1().Nodes(), NodeInformer: informerFactory.Core().V1().Nodes(),
PodInformer: informerFactory.Core().V1().Pods(), PodInformer: informerFactory.Core().V1().Pods(),
PvInformer: informerFactory.Core().V1().PersistentVolumes(), PvInformer: informerFactory.Core().V1().PersistentVolumes(),

View File

@ -20,6 +20,7 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/json:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/json:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/util/workqueue:go_default_library", "//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
"//vendor/k8s.io/klog:go_default_library", "//vendor/k8s.io/klog:go_default_library",

View File

@ -26,6 +26,7 @@ import (
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes" clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/util/workqueue" "k8s.io/client-go/util/workqueue"
"k8s.io/klog" "k8s.io/klog"
@ -59,6 +60,7 @@ type framework struct {
permitPlugins []PermitPlugin permitPlugins []PermitPlugin
clientSet clientset.Interface clientSet clientset.Interface
informerFactory informers.SharedInformerFactory
} }
// extensionPoint encapsulates desired and applied set of plugins at a specific extension // extensionPoint encapsulates desired and applied set of plugins at a specific extension
@ -90,6 +92,7 @@ func (f *framework) getExtensionPoints(plugins *config.Plugins) []extensionPoint
type frameworkOptions struct { type frameworkOptions struct {
clientSet clientset.Interface clientSet clientset.Interface
informerFactory informers.SharedInformerFactory
} }
// Option for the framework. // Option for the framework.
@ -102,6 +105,13 @@ func WithClientSet(clientSet clientset.Interface) Option {
} }
} }
// WithInformerFactory sets informer factory for the scheduling framework.
func WithInformerFactory(informerFactory informers.SharedInformerFactory) Option {
return func(o *frameworkOptions) {
o.informerFactory = informerFactory
}
}
var defaultFrameworkOptions = frameworkOptions{} var defaultFrameworkOptions = frameworkOptions{}
var _ = Framework(&framework{}) var _ = Framework(&framework{})
@ -119,6 +129,7 @@ func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfi
pluginNameToWeightMap: make(map[string]int), pluginNameToWeightMap: make(map[string]int),
waitingPods: newWaitingPodsMap(), waitingPods: newWaitingPodsMap(),
clientSet: options.clientSet, clientSet: options.clientSet,
informerFactory: options.informerFactory,
} }
if plugins == nil { if plugins == nil {
return f, nil return f, nil
@ -578,6 +589,11 @@ func (f *framework) ClientSet() clientset.Interface {
return f.clientSet return f.clientSet
} }
// SharedInformerFactory returns a shared informer factory.
func (f *framework) SharedInformerFactory() informers.SharedInformerFactory {
return f.informerFactory
}
func (f *framework) pluginsNeeded(plugins *config.Plugins) map[string]config.Plugin { func (f *framework) pluginsNeeded(plugins *config.Plugins) map[string]config.Plugin {
pgMap := make(map[string]config.Plugin, 0) pgMap := make(map[string]config.Plugin, 0)

View File

@ -25,6 +25,7 @@ import (
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes" clientset "k8s.io/client-go/kubernetes"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo" schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
) )
@ -452,4 +453,6 @@ type FrameworkHandle interface {
// ClientSet returns a kubernetes clientSet. // ClientSet returns a kubernetes clientSet.
ClientSet() clientset.Interface ClientSet() clientset.Interface
SharedInformerFactory() informers.SharedInformerFactory
} }

View File

@ -42,6 +42,7 @@ go_test(
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/component-base/metrics/testutil:go_default_library", "//staging/src/k8s.io/component-base/metrics/testutil:go_default_library",
], ],

View File

@ -28,6 +28,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/clock" "k8s.io/apimachinery/pkg/util/clock"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes" clientset "k8s.io/client-go/kubernetes"
"k8s.io/component-base/metrics/testutil" "k8s.io/component-base/metrics/testutil"
podutil "k8s.io/kubernetes/pkg/api/v1/pod" podutil "k8s.io/kubernetes/pkg/api/v1/pod"
@ -227,6 +228,10 @@ func (*fakeFramework) ClientSet() clientset.Interface {
return nil return nil
} }
func (*fakeFramework) SharedInformerFactory() informers.SharedInformerFactory {
return nil
}
func TestPriorityQueue_AddWithReversePriorityLessFunc(t *testing.T) { func TestPriorityQueue_AddWithReversePriorityLessFunc(t *testing.T) {
q := NewPriorityQueue(nil, &fakeFramework{}) q := NewPriorityQueue(nil, &fakeFramework{})
if err := q.Add(&medPriorityPod); err != nil { if err := q.Add(&medPriorityPod); err != nil {

View File

@ -28,11 +28,8 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
appsinformers "k8s.io/client-go/informers/apps/v1" "k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers/core/v1" coreinformers "k8s.io/client-go/informers/core/v1"
policyinformers "k8s.io/client-go/informers/policy/v1beta1"
storageinformersv1 "k8s.io/client-go/informers/storage/v1"
storageinformersv1beta1 "k8s.io/client-go/informers/storage/v1beta1"
clientset "k8s.io/client-go/kubernetes" clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/events" "k8s.io/client-go/tools/events"
podutil "k8s.io/kubernetes/pkg/api/v1/pod" podutil "k8s.io/kubernetes/pkg/api/v1/pod"
@ -241,17 +238,8 @@ var defaultSchedulerOptions = schedulerOptions{
// New returns a Scheduler // New returns a Scheduler
func New(client clientset.Interface, func New(client clientset.Interface,
nodeInformer coreinformers.NodeInformer, informerFactory informers.SharedInformerFactory,
podInformer coreinformers.PodInformer, podInformer coreinformers.PodInformer,
pvInformer coreinformers.PersistentVolumeInformer,
pvcInformer coreinformers.PersistentVolumeClaimInformer,
replicationControllerInformer coreinformers.ReplicationControllerInformer,
replicaSetInformer appsinformers.ReplicaSetInformer,
statefulSetInformer appsinformers.StatefulSetInformer,
serviceInformer coreinformers.ServiceInformer,
pdbInformer policyinformers.PodDisruptionBudgetInformer,
storageClassInformer storageinformersv1.StorageClassInformer,
csiNodeInformer storageinformersv1beta1.CSINodeInformer,
recorder events.EventRecorder, recorder events.EventRecorder,
schedulerAlgorithmSource kubeschedulerconfig.SchedulerAlgorithmSource, schedulerAlgorithmSource kubeschedulerconfig.SchedulerAlgorithmSource,
stopCh <-chan struct{}, stopCh <-chan struct{},
@ -263,21 +251,27 @@ func New(client clientset.Interface,
} }
schedulerCache := internalcache.New(30*time.Second, stopCh) schedulerCache := internalcache.New(30*time.Second, stopCh)
volumeBinder := volumebinder.NewVolumeBinder(client, nodeInformer, pvcInformer, pvInformer, storageClassInformer, volumeBinder := volumebinder.NewVolumeBinder(
time.Duration(options.bindTimeoutSeconds)*time.Second) client,
informerFactory.Core().V1().Nodes(),
informerFactory.Core().V1().PersistentVolumeClaims(),
informerFactory.Core().V1().PersistentVolumes(),
informerFactory.Storage().V1().StorageClasses(),
time.Duration(options.bindTimeoutSeconds)*time.Second,
)
registry := options.frameworkDefaultRegistry registry := options.frameworkDefaultRegistry
if registry == nil { if registry == nil {
registry = frameworkplugins.NewDefaultRegistry(&frameworkplugins.RegistryArgs{ registry = frameworkplugins.NewDefaultRegistry(&frameworkplugins.RegistryArgs{
SchedulerCache: schedulerCache, SchedulerCache: schedulerCache,
ServiceLister: serviceInformer.Lister(), ServiceLister: informerFactory.Core().V1().Services().Lister(),
ControllerLister: replicationControllerInformer.Lister(), ControllerLister: informerFactory.Core().V1().ReplicationControllers().Lister(),
ReplicaSetLister: replicaSetInformer.Lister(), ReplicaSetLister: informerFactory.Apps().V1().ReplicaSets().Lister(),
StatefulSetLister: statefulSetInformer.Lister(), StatefulSetLister: informerFactory.Apps().V1().StatefulSets().Lister(),
PDBLister: pdbInformer.Lister(), PDBLister: informerFactory.Policy().V1beta1().PodDisruptionBudgets().Lister(),
PVLister: pvInformer.Lister(), PVLister: informerFactory.Core().V1().PersistentVolumes().Lister(),
PVCLister: pvcInformer.Lister(), PVCLister: informerFactory.Core().V1().PersistentVolumeClaims().Lister(),
StorageClassLister: storageClassInformer.Lister(), StorageClassLister: informerFactory.Storage().V1().StorageClasses().Lister(),
VolumeBinder: volumeBinder, VolumeBinder: volumeBinder,
}) })
} }
@ -286,17 +280,18 @@ func New(client clientset.Interface,
// Set up the configurator which can create schedulers from configs. // Set up the configurator which can create schedulers from configs.
configurator := factory.NewConfigFactory(&factory.ConfigFactoryArgs{ configurator := factory.NewConfigFactory(&factory.ConfigFactoryArgs{
Client: client, Client: client,
NodeInformer: nodeInformer, InformerFactory: informerFactory,
PodInformer: podInformer, PodInformer: podInformer,
PvInformer: pvInformer, NodeInformer: informerFactory.Core().V1().Nodes(),
PvcInformer: pvcInformer, PvInformer: informerFactory.Core().V1().PersistentVolumes(),
ReplicationControllerInformer: replicationControllerInformer, PvcInformer: informerFactory.Core().V1().PersistentVolumeClaims(),
ReplicaSetInformer: replicaSetInformer, ReplicationControllerInformer: informerFactory.Core().V1().ReplicationControllers(),
StatefulSetInformer: statefulSetInformer, ReplicaSetInformer: informerFactory.Apps().V1().ReplicaSets(),
ServiceInformer: serviceInformer, StatefulSetInformer: informerFactory.Apps().V1().StatefulSets(),
PdbInformer: pdbInformer, ServiceInformer: informerFactory.Core().V1().Services(),
StorageClassInformer: storageClassInformer, PdbInformer: informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
CSINodeInformer: csiNodeInformer, StorageClassInformer: informerFactory.Storage().V1().StorageClasses(),
CSINodeInformer: informerFactory.Storage().V1beta1().CSINodes(),
VolumeBinder: volumeBinder, VolumeBinder: volumeBinder,
SchedulerCache: schedulerCache, SchedulerCache: schedulerCache,
HardPodAffinitySymmetricWeight: options.hardPodAffinitySymmetricWeight, HardPodAffinitySymmetricWeight: options.hardPodAffinitySymmetricWeight,
@ -349,7 +344,7 @@ func New(client clientset.Interface,
// Create the scheduler. // Create the scheduler.
sched := NewFromConfig(config) sched := NewFromConfig(config)
sched.podConditionUpdater = &podConditionUpdaterImpl{client} sched.podConditionUpdater = &podConditionUpdaterImpl{client}
AddAllEventHandlers(sched, options.schedulerName, nodeInformer, podInformer, pvInformer, pvcInformer, serviceInformer, storageClassInformer, csiNodeInformer) AddAllEventHandlers(sched, options.schedulerName, informerFactory, podInformer)
return sched, nil return sched, nil
} }

View File

@ -183,17 +183,8 @@ func TestSchedulerCreation(t *testing.T) {
stopCh := make(chan struct{}) stopCh := make(chan struct{})
defer close(stopCh) defer close(stopCh)
_, err := New(client, _, err := New(client,
informerFactory.Core().V1().Nodes(), informerFactory,
factory.NewPodInformer(client, 0), factory.NewPodInformer(client, 0),
informerFactory.Core().V1().PersistentVolumes(),
informerFactory.Core().V1().PersistentVolumeClaims(),
informerFactory.Core().V1().ReplicationControllers(),
informerFactory.Apps().V1().ReplicaSets(),
informerFactory.Apps().V1().StatefulSets(),
informerFactory.Core().V1().Services(),
informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
informerFactory.Storage().V1().StorageClasses(),
informerFactory.Storage().V1beta1().CSINodes(),
eventBroadcaster.NewRecorder(scheme.Scheme, "scheduler"), eventBroadcaster.NewRecorder(scheme.Scheme, "scheduler"),
kubeschedulerconfig.SchedulerAlgorithmSource{Provider: &testSource}, kubeschedulerconfig.SchedulerAlgorithmSource{Provider: &testSource},
stopCh, stopCh,

View File

@ -106,17 +106,8 @@ func setupScheduler(
sched, err := scheduler.New( sched, err := scheduler.New(
cs, cs,
informerFactory.Core().V1().Nodes(), informerFactory,
informerFactory.Core().V1().Pods(), informerFactory.Core().V1().Pods(),
informerFactory.Core().V1().PersistentVolumes(),
informerFactory.Core().V1().PersistentVolumeClaims(),
informerFactory.Core().V1().ReplicationControllers(),
informerFactory.Apps().V1().ReplicaSets(),
informerFactory.Apps().V1().StatefulSets(),
informerFactory.Core().V1().Services(),
informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
informerFactory.Storage().V1().StorageClasses(),
informerFactory.Storage().V1beta1().CSINodes(),
eventBroadcaster.NewRecorder( eventBroadcaster.NewRecorder(
legacyscheme.Scheme, legacyscheme.Scheme,
v1.DefaultSchedulerName, v1.DefaultSchedulerName,

View File

@ -250,17 +250,8 @@ priorities: []
defaultBindTimeout := int64(30) defaultBindTimeout := int64(30)
sched, err := scheduler.New(clientSet, sched, err := scheduler.New(clientSet,
informerFactory.Core().V1().Nodes(), informerFactory,
factory.NewPodInformer(clientSet, 0), factory.NewPodInformer(clientSet, 0),
informerFactory.Core().V1().PersistentVolumes(),
informerFactory.Core().V1().PersistentVolumeClaims(),
informerFactory.Core().V1().ReplicationControllers(),
informerFactory.Apps().V1().ReplicaSets(),
informerFactory.Apps().V1().StatefulSets(),
informerFactory.Core().V1().Services(),
informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
informerFactory.Storage().V1().StorageClasses(),
informerFactory.Storage().V1beta1().CSINodes(),
eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.DefaultSchedulerName), eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.DefaultSchedulerName),
kubeschedulerconfig.SchedulerAlgorithmSource{ kubeschedulerconfig.SchedulerAlgorithmSource{
Policy: &kubeschedulerconfig.SchedulerPolicySource{ Policy: &kubeschedulerconfig.SchedulerPolicySource{
@ -322,17 +313,8 @@ func TestSchedulerCreationFromNonExistentConfigMap(t *testing.T) {
defaultBindTimeout := int64(30) defaultBindTimeout := int64(30)
_, err := scheduler.New(clientSet, _, err := scheduler.New(clientSet,
informerFactory.Core().V1().Nodes(), informerFactory,
factory.NewPodInformer(clientSet, 0), factory.NewPodInformer(clientSet, 0),
informerFactory.Core().V1().PersistentVolumes(),
informerFactory.Core().V1().PersistentVolumeClaims(),
informerFactory.Core().V1().ReplicationControllers(),
informerFactory.Apps().V1().ReplicaSets(),
informerFactory.Apps().V1().StatefulSets(),
informerFactory.Core().V1().Services(),
informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
informerFactory.Storage().V1().StorageClasses(),
informerFactory.Storage().V1beta1().CSINodes(),
eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.DefaultSchedulerName), eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.DefaultSchedulerName),
kubeschedulerconfig.SchedulerAlgorithmSource{ kubeschedulerconfig.SchedulerAlgorithmSource{
Policy: &kubeschedulerconfig.SchedulerPolicySource{ Policy: &kubeschedulerconfig.SchedulerPolicySource{

View File

@ -23,7 +23,7 @@ import (
"testing" "testing"
"time" "time"
"k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
policy "k8s.io/api/policy/v1beta1" policy "k8s.io/api/policy/v1beta1"
"k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/api/resource"
@ -184,17 +184,8 @@ func initTestSchedulerWithOptions(
opts = append([]scheduler.Option{scheduler.WithBindTimeoutSeconds(600)}, opts...) opts = append([]scheduler.Option{scheduler.WithBindTimeoutSeconds(600)}, opts...)
context.scheduler, err = scheduler.New( context.scheduler, err = scheduler.New(
context.clientSet, context.clientSet,
context.informerFactory.Core().V1().Nodes(), context.informerFactory,
podInformer, podInformer,
context.informerFactory.Core().V1().PersistentVolumes(),
context.informerFactory.Core().V1().PersistentVolumeClaims(),
context.informerFactory.Core().V1().ReplicationControllers(),
context.informerFactory.Apps().V1().ReplicaSets(),
context.informerFactory.Apps().V1().StatefulSets(),
context.informerFactory.Core().V1().Services(),
context.informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
context.informerFactory.Storage().V1().StorageClasses(),
context.informerFactory.Storage().V1beta1().CSINodes(),
recorder, recorder,
algorithmSrc, algorithmSrc,
context.stopCh, context.stopCh,

View File

@ -76,13 +76,8 @@ func StartScheduler(clientSet clientset.Interface) (*scheduler.Scheduler, Shutdo
} }
scheduler.AddAllEventHandlers(sched, scheduler.AddAllEventHandlers(sched,
v1.DefaultSchedulerName, v1.DefaultSchedulerName,
informerFactory.Core().V1().Nodes(), informerFactory,
informerFactory.Core().V1().Pods(), informerFactory.Core().V1().Pods(),
informerFactory.Core().V1().PersistentVolumes(),
informerFactory.Core().V1().PersistentVolumeClaims(),
informerFactory.Core().V1().Services(),
informerFactory.Storage().V1().StorageClasses(),
informerFactory.Storage().V1beta1().CSINodes(),
) )
informerFactory.Start(stopCh) informerFactory.Start(stopCh)
@ -107,17 +102,8 @@ func createScheduler(
return scheduler.New( return scheduler.New(
clientSet, clientSet,
informerFactory.Core().V1().Nodes(), informerFactory,
informerFactory.Core().V1().Pods(), informerFactory.Core().V1().Pods(),
informerFactory.Core().V1().PersistentVolumes(),
informerFactory.Core().V1().PersistentVolumeClaims(),
informerFactory.Core().V1().ReplicationControllers(),
informerFactory.Apps().V1().ReplicaSets(),
informerFactory.Apps().V1().StatefulSets(),
informerFactory.Core().V1().Services(),
informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
informerFactory.Storage().V1().StorageClasses(),
informerFactory.Storage().V1beta1().CSINodes(),
recorder, recorder,
schedulerconfig.SchedulerAlgorithmSource{ schedulerconfig.SchedulerAlgorithmSource{
Provider: &defaultProviderName, Provider: &defaultProviderName,

View File

@ -142,17 +142,8 @@ func createSchedulerWithPodInformer(
return scheduler.New( return scheduler.New(
clientSet, clientSet,
informerFactory.Core().V1().Nodes(), informerFactory,
podInformer, podInformer,
informerFactory.Core().V1().PersistentVolumes(),
informerFactory.Core().V1().PersistentVolumeClaims(),
informerFactory.Core().V1().ReplicationControllers(),
informerFactory.Apps().V1().ReplicaSets(),
informerFactory.Apps().V1().StatefulSets(),
informerFactory.Core().V1().Services(),
informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
informerFactory.Storage().V1().StorageClasses(),
informerFactory.Storage().V1beta1().CSINodes(),
recorder, recorder,
schedulerconfig.SchedulerAlgorithmSource{ schedulerconfig.SchedulerAlgorithmSource{
Provider: &defaultProviderName, Provider: &defaultProviderName,