mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 03:41:45 +00:00
Feature-gate CSINode and CSIDriver informer starts
This commit is contained in:
parent
3f402534f3
commit
0e2f2dde4d
@ -124,6 +124,7 @@ go_library(
|
|||||||
"//staging/src/k8s.io/client-go/discovery/cached/memory:go_default_library",
|
"//staging/src/k8s.io/client-go/discovery/cached/memory:go_default_library",
|
||||||
"//staging/src/k8s.io/client-go/dynamic:go_default_library",
|
"//staging/src/k8s.io/client-go/dynamic:go_default_library",
|
||||||
"//staging/src/k8s.io/client-go/informers:go_default_library",
|
"//staging/src/k8s.io/client-go/informers:go_default_library",
|
||||||
|
"//staging/src/k8s.io/client-go/informers/storage/v1beta1:go_default_library",
|
||||||
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
|
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
|
||||||
"//staging/src/k8s.io/client-go/metadata:go_default_library",
|
"//staging/src/k8s.io/client-go/metadata:go_default_library",
|
||||||
"//staging/src/k8s.io/client-go/metadata/metadatainformer:go_default_library",
|
"//staging/src/k8s.io/client-go/metadata/metadatainformer:go_default_library",
|
||||||
|
@ -33,6 +33,7 @@ import (
|
|||||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||||
cacheddiscovery "k8s.io/client-go/discovery/cached/memory"
|
cacheddiscovery "k8s.io/client-go/discovery/cached/memory"
|
||||||
|
storagev1beta1informer "k8s.io/client-go/informers/storage/v1beta1"
|
||||||
clientset "k8s.io/client-go/kubernetes"
|
clientset "k8s.io/client-go/kubernetes"
|
||||||
"k8s.io/client-go/metadata"
|
"k8s.io/client-go/metadata"
|
||||||
restclient "k8s.io/client-go/rest"
|
restclient "k8s.io/client-go/rest"
|
||||||
@ -278,6 +279,17 @@ func startAttachDetachController(ctx ControllerContext) (http.Handler, bool, err
|
|||||||
return nil, true, fmt.Errorf("duration time must be greater than one second as set via command line option reconcile-sync-loop-period")
|
return nil, true, fmt.Errorf("duration time must be greater than one second as set via command line option reconcile-sync-loop-period")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
csiNodeInformer storagev1beta1informer.CSINodeInformer
|
||||||
|
csiDriverInformer storagev1beta1informer.CSIDriverInformer
|
||||||
|
)
|
||||||
|
if utilfeature.DefaultFeatureGate.Enabled(features.CSINodeInfo) {
|
||||||
|
csiNodeInformer = ctx.InformerFactory.Storage().V1beta1().CSINodes()
|
||||||
|
}
|
||||||
|
if utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) {
|
||||||
|
csiDriverInformer = ctx.InformerFactory.Storage().V1beta1().CSIDrivers()
|
||||||
|
}
|
||||||
|
|
||||||
attachDetachController, attachDetachControllerErr :=
|
attachDetachController, attachDetachControllerErr :=
|
||||||
attachdetach.NewAttachDetachController(
|
attachdetach.NewAttachDetachController(
|
||||||
ctx.ClientBuilder.ClientOrDie("attachdetach-controller"),
|
ctx.ClientBuilder.ClientOrDie("attachdetach-controller"),
|
||||||
@ -285,8 +297,8 @@ func startAttachDetachController(ctx ControllerContext) (http.Handler, bool, err
|
|||||||
ctx.InformerFactory.Core().V1().Nodes(),
|
ctx.InformerFactory.Core().V1().Nodes(),
|
||||||
ctx.InformerFactory.Core().V1().PersistentVolumeClaims(),
|
ctx.InformerFactory.Core().V1().PersistentVolumeClaims(),
|
||||||
ctx.InformerFactory.Core().V1().PersistentVolumes(),
|
ctx.InformerFactory.Core().V1().PersistentVolumes(),
|
||||||
ctx.InformerFactory.Storage().V1beta1().CSINodes(),
|
csiNodeInformer,
|
||||||
ctx.InformerFactory.Storage().V1beta1().CSIDrivers(),
|
csiDriverInformer,
|
||||||
ctx.Cloud,
|
ctx.Cloud,
|
||||||
ProbeAttachableVolumePlugins(),
|
ProbeAttachableVolumePlugins(),
|
||||||
GetDynamicPluginProber(ctx.ComponentConfig.PersistentVolumeBinderController.VolumeConfiguration),
|
GetDynamicPluginProber(ctx.ComponentConfig.PersistentVolumeBinderController.VolumeConfiguration),
|
||||||
|
@ -446,12 +446,18 @@ func (c *MaxPDVolumeCountChecker) predicate(pod *v1.Pod, meta PredicateMetadata,
|
|||||||
return false, nil, fmt.Errorf("node not found")
|
return false, nil, fmt.Errorf("node not found")
|
||||||
}
|
}
|
||||||
|
|
||||||
csiNode, err := c.csiNodeLister.Get(node.Name)
|
var (
|
||||||
|
csiNode *v1beta1storage.CSINode
|
||||||
|
err error
|
||||||
|
)
|
||||||
|
if c.csiNodeLister != nil {
|
||||||
|
csiNode, err = c.csiNodeLister.Get(node.Name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// we don't fail here because the CSINode object is only necessary
|
// we don't fail here because the CSINode object is only necessary
|
||||||
// for determining whether the migration is enabled or not
|
// for determining whether the migration is enabled or not
|
||||||
klog.V(5).Infof("Could not get a CSINode object for the node: %v", err)
|
klog.V(5).Infof("Could not get a CSINode object for the node: %v", err)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// if a plugin has been migrated to a CSI driver, defer to the CSI predicate
|
// if a plugin has been migrated to a CSI driver, defer to the CSI predicate
|
||||||
if c.filter.IsMigrated(csiNode) {
|
if c.filter.IsMigrated(csiNode) {
|
||||||
|
@ -6,18 +6,23 @@ go_library(
|
|||||||
"azure.go",
|
"azure.go",
|
||||||
"cinder.go",
|
"cinder.go",
|
||||||
"csi.go",
|
"csi.go",
|
||||||
|
"csinode_helper.go",
|
||||||
"ebs.go",
|
"ebs.go",
|
||||||
"gce.go",
|
"gce.go",
|
||||||
],
|
],
|
||||||
importpath = "k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodevolumelimits",
|
importpath = "k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodevolumelimits",
|
||||||
visibility = ["//visibility:public"],
|
visibility = ["//visibility:public"],
|
||||||
deps = [
|
deps = [
|
||||||
|
"//pkg/features:go_default_library",
|
||||||
"//pkg/scheduler/algorithm/predicates:go_default_library",
|
"//pkg/scheduler/algorithm/predicates:go_default_library",
|
||||||
"//pkg/scheduler/framework/plugins/migration:go_default_library",
|
"//pkg/scheduler/framework/plugins/migration:go_default_library",
|
||||||
"//pkg/scheduler/framework/v1alpha1:go_default_library",
|
"//pkg/scheduler/framework/v1alpha1:go_default_library",
|
||||||
"//pkg/scheduler/nodeinfo:go_default_library",
|
"//pkg/scheduler/nodeinfo:go_default_library",
|
||||||
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
||||||
|
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
|
||||||
|
"//staging/src/k8s.io/client-go/informers:go_default_library",
|
||||||
|
"//staging/src/k8s.io/client-go/listers/storage/v1beta1:go_default_library",
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -52,12 +52,11 @@ func (pl *AzureDiskLimits) Filter(ctx context.Context, _ *framework.CycleState,
|
|||||||
// NewAzureDisk returns function that initializes a new plugin and returns it.
|
// NewAzureDisk returns function that initializes a new plugin and returns it.
|
||||||
func NewAzureDisk(_ *runtime.Unknown, handle framework.FrameworkHandle) (framework.Plugin, error) {
|
func NewAzureDisk(_ *runtime.Unknown, handle framework.FrameworkHandle) (framework.Plugin, error) {
|
||||||
informerFactory := handle.SharedInformerFactory()
|
informerFactory := handle.SharedInformerFactory()
|
||||||
csiNodeLister := informerFactory.Storage().V1beta1().CSINodes().Lister()
|
|
||||||
pvLister := informerFactory.Core().V1().PersistentVolumes().Lister()
|
pvLister := informerFactory.Core().V1().PersistentVolumes().Lister()
|
||||||
pvcLister := informerFactory.Core().V1().PersistentVolumeClaims().Lister()
|
pvcLister := informerFactory.Core().V1().PersistentVolumeClaims().Lister()
|
||||||
scLister := informerFactory.Storage().V1().StorageClasses().Lister()
|
scLister := informerFactory.Storage().V1().StorageClasses().Lister()
|
||||||
|
|
||||||
return &AzureDiskLimits{
|
return &AzureDiskLimits{
|
||||||
predicate: predicates.NewMaxPDVolumeCountPredicate(predicates.AzureDiskVolumeFilterType, csiNodeLister, scLister, pvLister, pvcLister),
|
predicate: predicates.NewMaxPDVolumeCountPredicate(predicates.AzureDiskVolumeFilterType, getCSINodeListerIfEnabled(informerFactory), scLister, pvLister, pvcLister),
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
@ -52,11 +52,10 @@ func (pl *CinderLimits) Filter(ctx context.Context, _ *framework.CycleState, pod
|
|||||||
// NewCinder returns function that initializes a new plugin and returns it.
|
// NewCinder returns function that initializes a new plugin and returns it.
|
||||||
func NewCinder(_ *runtime.Unknown, handle framework.FrameworkHandle) (framework.Plugin, error) {
|
func NewCinder(_ *runtime.Unknown, handle framework.FrameworkHandle) (framework.Plugin, error) {
|
||||||
informerFactory := handle.SharedInformerFactory()
|
informerFactory := handle.SharedInformerFactory()
|
||||||
csiNodeLister := informerFactory.Storage().V1beta1().CSINodes().Lister()
|
|
||||||
pvLister := informerFactory.Core().V1().PersistentVolumes().Lister()
|
pvLister := informerFactory.Core().V1().PersistentVolumes().Lister()
|
||||||
pvcLister := informerFactory.Core().V1().PersistentVolumeClaims().Lister()
|
pvcLister := informerFactory.Core().V1().PersistentVolumeClaims().Lister()
|
||||||
scLister := informerFactory.Storage().V1().StorageClasses().Lister()
|
scLister := informerFactory.Storage().V1().StorageClasses().Lister()
|
||||||
return &CinderLimits{
|
return &CinderLimits{
|
||||||
predicate: predicates.NewMaxPDVolumeCountPredicate(predicates.CinderVolumeFilterType, csiNodeLister, scLister, pvLister, pvcLister),
|
predicate: predicates.NewMaxPDVolumeCountPredicate(predicates.CinderVolumeFilterType, getCSINodeListerIfEnabled(informerFactory), scLister, pvLister, pvcLister),
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
@ -52,12 +52,11 @@ func (pl *CSILimits) Filter(ctx context.Context, _ *framework.CycleState, pod *v
|
|||||||
// NewCSI initializes a new plugin and returns it.
|
// NewCSI initializes a new plugin and returns it.
|
||||||
func NewCSI(_ *runtime.Unknown, handle framework.FrameworkHandle) (framework.Plugin, error) {
|
func NewCSI(_ *runtime.Unknown, handle framework.FrameworkHandle) (framework.Plugin, error) {
|
||||||
informerFactory := handle.SharedInformerFactory()
|
informerFactory := handle.SharedInformerFactory()
|
||||||
csiNodeLister := informerFactory.Storage().V1beta1().CSINodes().Lister()
|
|
||||||
pvLister := informerFactory.Core().V1().PersistentVolumes().Lister()
|
pvLister := informerFactory.Core().V1().PersistentVolumes().Lister()
|
||||||
pvcLister := informerFactory.Core().V1().PersistentVolumeClaims().Lister()
|
pvcLister := informerFactory.Core().V1().PersistentVolumeClaims().Lister()
|
||||||
scLister := informerFactory.Storage().V1().StorageClasses().Lister()
|
scLister := informerFactory.Storage().V1().StorageClasses().Lister()
|
||||||
|
|
||||||
return &CSILimits{
|
return &CSILimits{
|
||||||
predicate: predicates.NewCSIMaxVolumeLimitPredicate(csiNodeLister, pvLister, pvcLister, scLister),
|
predicate: predicates.NewCSIMaxVolumeLimitPredicate(getCSINodeListerIfEnabled(informerFactory), pvLister, pvcLister, scLister),
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,32 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2019 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package nodevolumelimits
|
||||||
|
|
||||||
|
import (
|
||||||
|
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||||
|
"k8s.io/client-go/informers"
|
||||||
|
v1beta1 "k8s.io/client-go/listers/storage/v1beta1"
|
||||||
|
kubefeatures "k8s.io/kubernetes/pkg/features"
|
||||||
|
)
|
||||||
|
|
||||||
|
// getCSINodeListerIfEnabled returns the CSINode lister or nil if the feature is disabled
|
||||||
|
func getCSINodeListerIfEnabled(factory informers.SharedInformerFactory) v1beta1.CSINodeLister {
|
||||||
|
if !utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CSINodeInfo) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return factory.Storage().V1beta1().CSINodes().Lister()
|
||||||
|
}
|
@ -52,12 +52,11 @@ func (pl *EBSLimits) Filter(ctx context.Context, _ *framework.CycleState, pod *v
|
|||||||
// NewEBS returns function that initializes a new plugin and returns it.
|
// NewEBS returns function that initializes a new plugin and returns it.
|
||||||
func NewEBS(_ *runtime.Unknown, handle framework.FrameworkHandle) (framework.Plugin, error) {
|
func NewEBS(_ *runtime.Unknown, handle framework.FrameworkHandle) (framework.Plugin, error) {
|
||||||
informerFactory := handle.SharedInformerFactory()
|
informerFactory := handle.SharedInformerFactory()
|
||||||
csiNodeLister := informerFactory.Storage().V1beta1().CSINodes().Lister()
|
|
||||||
pvLister := informerFactory.Core().V1().PersistentVolumes().Lister()
|
pvLister := informerFactory.Core().V1().PersistentVolumes().Lister()
|
||||||
pvcLister := informerFactory.Core().V1().PersistentVolumeClaims().Lister()
|
pvcLister := informerFactory.Core().V1().PersistentVolumeClaims().Lister()
|
||||||
scLister := informerFactory.Storage().V1().StorageClasses().Lister()
|
scLister := informerFactory.Storage().V1().StorageClasses().Lister()
|
||||||
|
|
||||||
return &EBSLimits{
|
return &EBSLimits{
|
||||||
predicate: predicates.NewMaxPDVolumeCountPredicate(predicates.EBSVolumeFilterType, csiNodeLister, scLister, pvLister, pvcLister),
|
predicate: predicates.NewMaxPDVolumeCountPredicate(predicates.EBSVolumeFilterType, getCSINodeListerIfEnabled(informerFactory), scLister, pvLister, pvcLister),
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
@ -52,12 +52,11 @@ func (pl *GCEPDLimits) Filter(ctx context.Context, _ *framework.CycleState, pod
|
|||||||
// NewGCEPD returns function that initializes a new plugin and returns it.
|
// NewGCEPD returns function that initializes a new plugin and returns it.
|
||||||
func NewGCEPD(_ *runtime.Unknown, handle framework.FrameworkHandle) (framework.Plugin, error) {
|
func NewGCEPD(_ *runtime.Unknown, handle framework.FrameworkHandle) (framework.Plugin, error) {
|
||||||
informerFactory := handle.SharedInformerFactory()
|
informerFactory := handle.SharedInformerFactory()
|
||||||
csiNodeLister := informerFactory.Storage().V1beta1().CSINodes().Lister()
|
|
||||||
pvLister := informerFactory.Core().V1().PersistentVolumes().Lister()
|
pvLister := informerFactory.Core().V1().PersistentVolumes().Lister()
|
||||||
pvcLister := informerFactory.Core().V1().PersistentVolumeClaims().Lister()
|
pvcLister := informerFactory.Core().V1().PersistentVolumeClaims().Lister()
|
||||||
scLister := informerFactory.Storage().V1().StorageClasses().Lister()
|
scLister := informerFactory.Storage().V1().StorageClasses().Lister()
|
||||||
|
|
||||||
return &GCEPDLimits{
|
return &GCEPDLimits{
|
||||||
predicate: predicates.NewMaxPDVolumeCountPredicate(predicates.GCEPDVolumeFilterType, csiNodeLister, scLister, pvLister, pvcLister),
|
predicate: predicates.NewMaxPDVolumeCountPredicate(predicates.GCEPDVolumeFilterType, getCSINodeListerIfEnabled(informerFactory), scLister, pvLister, pvcLister),
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
@ -33,6 +33,7 @@ import (
|
|||||||
"k8s.io/client-go/informers"
|
"k8s.io/client-go/informers"
|
||||||
coreinformers "k8s.io/client-go/informers/core/v1"
|
coreinformers "k8s.io/client-go/informers/core/v1"
|
||||||
policyv1beta1informers "k8s.io/client-go/informers/policy/v1beta1"
|
policyv1beta1informers "k8s.io/client-go/informers/policy/v1beta1"
|
||||||
|
storagev1beta1informers "k8s.io/client-go/informers/storage/v1beta1"
|
||||||
clientset "k8s.io/client-go/kubernetes"
|
clientset "k8s.io/client-go/kubernetes"
|
||||||
"k8s.io/client-go/tools/cache"
|
"k8s.io/client-go/tools/cache"
|
||||||
"k8s.io/client-go/tools/events"
|
"k8s.io/client-go/tools/events"
|
||||||
@ -287,6 +288,11 @@ func New(client clientset.Interface,
|
|||||||
pdbInformer = informerFactory.Policy().V1beta1().PodDisruptionBudgets()
|
pdbInformer = informerFactory.Policy().V1beta1().PodDisruptionBudgets()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var csiNodeInformer storagev1beta1informers.CSINodeInformer
|
||||||
|
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CSINodeInfo) {
|
||||||
|
csiNodeInformer = informerFactory.Storage().V1beta1().CSINodes()
|
||||||
|
}
|
||||||
|
|
||||||
// Set up the configurator which can create schedulers from configs.
|
// Set up the configurator which can create schedulers from configs.
|
||||||
configurator := NewConfigFactory(&ConfigFactoryArgs{
|
configurator := NewConfigFactory(&ConfigFactoryArgs{
|
||||||
Client: client,
|
Client: client,
|
||||||
@ -301,7 +307,7 @@ func New(client clientset.Interface,
|
|||||||
ServiceInformer: informerFactory.Core().V1().Services(),
|
ServiceInformer: informerFactory.Core().V1().Services(),
|
||||||
PdbInformer: pdbInformer,
|
PdbInformer: pdbInformer,
|
||||||
StorageClassInformer: informerFactory.Storage().V1().StorageClasses(),
|
StorageClassInformer: informerFactory.Storage().V1().StorageClasses(),
|
||||||
CSINodeInformer: informerFactory.Storage().V1beta1().CSINodes(),
|
CSINodeInformer: csiNodeInformer,
|
||||||
VolumeBinder: volumeBinder,
|
VolumeBinder: volumeBinder,
|
||||||
SchedulerCache: schedulerCache,
|
SchedulerCache: schedulerCache,
|
||||||
HardPodAffinitySymmetricWeight: options.hardPodAffinitySymmetricWeight,
|
HardPodAffinitySymmetricWeight: options.hardPodAffinitySymmetricWeight,
|
||||||
|
Loading…
Reference in New Issue
Block a user