Merge pull request #85150 from ahg-g/ahg-informer

Pass InformerFactory/SharedLister instead of individual informers/listers in scheduler config logic
This commit is contained in:
Kubernetes Prow Robot 2019-11-13 09:28:23 -08:00 committed by GitHub
commit e0c483b889
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 141 additions and 239 deletions

View File

@ -44,12 +44,8 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/informers/apps/v1:go_default_library",
"//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/informers/policy/v1beta1:go_default_library",
"//staging/src/k8s.io/client-go/informers/storage/v1:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/listers/apps/v1:go_default_library",
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/listers/policy/v1beta1:go_default_library",
"//staging/src/k8s.io/client-go/listers/storage/v1:go_default_library",
@ -71,6 +67,7 @@ go_test(
deps = [
"//pkg/api/testing:go_default_library",
"//pkg/controller/volume/scheduling:go_default_library",
"//pkg/features:go_default_library",
"//pkg/scheduler/algorithm:go_default_library",
"//pkg/scheduler/algorithm/predicates:go_default_library",
"//pkg/scheduler/algorithm/priorities:go_default_library",
@ -99,6 +96,7 @@ go_test(
"//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",

View File

@ -25,10 +25,7 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
appslisters "k8s.io/client-go/listers/apps/v1"
corelisters "k8s.io/client-go/listers/core/v1"
policylisters "k8s.io/client-go/listers/policy/v1beta1"
storagelisters "k8s.io/client-go/listers/storage/v1"
"k8s.io/client-go/informers"
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
"k8s.io/kubernetes/pkg/scheduler/algorithm/priorities"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
@ -45,17 +42,8 @@ import (
// PluginFactoryArgs are passed to all plugin factory functions.
type PluginFactoryArgs struct {
NodeInfoLister schedulerlisters.NodeInfoLister
PodLister schedulerlisters.PodLister
ServiceLister corelisters.ServiceLister
ControllerLister corelisters.ReplicationControllerLister
ReplicaSetLister appslisters.ReplicaSetLister
StatefulSetLister appslisters.StatefulSetLister
PDBLister policylisters.PodDisruptionBudgetLister
CSINodeLister storagelisters.CSINodeLister
PVLister corelisters.PersistentVolumeLister
PVCLister corelisters.PersistentVolumeClaimLister
StorageClassLister storagelisters.StorageClassLister
SharedLister schedulerlisters.SharedLister
InformerFactory informers.SharedInformerFactory
VolumeBinder *volumebinder.VolumeBinder
HardPodAffinitySymmetricWeight int32
}
@ -280,9 +268,9 @@ func RegisterCustomFitPredicate(policy schedulerapi.PredicatePolicy, pluginArgs
predicateFactory = func(args PluginFactoryArgs) predicates.FitPredicate {
predicate, precomputationFunction := predicates.NewServiceAffinityPredicate(
args.NodeInfoLister,
args.PodLister,
args.ServiceLister,
args.SharedLister.NodeInfos(),
args.SharedLister.Pods(),
args.InformerFactory.Core().V1().Services().Lister(),
pluginArgs.ServiceAffinityArgs.AffinityLabels,
)
@ -404,8 +392,8 @@ func RegisterCustomPriorityFunction(policy schedulerapi.PriorityPolicy, configPr
pcf = &PriorityConfigFactory{
MapReduceFunction: func(args PluginFactoryArgs) (priorities.PriorityMapFunction, priorities.PriorityReduceFunction) {
return priorities.NewServiceAntiAffinityPriority(
args.PodLister,
args.ServiceLister,
args.SharedLister.Pods(),
args.InformerFactory.Core().V1().Services().Lister(),
configProducerArgs.ServiceAffinityArgs.AntiAffinityLabelsPreference,
)
},

View File

@ -55,40 +55,63 @@ func init() {
scheduler.RegisterFitPredicateFactory(
predicates.NoVolumeZoneConflictPred,
func(args scheduler.PluginFactoryArgs) predicates.FitPredicate {
return predicates.NewVolumeZonePredicate(args.PVLister, args.PVCLister, args.StorageClassLister)
pvLister := args.InformerFactory.Core().V1().PersistentVolumes().Lister()
pvcLister := args.InformerFactory.Core().V1().PersistentVolumeClaims().Lister()
storageClassLister := args.InformerFactory.Storage().V1().StorageClasses().Lister()
return predicates.NewVolumeZonePredicate(pvLister, pvcLister, storageClassLister)
},
)
// Fit is determined by whether or not there would be too many AWS EBS volumes attached to the node
scheduler.RegisterFitPredicateFactory(
predicates.MaxEBSVolumeCountPred,
func(args scheduler.PluginFactoryArgs) predicates.FitPredicate {
return predicates.NewMaxPDVolumeCountPredicate(predicates.EBSVolumeFilterType, args.CSINodeLister, args.StorageClassLister, args.PVLister, args.PVCLister)
csiNodeLister := scheduler.GetCSINodeLister(args.InformerFactory)
pvLister := args.InformerFactory.Core().V1().PersistentVolumes().Lister()
pvcLister := args.InformerFactory.Core().V1().PersistentVolumeClaims().Lister()
storageClassLister := args.InformerFactory.Storage().V1().StorageClasses().Lister()
return predicates.NewMaxPDVolumeCountPredicate(predicates.EBSVolumeFilterType, csiNodeLister, storageClassLister, pvLister, pvcLister)
},
)
// Fit is determined by whether or not there would be too many GCE PD volumes attached to the node
scheduler.RegisterFitPredicateFactory(
predicates.MaxGCEPDVolumeCountPred,
func(args scheduler.PluginFactoryArgs) predicates.FitPredicate {
return predicates.NewMaxPDVolumeCountPredicate(predicates.GCEPDVolumeFilterType, args.CSINodeLister, args.StorageClassLister, args.PVLister, args.PVCLister)
csiNodeLister := scheduler.GetCSINodeLister(args.InformerFactory)
pvLister := args.InformerFactory.Core().V1().PersistentVolumes().Lister()
pvcLister := args.InformerFactory.Core().V1().PersistentVolumeClaims().Lister()
storageClassLister := args.InformerFactory.Storage().V1().StorageClasses().Lister()
return predicates.NewMaxPDVolumeCountPredicate(predicates.GCEPDVolumeFilterType, csiNodeLister, storageClassLister, pvLister, pvcLister)
},
)
// Fit is determined by whether or not there would be too many Azure Disk volumes attached to the node
scheduler.RegisterFitPredicateFactory(
predicates.MaxAzureDiskVolumeCountPred,
func(args scheduler.PluginFactoryArgs) predicates.FitPredicate {
return predicates.NewMaxPDVolumeCountPredicate(predicates.AzureDiskVolumeFilterType, args.CSINodeLister, args.StorageClassLister, args.PVLister, args.PVCLister)
csiNodeLister := scheduler.GetCSINodeLister(args.InformerFactory)
pvLister := args.InformerFactory.Core().V1().PersistentVolumes().Lister()
pvcLister := args.InformerFactory.Core().V1().PersistentVolumeClaims().Lister()
storageClassLister := args.InformerFactory.Storage().V1().StorageClasses().Lister()
return predicates.NewMaxPDVolumeCountPredicate(predicates.AzureDiskVolumeFilterType, csiNodeLister, storageClassLister, pvLister, pvcLister)
},
)
scheduler.RegisterFitPredicateFactory(
predicates.MaxCSIVolumeCountPred,
func(args scheduler.PluginFactoryArgs) predicates.FitPredicate {
return predicates.NewCSIMaxVolumeLimitPredicate(args.CSINodeLister, args.PVLister, args.PVCLister, args.StorageClassLister)
csiNodeLister := scheduler.GetCSINodeLister(args.InformerFactory)
pvLister := args.InformerFactory.Core().V1().PersistentVolumes().Lister()
pvcLister := args.InformerFactory.Core().V1().PersistentVolumeClaims().Lister()
storageClassLister := args.InformerFactory.Storage().V1().StorageClasses().Lister()
return predicates.NewCSIMaxVolumeLimitPredicate(csiNodeLister, pvLister, pvcLister, storageClassLister)
},
)
scheduler.RegisterFitPredicateFactory(
predicates.MaxCinderVolumeCountPred,
func(args scheduler.PluginFactoryArgs) predicates.FitPredicate {
return predicates.NewMaxPDVolumeCountPredicate(predicates.CinderVolumeFilterType, args.CSINodeLister, args.StorageClassLister, args.PVLister, args.PVCLister)
csiNodeLister := scheduler.GetCSINodeLister(args.InformerFactory)
pvLister := args.InformerFactory.Core().V1().PersistentVolumes().Lister()
pvcLister := args.InformerFactory.Core().V1().PersistentVolumeClaims().Lister()
storageClassLister := args.InformerFactory.Storage().V1().StorageClasses().Lister()
return predicates.NewMaxPDVolumeCountPredicate(predicates.CinderVolumeFilterType, csiNodeLister, storageClassLister, pvLister, pvcLister)
},
)
@ -96,7 +119,7 @@ func init() {
scheduler.RegisterFitPredicateFactory(
predicates.MatchInterPodAffinityPred,
func(args scheduler.PluginFactoryArgs) predicates.FitPredicate {
return predicates.NewPodAffinityPredicate(args.NodeInfoLister, args.PodLister)
return predicates.NewPodAffinityPredicate(args.SharedLister.NodeInfos(), args.SharedLister.Pods())
},
)

View File

@ -26,7 +26,11 @@ func init() {
// Register functions that extract metadata used by priorities computations.
scheduler.RegisterPriorityMetadataProducerFactory(
func(args scheduler.PluginFactoryArgs) priorities.MetadataProducer {
return priorities.NewMetadataFactory(args.ServiceLister, args.ControllerLister, args.ReplicaSetLister, args.StatefulSetLister, args.HardPodAffinitySymmetricWeight)
serviceLister := args.InformerFactory.Core().V1().Services().Lister()
controllerLister := args.InformerFactory.Core().V1().ReplicationControllers().Lister()
replicaSetLister := args.InformerFactory.Apps().V1().ReplicaSets().Lister()
statefulSetLister := args.InformerFactory.Apps().V1().StatefulSets().Lister()
return priorities.NewMetadataFactory(serviceLister, controllerLister, replicaSetLister, statefulSetLister, args.HardPodAffinitySymmetricWeight)
})
// ServiceSpreadingPriority is a priority config factory that spreads pods by minimizing
@ -37,7 +41,8 @@ func init() {
priorities.ServiceSpreadingPriority,
scheduler.PriorityConfigFactory{
MapReduceFunction: func(args scheduler.PluginFactoryArgs) (priorities.PriorityMapFunction, priorities.PriorityReduceFunction) {
return priorities.NewSelectorSpreadPriority(args.ServiceLister, algorithm.EmptyControllerLister{}, algorithm.EmptyReplicaSetLister{}, algorithm.EmptyStatefulSetLister{})
serviceLister := args.InformerFactory.Core().V1().Services().Lister()
return priorities.NewSelectorSpreadPriority(serviceLister, algorithm.EmptyControllerLister{}, algorithm.EmptyReplicaSetLister{}, algorithm.EmptyStatefulSetLister{})
},
Weight: 1,
},
@ -54,7 +59,11 @@ func init() {
priorities.SelectorSpreadPriority,
scheduler.PriorityConfigFactory{
MapReduceFunction: func(args scheduler.PluginFactoryArgs) (priorities.PriorityMapFunction, priorities.PriorityReduceFunction) {
return priorities.NewSelectorSpreadPriority(args.ServiceLister, args.ControllerLister, args.ReplicaSetLister, args.StatefulSetLister)
serviceLister := args.InformerFactory.Core().V1().Services().Lister()
controllerLister := args.InformerFactory.Core().V1().ReplicationControllers().Lister()
replicaSetLister := args.InformerFactory.Apps().V1().ReplicaSets().Lister()
statefulSetLister := args.InformerFactory.Apps().V1().StatefulSets().Lister()
return priorities.NewSelectorSpreadPriority(serviceLister, controllerLister, replicaSetLister, statefulSetLister)
},
Weight: 1,
},

View File

@ -27,21 +27,16 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers"
appsinformers "k8s.io/client-go/informers/apps/v1"
coreinformers "k8s.io/client-go/informers/core/v1"
policyinformers "k8s.io/client-go/informers/policy/v1beta1"
storageinformersv1 "k8s.io/client-go/informers/storage/v1"
clientset "k8s.io/client-go/kubernetes"
appslisters "k8s.io/client-go/listers/apps/v1"
corelisters "k8s.io/client-go/listers/core/v1"
policylisters "k8s.io/client-go/listers/policy/v1beta1"
storagelistersv1 "k8s.io/client-go/listers/storage/v1"
storagelisters "k8s.io/client-go/listers/storage/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/features"
kubefeatures "k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler/algorithm"
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
"k8s.io/kubernetes/pkg/scheduler/algorithm/priorities"
@ -74,28 +69,7 @@ type Configurator struct {
informerFactory informers.SharedInformerFactory
// a means to list all PersistentVolumes
pVLister corelisters.PersistentVolumeLister
// a means to list all PersistentVolumeClaims
pVCLister corelisters.PersistentVolumeClaimLister
// a means to list all services
serviceLister corelisters.ServiceLister
// a means to list all controllers
controllerLister corelisters.ReplicationControllerLister
// a means to list all replicasets
replicaSetLister appslisters.ReplicaSetLister
// a means to list all statefulsets
statefulSetLister appslisters.StatefulSetLister
// a means to list all PodDisruptionBudgets
pdbLister policylisters.PodDisruptionBudgetLister
// a means to list all StorageClasses
storageClassLister storagelistersv1.StorageClassLister
// a means to list all CSINodes
csiNodeLister storagelistersv1.CSINodeLister
// a means to list all Nodes
nodeLister corelisters.NodeLister
// a means to list all Pods
podLister corelisters.PodLister
podInformer coreinformers.PodInformer
// Close this to stop all reflectors
StopEverything <-chan struct{}
@ -138,110 +112,6 @@ type Configurator struct {
configProducerArgs *plugins.ConfigProducerArgs
}
// ConfigFactoryArgs is a set arguments passed to NewConfigFactory.
type ConfigFactoryArgs struct {
Client clientset.Interface
InformerFactory informers.SharedInformerFactory
NodeInformer coreinformers.NodeInformer
PodInformer coreinformers.PodInformer
PvInformer coreinformers.PersistentVolumeInformer
PvcInformer coreinformers.PersistentVolumeClaimInformer
ReplicationControllerInformer coreinformers.ReplicationControllerInformer
ReplicaSetInformer appsinformers.ReplicaSetInformer
StatefulSetInformer appsinformers.StatefulSetInformer
ServiceInformer coreinformers.ServiceInformer
PdbInformer policyinformers.PodDisruptionBudgetInformer
StorageClassInformer storageinformersv1.StorageClassInformer
CSINodeInformer storageinformersv1.CSINodeInformer
VolumeBinder *volumebinder.VolumeBinder
SchedulerCache internalcache.Cache
HardPodAffinitySymmetricWeight int32
DisablePreemption bool
PercentageOfNodesToScore int32
BindTimeoutSeconds int64
PodInitialBackoffSeconds int64
PodMaxBackoffSeconds int64
StopCh <-chan struct{}
Registry framework.Registry
Plugins *schedulerapi.Plugins
PluginConfig []schedulerapi.PluginConfig
PluginConfigProducerRegistry *plugins.ConfigProducerRegistry
}
// NewConfigFactory initializes the default implementation of a Configurator. To encourage eventual privatization of the struct type, we only
// return the interface.
func NewConfigFactory(args *ConfigFactoryArgs) *Configurator {
stopEverything := args.StopCh
if stopEverything == nil {
stopEverything = wait.NeverStop
}
// storageClassInformer is only enabled through VolumeScheduling feature gate
var storageClassLister storagelistersv1.StorageClassLister
if args.StorageClassInformer != nil {
storageClassLister = args.StorageClassInformer.Lister()
}
var csiNodeLister storagelistersv1.CSINodeLister
if args.CSINodeInformer != nil {
csiNodeLister = args.CSINodeInformer.Lister()
}
var pdbLister policylisters.PodDisruptionBudgetLister
if args.PdbInformer != nil {
pdbLister = args.PdbInformer.Lister()
}
c := &Configurator{
client: args.Client,
informerFactory: args.InformerFactory,
pVLister: args.PvInformer.Lister(),
pVCLister: args.PvcInformer.Lister(),
serviceLister: args.ServiceInformer.Lister(),
controllerLister: args.ReplicationControllerInformer.Lister(),
replicaSetLister: args.ReplicaSetInformer.Lister(),
statefulSetLister: args.StatefulSetInformer.Lister(),
pdbLister: pdbLister,
nodeLister: args.NodeInformer.Lister(),
podLister: args.PodInformer.Lister(),
storageClassLister: storageClassLister,
csiNodeLister: csiNodeLister,
volumeBinder: args.VolumeBinder,
schedulerCache: args.SchedulerCache,
StopEverything: stopEverything,
hardPodAffinitySymmetricWeight: args.HardPodAffinitySymmetricWeight,
disablePreemption: args.DisablePreemption,
percentageOfNodesToScore: args.PercentageOfNodesToScore,
bindTimeoutSeconds: args.BindTimeoutSeconds,
podInitialBackoffSeconds: args.PodInitialBackoffSeconds,
podMaxBackoffSeconds: args.PodMaxBackoffSeconds,
enableNonPreempting: utilfeature.DefaultFeatureGate.Enabled(features.NonPreemptingPriority),
registry: args.Registry,
plugins: args.Plugins,
pluginConfig: args.PluginConfig,
pluginConfigProducerRegistry: args.PluginConfigProducerRegistry,
nodeInfoSnapshot: nodeinfosnapshot.NewEmptySnapshot(),
}
c.factoryArgs = PluginFactoryArgs{
NodeInfoLister: c.nodeInfoSnapshot.NodeInfos(),
PodLister: c.nodeInfoSnapshot.Pods(),
ServiceLister: c.serviceLister,
ControllerLister: c.controllerLister,
ReplicaSetLister: c.replicaSetLister,
StatefulSetLister: c.statefulSetLister,
PDBLister: c.pdbLister,
CSINodeLister: c.csiNodeLister,
PVLister: c.pVLister,
PVCLister: c.pVCLister,
StorageClassLister: c.storageClassLister,
VolumeBinder: c.volumeBinder,
HardPodAffinitySymmetricWeight: c.hardPodAffinitySymmetricWeight,
}
c.configProducerArgs = &plugins.ConfigProducerArgs{}
return c
}
// GetHardPodAffinitySymmetricWeight is exposed for testing.
func (c *Configurator) GetHardPodAffinitySymmetricWeight() int32 {
return c.hardPodAffinitySymmetricWeight
@ -404,8 +274,8 @@ func (c *Configurator) CreateFromKeys(predicateKeys, priorityKeys sets.String, e
// Setup cache debugger.
debugger := cachedebugger.New(
c.nodeLister,
c.podLister,
c.informerFactory.Core().V1().Nodes().Lister(),
c.podInformer.Lister(),
c.schedulerCache,
podQueue,
)
@ -427,8 +297,8 @@ func (c *Configurator) CreateFromKeys(predicateKeys, priorityKeys sets.String, e
framework,
extenders,
c.volumeBinder,
c.pVCLister,
c.pdbLister,
c.informerFactory.Core().V1().PersistentVolumeClaims().Lister(),
GetPodDisruptionBudgetLister(c.informerFactory),
c.alwaysCheckAllPredicates,
c.disablePreemption,
c.percentageOfNodesToScore,
@ -649,3 +519,19 @@ func (b *binder) Bind(binding *v1.Binding) error {
klog.V(3).Infof("Attempting to bind %v to %v", binding.Name, binding.Target.Name)
return b.Client.CoreV1().Pods(binding.Namespace).Bind(binding)
}
// GetPodDisruptionBudgetLister returns pdb lister from the given informer factory. Returns nil if PodDisruptionBudget feature is disabled.
func GetPodDisruptionBudgetLister(informerFactory informers.SharedInformerFactory) policylisters.PodDisruptionBudgetLister {
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.PodDisruptionBudget) {
return informerFactory.Policy().V1beta1().PodDisruptionBudgets().Lister()
}
return nil
}
// GetCSINodeLister returns CSINode lister from the given informer factory. Returns nil if CSINodeInfo feature is disabled.
func GetCSINodeLister(informerFactory informers.SharedInformerFactory) storagelisters.CSINodeLister {
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CSINodeInfo) {
return informerFactory.Storage().V1().CSINodes().Lister()
}
return nil
}

View File

@ -33,6 +33,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apimachinery/pkg/util/sets"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
@ -40,6 +41,7 @@ import (
clienttesting "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
apitesting "k8s.io/kubernetes/pkg/api/testing"
kubefeatures "k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler/algorithm"
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
@ -52,6 +54,7 @@ import (
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot"
)
const (
@ -539,32 +542,31 @@ func newConfigFactoryWithFrameworkRegistry(
client clientset.Interface, hardPodAffinitySymmetricWeight int32, stopCh <-chan struct{},
registry framework.Registry, pluginConfigProducerRegistry *frameworkplugins.ConfigProducerRegistry) *Configurator {
informerFactory := informers.NewSharedInformerFactory(client, 0)
return NewConfigFactory(&ConfigFactoryArgs{
Client: client,
InformerFactory: informerFactory,
NodeInformer: informerFactory.Core().V1().Nodes(),
PodInformer: informerFactory.Core().V1().Pods(),
PvInformer: informerFactory.Core().V1().PersistentVolumes(),
PvcInformer: informerFactory.Core().V1().PersistentVolumeClaims(),
ReplicationControllerInformer: informerFactory.Core().V1().ReplicationControllers(),
ReplicaSetInformer: informerFactory.Apps().V1().ReplicaSets(),
StatefulSetInformer: informerFactory.Apps().V1().StatefulSets(),
ServiceInformer: informerFactory.Core().V1().Services(),
PdbInformer: informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
StorageClassInformer: informerFactory.Storage().V1().StorageClasses(),
CSINodeInformer: informerFactory.Storage().V1().CSINodes(),
HardPodAffinitySymmetricWeight: hardPodAffinitySymmetricWeight,
DisablePreemption: disablePodPreemption,
PercentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore,
BindTimeoutSeconds: bindTimeoutSeconds,
PodInitialBackoffSeconds: podInitialBackoffDurationSeconds,
PodMaxBackoffSeconds: podMaxBackoffDurationSeconds,
StopCh: stopCh,
Registry: registry,
Plugins: nil,
PluginConfig: []schedulerapi.PluginConfig{},
PluginConfigProducerRegistry: pluginConfigProducerRegistry,
})
snapshot := nodeinfosnapshot.NewEmptySnapshot()
return &Configurator{
client: client,
informerFactory: informerFactory,
podInformer: informerFactory.Core().V1().Pods(),
hardPodAffinitySymmetricWeight: hardPodAffinitySymmetricWeight,
disablePreemption: disablePodPreemption,
percentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore,
bindTimeoutSeconds: bindTimeoutSeconds,
podInitialBackoffSeconds: podInitialBackoffDurationSeconds,
podMaxBackoffSeconds: podMaxBackoffDurationSeconds,
StopEverything: stopCh,
enableNonPreempting: utilfeature.DefaultFeatureGate.Enabled(kubefeatures.NonPreemptingPriority),
registry: registry,
plugins: nil,
pluginConfig: []schedulerapi.PluginConfig{},
pluginConfigProducerRegistry: pluginConfigProducerRegistry,
nodeInfoSnapshot: snapshot,
factoryArgs: PluginFactoryArgs{
SharedLister: snapshot,
InformerFactory: informerFactory,
HardPodAffinitySymmetricWeight: hardPodAffinitySymmetricWeight,
},
configProducerArgs: &frameworkplugins.ConfigProducerArgs{},
}
}
func newConfigFactory(

View File

@ -33,8 +33,6 @@ import (
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers/core/v1"
policyv1beta1informers "k8s.io/client-go/informers/policy/v1beta1"
storageinformers "k8s.io/client-go/informers/storage/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/events"
@ -48,6 +46,7 @@ import (
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
"k8s.io/kubernetes/pkg/scheduler/metrics"
nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot"
"k8s.io/kubernetes/pkg/scheduler/volumebinder"
)
@ -267,12 +266,17 @@ func New(client clientset.Interface,
stopCh <-chan struct{},
opts ...Option) (*Scheduler, error) {
stopEverything := stopCh
if stopEverything == nil {
stopEverything = wait.NeverStop
}
options := defaultSchedulerOptions
for _, opt := range opts {
opt(&options)
}
schedulerCache := internalcache.New(30*time.Second, stopCh)
schedulerCache := internalcache.New(30*time.Second, stopEverything)
volumeBinder := volumebinder.NewVolumeBinder(
client,
informerFactory.Core().V1().Nodes(),
@ -290,44 +294,36 @@ func New(client clientset.Interface,
}
registry.Merge(options.frameworkOutOfTreeRegistry)
var pdbInformer policyv1beta1informers.PodDisruptionBudgetInformer
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.PodDisruptionBudget) {
pdbInformer = informerFactory.Policy().V1beta1().PodDisruptionBudgets()
snapshot := nodeinfosnapshot.NewEmptySnapshot()
configurator := &Configurator{
client: client,
informerFactory: informerFactory,
podInformer: podInformer,
volumeBinder: volumeBinder,
schedulerCache: schedulerCache,
StopEverything: stopEverything,
hardPodAffinitySymmetricWeight: options.hardPodAffinitySymmetricWeight,
disablePreemption: options.disablePreemption,
percentageOfNodesToScore: options.percentageOfNodesToScore,
bindTimeoutSeconds: options.bindTimeoutSeconds,
podInitialBackoffSeconds: options.podInitialBackoffSeconds,
podMaxBackoffSeconds: options.podMaxBackoffSeconds,
enableNonPreempting: utilfeature.DefaultFeatureGate.Enabled(kubefeatures.NonPreemptingPriority),
registry: registry,
plugins: options.frameworkPlugins,
pluginConfig: options.frameworkPluginConfig,
pluginConfigProducerRegistry: options.frameworkConfigProducerRegistry,
nodeInfoSnapshot: snapshot,
factoryArgs: PluginFactoryArgs{
SharedLister: snapshot,
InformerFactory: informerFactory,
VolumeBinder: volumeBinder,
HardPodAffinitySymmetricWeight: options.hardPodAffinitySymmetricWeight,
},
configProducerArgs: &frameworkplugins.ConfigProducerArgs{},
}
var csiNodeInformer storageinformers.CSINodeInformer
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CSINodeInfo) {
csiNodeInformer = informerFactory.Storage().V1().CSINodes()
}
// Set up the configurator which can create schedulers from configs.
configurator := NewConfigFactory(&ConfigFactoryArgs{
Client: client,
InformerFactory: informerFactory,
PodInformer: podInformer,
NodeInformer: informerFactory.Core().V1().Nodes(),
PvInformer: informerFactory.Core().V1().PersistentVolumes(),
PvcInformer: informerFactory.Core().V1().PersistentVolumeClaims(),
ReplicationControllerInformer: informerFactory.Core().V1().ReplicationControllers(),
ReplicaSetInformer: informerFactory.Apps().V1().ReplicaSets(),
StatefulSetInformer: informerFactory.Apps().V1().StatefulSets(),
ServiceInformer: informerFactory.Core().V1().Services(),
PdbInformer: pdbInformer,
StorageClassInformer: informerFactory.Storage().V1().StorageClasses(),
CSINodeInformer: csiNodeInformer,
VolumeBinder: volumeBinder,
SchedulerCache: schedulerCache,
HardPodAffinitySymmetricWeight: options.hardPodAffinitySymmetricWeight,
DisablePreemption: options.disablePreemption,
PercentageOfNodesToScore: options.percentageOfNodesToScore,
BindTimeoutSeconds: options.bindTimeoutSeconds,
PodInitialBackoffSeconds: options.podInitialBackoffSeconds,
PodMaxBackoffSeconds: options.podMaxBackoffSeconds,
Registry: registry,
PluginConfigProducerRegistry: options.frameworkConfigProducerRegistry,
Plugins: options.frameworkPlugins,
PluginConfig: options.frameworkPluginConfig,
})
var sched *Scheduler
source := schedulerAlgorithmSource
switch {
@ -363,7 +359,7 @@ func New(client clientset.Interface,
// Additional tweaks to the config produced by the configurator.
sched.Recorder = recorder
sched.DisablePreemption = options.disablePreemption
sched.StopEverything = stopCh
sched.StopEverything = stopEverything
sched.podConditionUpdater = &podConditionUpdaterImpl{client}
sched.podPreemptor = &podPreemptorImpl{client}
sched.scheduledPodsHasSynced = podInformer.Informer().HasSynced
@ -584,11 +580,11 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
fwk := sched.Framework
podInfo := sched.NextPod()
pod := podInfo.Pod
// pod could be nil when schedulerQueue is closed
if pod == nil {
if podInfo == nil || podInfo.Pod == nil {
return
}
pod := podInfo.Pod
if pod.DeletionTimestamp != nil {
sched.Recorder.Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", "skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
klog.V(3).Infof("Skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)