From 0efbbe855500ab4f3d15a0c2b08f1dd435851274 Mon Sep 17 00:00:00 2001
From: Patrick Ohly
Date: Mon, 22 Jun 2020 16:06:46 +0200
Subject: [PATCH] CSIStorageCapacity: check for sufficient storage in volume binder

This uses the information provided by a CSI driver deployment for
checking whether a node has access to enough storage to create the
currently unbound volumes, if the CSI driver opts into that checking
by setting CSIDriver.Spec.StorageCapacity to true.

This resolves a TODO from commit 95b530366aee.
---
 pkg/controller/volume/scheduling/BUILD | 5 +
 .../volume/scheduling/scheduler_binder.go | 137 ++++++++-
 .../scheduling/scheduler_binder_test.go | 260 +++++++++++++++++-
 .../framework/plugins/volumebinding/BUILD | 2 +
 .../plugins/volumebinding/volume_binding.go | 14 +-
 .../authorizer/rbac/bootstrappolicy/policy.go | 6 +
 6 files changed, 397 insertions(+), 27 deletions(-)

diff --git a/pkg/controller/volume/scheduling/BUILD b/pkg/controller/volume/scheduling/BUILD
index abb112b39a1..6525c355ecd 100644
--- a/pkg/controller/volume/scheduling/BUILD
+++ b/pkg/controller/volume/scheduling/BUILD
@@ -17,6 +17,7 @@ go_library(
 "//pkg/volume/util:go_default_library",
 "//staging/src/k8s.io/api/core/v1:go_default_library",
 "//staging/src/k8s.io/api/storage/v1:go_default_library",
+ "//staging/src/k8s.io/api/storage/v1alpha1:go_default_library",
 "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
 "//staging/src/k8s.io/apimachinery/pkg/api/meta:go_default_library",
 "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
@@ -27,9 +28,11 @@ go_library(
 "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
 "//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
 "//staging/src/k8s.io/client-go/informers/storage/v1:go_default_library",
+ "//staging/src/k8s.io/client-go/informers/storage/v1alpha1:go_default_library",
 "//staging/src/k8s.io/client-go/kubernetes:go_default_library",
 "//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
 "//staging/src/k8s.io/client-go/listers/storage/v1:go_default_library",
+ "//staging/src/k8s.io/client-go/listers/storage/v1alpha1:go_default_library",
 "//staging/src/k8s.io/client-go/tools/cache:go_default_library",
 "//staging/src/k8s.io/csi-translation-lib:go_default_library",
 "//staging/src/k8s.io/csi-translation-lib/plugins:go_default_library",
@@ -51,6 +54,7 @@ go_test(
 "//pkg/features:go_default_library",
 "//staging/src/k8s.io/api/core/v1:go_default_library",
 "//staging/src/k8s.io/api/storage/v1:go_default_library",
+ "//staging/src/k8s.io/api/storage/v1alpha1:go_default_library",
 "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
 "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
 "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
@@ -61,6 +65,7 @@ go_test(
 "//staging/src/k8s.io/client-go/informers:go_default_library",
 "//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
 "//staging/src/k8s.io/client-go/informers/storage/v1:go_default_library",
+ "//staging/src/k8s.io/client-go/informers/storage/v1alpha1:go_default_library",
 "//staging/src/k8s.io/client-go/kubernetes:go_default_library",
 "//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
 "//staging/src/k8s.io/client-go/testing:go_default_library",
diff --git a/pkg/controller/volume/scheduling/scheduler_binder.go b/pkg/controller/volume/scheduling/scheduler_binder.go
index 949db90e48c..5c3930614fd 100644
---
a/pkg/controller/volume/scheduling/scheduler_binder.go +++ b/pkg/controller/volume/scheduling/scheduler_binder.go @@ -25,6 +25,7 @@ import ( v1 "k8s.io/api/core/v1" storagev1 "k8s.io/api/storage/v1" + storagev1alpha1 "k8s.io/api/storage/v1alpha1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" @@ -34,9 +35,11 @@ import ( utilfeature "k8s.io/apiserver/pkg/util/feature" coreinformers "k8s.io/client-go/informers/core/v1" storageinformers "k8s.io/client-go/informers/storage/v1" + storageinformersv1alpha1 "k8s.io/client-go/informers/storage/v1alpha1" clientset "k8s.io/client-go/kubernetes" corelisters "k8s.io/client-go/listers/core/v1" storagelisters "k8s.io/client-go/listers/storage/v1" + storagelistersv1alpha1 "k8s.io/client-go/listers/storage/v1alpha1" csitrans "k8s.io/csi-translation-lib" csiplugins "k8s.io/csi-translation-lib/plugins" "k8s.io/klog/v2" @@ -63,6 +66,8 @@ const ( ErrReasonBindConflict ConflictReason = "node(s) didn't find available persistent volumes to bind" // ErrReasonNodeConflict is used for VolumeNodeAffinityConflict predicate error. ErrReasonNodeConflict ConflictReason = "node(s) had volume node affinity conflict" + // ErrReasonNotEnoughSpace is used when a pod cannot start on a node because not enough storage space is available. + ErrReasonNotEnoughSpace = "node(s) did not have enough free storage" ) // BindingInfo holds a binding between PV and PVC. @@ -131,6 +136,9 @@ type SchedulerVolumeBinder interface { // It returns an error when something went wrong or a list of reasons why the node is // (currently) not usable for the pod. // + // If the CSIStorageCapacity feature is enabled, then it also checks for sufficient storage + // for volumes that still need to be created. + // // This function is called by the scheduler VolumeBinding plugin and can be called in parallel FindPodVolumes(pod *v1.Pod, boundClaims, claimsToBind []*v1.PersistentVolumeClaim, node *v1.Node) (podVolumes *PodVolumes, reasons ConflictReasons, err error) @@ -174,9 +182,23 @@ type volumeBinder struct { bindTimeout time.Duration translator InTreeToCSITranslator + + capacityCheckEnabled bool + csiDriverLister storagelisters.CSIDriverLister + csiStorageCapacityLister storagelistersv1alpha1.CSIStorageCapacityLister +} + +// CapacityCheck contains additional parameters for NewVolumeBinder that +// are only needed when checking volume sizes against available storage +// capacity is desired. +type CapacityCheck struct { + CSIDriverInformer storageinformers.CSIDriverInformer + CSIStorageCapacityInformer storageinformersv1alpha1.CSIStorageCapacityInformer } // NewVolumeBinder sets up all the caches needed for the scheduler to make volume binding decisions. +// +// capacityCheck determines whether storage capacity is checked (CSIStorageCapacity feature). 
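A minimal sketch (not part of this patch) of how a caller wires up the optional capacity check. Client and informer-factory construction are assumed; passing nil for capacityCheck keeps the previous behavior of assuming that storage is always sufficient. The scheduler plugin does the equivalent wiring in volume_binding.go further down, gated on the CSIStorageCapacity feature gate.

package main

import (
	"time"

	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/kubernetes/pkg/controller/volume/scheduling"
)

// newBinder builds a SchedulerVolumeBinder with the CSIStorageCapacity check enabled.
func newBinder(config *rest.Config) scheduling.SchedulerVolumeBinder {
	client := kubernetes.NewForConfigOrDie(config)
	factory := informers.NewSharedInformerFactory(client, 0)

	// A non-nil CapacityCheck enables the capacity check; nil disables it.
	capacityCheck := &scheduling.CapacityCheck{
		CSIDriverInformer:          factory.Storage().V1().CSIDrivers(),
		CSIStorageCapacityInformer: factory.Storage().V1alpha1().CSIStorageCapacities(),
	}

	return scheduling.NewVolumeBinder(
		client,
		factory.Core().V1().Pods(),
		factory.Core().V1().Nodes(),
		factory.Storage().V1().CSINodes(),
		factory.Core().V1().PersistentVolumeClaims(),
		factory.Core().V1().PersistentVolumes(),
		factory.Storage().V1().StorageClasses(),
		capacityCheck,
		10*time.Second, // bind timeout
	)
}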
func NewVolumeBinder( kubeClient clientset.Interface, podInformer coreinformers.PodInformer, @@ -185,8 +207,9 @@ func NewVolumeBinder( pvcInformer coreinformers.PersistentVolumeClaimInformer, pvInformer coreinformers.PersistentVolumeInformer, storageClassInformer storageinformers.StorageClassInformer, + capacityCheck *CapacityCheck, bindTimeout time.Duration) SchedulerVolumeBinder { - return &volumeBinder{ + b := &volumeBinder{ kubeClient: kubeClient, podLister: podInformer.Lister(), classLister: storageClassInformer.Lister(), @@ -197,6 +220,14 @@ func NewVolumeBinder( bindTimeout: bindTimeout, translator: csitrans.New(), } + + if capacityCheck != nil { + b.capacityCheckEnabled = true + b.csiDriverLister = capacityCheck.CSIDriverInformer.Lister() + b.csiStorageCapacityLister = capacityCheck.CSIStorageCapacityInformer.Lister() + } + + return b } // FindPodVolumes finds the matching PVs for PVCs and nodes to provision PVs @@ -214,6 +245,7 @@ func (b *volumeBinder) FindPodVolumes(pod *v1.Pod, boundClaims, claimsToBind []* // returns without an error. unboundVolumesSatisfied := true boundVolumesSatisfied := true + sufficientStorage := true defer func() { if err != nil { return @@ -224,6 +256,9 @@ func (b *volumeBinder) FindPodVolumes(pod *v1.Pod, boundClaims, claimsToBind []* if !unboundVolumesSatisfied { reasons = append(reasons, ErrReasonBindConflict) } + if !sufficientStorage { + reasons = append(reasons, ErrReasonNotEnoughSpace) + } }() start := time.Now() @@ -290,9 +325,10 @@ func (b *volumeBinder) FindPodVolumes(pod *v1.Pod, boundClaims, claimsToBind []* claimsToProvision = append(claimsToProvision, unboundClaims...) } - // Check for claims to provision + // Check for claims to provision. This is the first time where we potentially + // find out that storage is not sufficient for the node. if len(claimsToProvision) > 0 { - unboundVolumesSatisfied, dynamicProvisions, err = b.checkVolumeProvisions(pod, claimsToProvision, node) + unboundVolumesSatisfied, sufficientStorage, dynamicProvisions, err = b.checkVolumeProvisions(pod, claimsToProvision, node) if err != nil { return } @@ -787,42 +823,51 @@ func (b *volumeBinder) findMatchingVolumes(pod *v1.Pod, claimsToBind []*v1.Persi // checkVolumeProvisions checks given unbound claims (the claims have gone through func // findMatchingVolumes, and do not have matching volumes for binding), and return true // if all of the claims are eligible for dynamic provision. -func (b *volumeBinder) checkVolumeProvisions(pod *v1.Pod, claimsToProvision []*v1.PersistentVolumeClaim, node *v1.Node) (provisionSatisfied bool, dynamicProvisions []*v1.PersistentVolumeClaim, err error) { +func (b *volumeBinder) checkVolumeProvisions(pod *v1.Pod, claimsToProvision []*v1.PersistentVolumeClaim, node *v1.Node) (provisionSatisfied, sufficientStorage bool, dynamicProvisions []*v1.PersistentVolumeClaim, err error) { podName := getPodName(pod) dynamicProvisions = []*v1.PersistentVolumeClaim{} + // We return early with provisionedClaims == nil if a check + // fails or we encounter an error. 
for _, claim := range claimsToProvision { pvcName := getPVCName(claim) className := v1helper.GetPersistentVolumeClaimClass(claim) if className == "" { - return false, nil, fmt.Errorf("no class for claim %q", pvcName) + return false, false, nil, fmt.Errorf("no class for claim %q", pvcName) } class, err := b.classLister.Get(className) if err != nil { - return false, nil, fmt.Errorf("failed to find storage class %q", className) + return false, false, nil, fmt.Errorf("failed to find storage class %q", className) } provisioner := class.Provisioner if provisioner == "" || provisioner == pvutil.NotSupportedProvisioner { klog.V(4).Infof("storage class %q of claim %q does not support dynamic provisioning", className, pvcName) - return false, nil, nil + return false, true, nil, nil } // Check if the node can satisfy the topology requirement in the class if !v1helper.MatchTopologySelectorTerms(class.AllowedTopologies, labels.Set(node.Labels)) { klog.V(4).Infof("Node %q cannot satisfy provisioning topology requirements of claim %q", node.Name, pvcName) - return false, nil, nil + return false, true, nil, nil } - // TODO: Check if capacity of the node domain in the storage class - // can satisfy resource requirement of given claim + // Check storage capacity. + sufficient, err := b.hasEnoughCapacity(provisioner, claim, class, node) + if err != nil { + return false, false, nil, err + } + if !sufficient { + // hasEnoughCapacity logs an explanation. + return true, false, nil, nil + } dynamicProvisions = append(dynamicProvisions, claim) } klog.V(4).Infof("Provisioning for %d claims of pod %q that has no matching volumes on node %q ...", len(claimsToProvision), podName, node.Name) - return true, dynamicProvisions, nil + return true, true, dynamicProvisions, nil } func (b *volumeBinder) revertAssumedPVs(bindings []*BindingInfo) { @@ -837,6 +882,76 @@ func (b *volumeBinder) revertAssumedPVCs(claims []*v1.PersistentVolumeClaim) { } } +// hasEnoughCapacity checks whether the provisioner has enough capacity left for a new volume of the given size +// that is available from the node. +func (b *volumeBinder) hasEnoughCapacity(provisioner string, claim *v1.PersistentVolumeClaim, storageClass *storagev1.StorageClass, node *v1.Node) (bool, error) { + // This is an optional feature. If disabled, we assume that + // there is enough storage. + if !b.capacityCheckEnabled { + return true, nil + } + + quantity, ok := claim.Spec.Resources.Requests[v1.ResourceStorage] + if !ok { + // No capacity to check for. + return true, nil + } + + // Only enabled for CSI drivers which opt into it. + driver, err := b.csiDriverLister.Get(provisioner) + if err != nil { + if apierrors.IsNotFound(err) { + // Either the provisioner is not a CSI driver or the driver does not + // opt into storage capacity scheduling. Either way, skip + // capacity checking. + return true, nil + } + return false, err + } + if driver.Spec.StorageCapacity == nil || !*driver.Spec.StorageCapacity { + return true, nil + } + + // Look for a matching CSIStorageCapacity object(s). + // TODO (for beta): benchmark this and potentially introduce some kind of lookup structure (https://github.com/kubernetes/enhancements/issues/1698#issuecomment-654356718). 
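For context, a sketch (not part of this patch) of the kind of object this lookup consumes. CSIStorageCapacity objects are expected to be published by the CSI driver deployment; the name, namespace, label key, and values below are made up.

package main

import (
	storagev1alpha1 "k8s.io/api/storage/v1alpha1"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// exampleCapacity returns a hypothetical CSIStorageCapacity object.
func exampleCapacity() *storagev1alpha1.CSIStorageCapacity {
	capacity := resource.MustParse("100Gi")
	return &storagev1alpha1.CSIStorageCapacity{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "example-capacity", // hypothetical
			Namespace: "csi-driver",       // hypothetical
		},
		// hasEnoughCapacity compares this against StorageClass.Name.
		StorageClassName: "example-sc",
		// nodeHasAccess matches this selector against the node's labels;
		// a nil NodeTopology makes the object unusable for scheduling.
		NodeTopology: &metav1.LabelSelector{
			MatchLabels: map[string]string{"topology.example.com/node": "node-1"},
		},
		// Compared against the PVC's requested storage size.
		Capacity: &capacity,
	}
}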
+ capacities, err := b.csiStorageCapacityLister.List(labels.Everything()) + if err != nil { + return false, err + } + + sizeInBytes := quantity.Value() + for _, capacity := range capacities { + if capacity.StorageClassName == storageClass.Name && + capacity.Capacity != nil && + capacity.Capacity.Value() >= sizeInBytes && + b.nodeHasAccess(node, capacity) { + // Enough capacity found. + return true, nil + } + } + + // TODO (?): this doesn't give any information about which pools where considered and why + // they had to be rejected. Log that above? But that might be a lot of log output... + klog.V(4).Infof("Node %q has no accessible CSIStorageCapacity with enough capacity for PVC %s/%s of size %d and storage class %q", + node.Name, claim.Namespace, claim.Name, sizeInBytes, storageClass.Name) + return false, nil +} + +func (b *volumeBinder) nodeHasAccess(node *v1.Node, capacity *storagev1alpha1.CSIStorageCapacity) bool { + if capacity.NodeTopology == nil { + // Unavailable + return false + } + // Only matching by label is supported. + selector, err := metav1.LabelSelectorAsSelector(capacity.NodeTopology) + if err != nil { + // This should never happen because NodeTopology must be valid. + klog.Errorf("unexpected error converting %+v to a label selector: %v", capacity.NodeTopology, err) + return false + } + return selector.Matches(labels.Set(node.Labels)) +} + type byPVCSize []*v1.PersistentVolumeClaim func (a byPVCSize) Len() int { diff --git a/pkg/controller/volume/scheduling/scheduler_binder_test.go b/pkg/controller/volume/scheduling/scheduler_binder_test.go index 786bc0dbe4a..93815e8f498 100644 --- a/pkg/controller/volume/scheduling/scheduler_binder_test.go +++ b/pkg/controller/volume/scheduling/scheduler_binder_test.go @@ -26,6 +26,7 @@ import ( v1 "k8s.io/api/core/v1" storagev1 "k8s.io/api/storage/v1" + storagev1alpha1 "k8s.io/api/storage/v1alpha1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" @@ -36,6 +37,7 @@ import ( "k8s.io/client-go/informers" coreinformers "k8s.io/client-go/informers/core/v1" storageinformers "k8s.io/client-go/informers/storage/v1" + storageinformersv1alpha1 "k8s.io/client-go/informers/storage/v1alpha1" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/fake" k8stesting "k8s.io/client-go/testing" @@ -48,6 +50,8 @@ import ( ) var ( + provisioner = "test-provisioner" + // PVCs for manual binding // TODO: clean up all of these unboundPVC = makeTestPVC("unbound-pvc", "1G", "", pvcUnbound, "", "1", &waitClass) @@ -128,9 +132,13 @@ type testEnv struct { internalCSINodeInformer storageinformers.CSINodeInformer internalPVCache *assumeCache internalPVCCache *assumeCache + + // For CSIStorageCapacity feature testing: + internalCSIDriverInformer storageinformers.CSIDriverInformer + internalCSIStorageCapacityInformer storageinformersv1alpha1.CSIStorageCapacityInformer } -func newTestBinder(t *testing.T, stopCh <-chan struct{}) *testEnv { +func newTestBinder(t *testing.T, stopCh <-chan struct{}, csiStorageCapacity ...bool) *testEnv { client := &fake.Clientset{} reactor := pvtesting.NewVolumeReactor(client, nil, nil, nil) // TODO refactor all tests to use real watch mechanism, see #72327 @@ -150,6 +158,15 @@ func newTestBinder(t *testing.T, stopCh <-chan struct{}) *testEnv { csiNodeInformer := informerFactory.Storage().V1().CSINodes() pvcInformer := informerFactory.Core().V1().PersistentVolumeClaims() classInformer := informerFactory.Storage().V1().StorageClasses() + csiDriverInformer := 
informerFactory.Storage().V1().CSIDrivers() + csiStorageCapacityInformer := informerFactory.Storage().V1alpha1().CSIStorageCapacities() + var capacityCheck *CapacityCheck + if len(csiStorageCapacity) > 0 && csiStorageCapacity[0] { + capacityCheck = &CapacityCheck{ + CSIDriverInformer: csiDriverInformer, + CSIStorageCapacityInformer: csiStorageCapacityInformer, + } + } binder := NewVolumeBinder( client, podInformer, @@ -158,6 +175,7 @@ func newTestBinder(t *testing.T, stopCh <-chan struct{}) *testEnv { pvcInformer, informerFactory.Core().V1().PersistentVolumes(), classInformer, + capacityCheck, 10*time.Second) // Wait for informers cache sync @@ -177,7 +195,7 @@ func newTestBinder(t *testing.T, stopCh <-chan struct{}) *testEnv { Name: waitClassWithProvisioner, }, VolumeBindingMode: &waitMode, - Provisioner: "test-provisioner", + Provisioner: provisioner, AllowedTopologies: []v1.TopologySelectorTerm{ { MatchLabelExpressions: []v1.TopologySelectorLabelRequirement{ @@ -207,7 +225,7 @@ func newTestBinder(t *testing.T, stopCh <-chan struct{}) *testEnv { Name: topoMismatchClass, }, VolumeBindingMode: &waitMode, - Provisioner: "test-provisioner", + Provisioner: provisioner, AllowedTopologies: []v1.TopologySelectorTerm{ { MatchLabelExpressions: []v1.TopologySelectorLabelRequirement{ @@ -254,6 +272,9 @@ func newTestBinder(t *testing.T, stopCh <-chan struct{}) *testEnv { internalCSINodeInformer: csiNodeInformer, internalPVCache: internalPVCache, internalPVCCache: internalPVCCache, + + internalCSIDriverInformer: csiDriverInformer, + internalCSIStorageCapacityInformer: csiStorageCapacityInformer, } } @@ -271,6 +292,18 @@ func (env *testEnv) initCSINodes(cachedCSINodes []*storagev1.CSINode) { } } +func (env *testEnv) addCSIDriver(csiDriver *storagev1.CSIDriver) { + csiDriverInformer := env.internalCSIDriverInformer.Informer() + csiDriverInformer.GetIndexer().Add(csiDriver) +} + +func (env *testEnv) addCSIStorageCapacities(capacities []*storagev1alpha1.CSIStorageCapacity) { + csiStorageCapacityInformer := env.internalCSIStorageCapacityInformer.Informer() + for _, capacity := range capacities { + csiStorageCapacityInformer.GetIndexer().Add(capacity) + } +} + func (env *testEnv) initClaims(cachedPVCs []*v1.PersistentVolumeClaim, apiPVCs []*v1.PersistentVolumeClaim) { internalPVCCache := env.internalPVCCache for _, pvc := range cachedPVCs { @@ -678,6 +711,35 @@ func makeCSINode(name, migratedPlugin string) *storagev1.CSINode { } } +func makeCSIDriver(name string, storageCapacity bool) *storagev1.CSIDriver { + return &storagev1.CSIDriver{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + Spec: storagev1.CSIDriverSpec{ + StorageCapacity: &storageCapacity, + }, + } +} + +func makeCapacity(name, storageClassName string, node *v1.Node, capacityStr string) *storagev1alpha1.CSIStorageCapacity { + c := &storagev1alpha1.CSIStorageCapacity{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + StorageClassName: storageClassName, + NodeTopology: &metav1.LabelSelector{}, + } + if node != nil { + c.NodeTopology.MatchLabels = map[string]string{nodeLabelKey: node.Labels[nodeLabelKey]} + } + if capacityStr != "" { + capacityQuantity := resource.MustParse(capacityStr) + c.Capacity = &capacityQuantity + } + return c +} + func makePod(pvcs []*v1.PersistentVolumeClaim) *v1.Pod { pod := &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ @@ -745,6 +807,8 @@ func reasonNames(reasons ConflictReasons) string { varNames = append(varNames, "ErrReasonBindConflict") case ErrReasonNodeConflict: varNames = append(varNames, 
"ErrReasonNodeConflict") + case ErrReasonNotEnoughSpace: + varNames = append(varNames, "ErrReasonNotEnoughSpace") default: varNames = append(varNames, string(reason)) } @@ -897,13 +961,16 @@ func TestFindPodVolumesWithoutProvisioning(t *testing.T) { }, } - run := func(t *testing.T, scenario scenarioType) { + run := func(t *testing.T, scenario scenarioType, csiStorageCapacity bool, csiDriver *storagev1.CSIDriver) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() // Setup - testEnv := newTestBinder(t, ctx.Done()) + testEnv := newTestBinder(t, ctx.Done(), csiStorageCapacity) testEnv.initVolumes(scenario.pvs, scenario.pvs) + if csiDriver != nil { + testEnv.addCSIDriver(csiDriver) + } // a. Init pvc cache if scenario.cachePVCs == nil { @@ -930,8 +997,20 @@ func TestFindPodVolumesWithoutProvisioning(t *testing.T) { testEnv.validatePodCache(t, testNode.Name, scenario.pod, podVolumes, scenario.expectedBindings, nil) } - for name, scenario := range scenarios { - t.Run(name, func(t *testing.T) { run(t, scenario) }) + for prefix, csiStorageCapacity := range map[string]bool{"with": true, "without": false} { + t.Run(fmt.Sprintf("%s CSIStorageCapacity", prefix), func(t *testing.T) { + for description, csiDriver := range map[string]*storagev1.CSIDriver{ + "no CSIDriver": nil, + "CSIDriver with capacity tracking": makeCSIDriver(provisioner, true), + "CSIDriver without capacity tracking": makeCSIDriver(provisioner, false), + } { + t.Run(description, func(t *testing.T) { + for name, scenario := range scenarios { + t.Run(name, func(t *testing.T) { run(t, scenario, csiStorageCapacity, csiDriver) }) + } + }) + } + }) } } @@ -950,29 +1029,34 @@ func TestFindPodVolumesWithProvisioning(t *testing.T) { expectedProvisions []*v1.PersistentVolumeClaim // Expected return values - reasons ConflictReasons - shouldFail bool + reasons ConflictReasons + shouldFail bool + needsCapacity bool } scenarios := map[string]scenarioType{ "one-provisioned": { podPVCs: []*v1.PersistentVolumeClaim{provisionedPVC}, expectedProvisions: []*v1.PersistentVolumeClaim{provisionedPVC}, + needsCapacity: true, }, "two-unbound-pvcs,one-matched,one-provisioned": { podPVCs: []*v1.PersistentVolumeClaim{unboundPVC, provisionedPVC}, pvs: []*v1.PersistentVolume{pvNode1a}, expectedBindings: []*BindingInfo{makeBinding(unboundPVC, pvNode1a)}, expectedProvisions: []*v1.PersistentVolumeClaim{provisionedPVC}, + needsCapacity: true, }, "one-bound,one-provisioned": { podPVCs: []*v1.PersistentVolumeClaim{boundPVC, provisionedPVC}, pvs: []*v1.PersistentVolume{pvBound}, expectedProvisions: []*v1.PersistentVolumeClaim{provisionedPVC}, + needsCapacity: true, }, "one-binding,one-selected-node": { podPVCs: []*v1.PersistentVolumeClaim{boundPVC, selectedNodePVC}, pvs: []*v1.PersistentVolume{pvBound}, expectedProvisions: []*v1.PersistentVolumeClaim{selectedNodePVC}, + needsCapacity: true, }, "immediate-unbound-pvc": { podPVCs: []*v1.PersistentVolumeClaim{immediateUnboundPVC}, @@ -982,6 +1066,7 @@ func TestFindPodVolumesWithProvisioning(t *testing.T) { podPVCs: []*v1.PersistentVolumeClaim{immediateBoundPVC, provisionedPVC}, pvs: []*v1.PersistentVolume{pvBoundImmediate}, expectedProvisions: []*v1.PersistentVolumeClaim{provisionedPVC}, + needsCapacity: true, }, "invalid-provisioner": { podPVCs: []*v1.PersistentVolumeClaim{noProvisionerPVC}, @@ -1002,13 +1087,16 @@ func TestFindPodVolumesWithProvisioning(t *testing.T) { }, } - run := func(t *testing.T, scenario scenarioType) { + run := func(t *testing.T, scenario scenarioType, csiStorageCapacity 
bool, csiDriver *storagev1.CSIDriver) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() // Setup - testEnv := newTestBinder(t, ctx.Done()) + testEnv := newTestBinder(t, ctx.Done(), csiStorageCapacity) testEnv.initVolumes(scenario.pvs, scenario.pvs) + if csiDriver != nil { + testEnv.addCSIDriver(csiDriver) + } // a. Init pvc cache if scenario.cachePVCs == nil { @@ -1031,12 +1119,32 @@ func TestFindPodVolumesWithProvisioning(t *testing.T) { if scenario.shouldFail && err == nil { t.Error("returned success but expected error") } - checkReasons(t, reasons, scenario.reasons) - testEnv.validatePodCache(t, testNode.Name, scenario.pod, podVolumes, scenario.expectedBindings, scenario.expectedProvisions) + expectedReasons := scenario.reasons + expectedProvisions := scenario.expectedProvisions + if scenario.needsCapacity && csiStorageCapacity && + csiDriver != nil && csiDriver.Spec.StorageCapacity != nil && *csiDriver.Spec.StorageCapacity { + // Without CSIStorageCapacity objects, provisioning is blocked. + expectedReasons = append(expectedReasons, ErrReasonNotEnoughSpace) + expectedProvisions = nil + } + checkReasons(t, reasons, expectedReasons) + testEnv.validatePodCache(t, testNode.Name, scenario.pod, podVolumes, scenario.expectedBindings, expectedProvisions) } - for name, scenario := range scenarios { - t.Run(name, func(t *testing.T) { run(t, scenario) }) + for prefix, csiStorageCapacity := range map[string]bool{"with": true, "without": false} { + t.Run(fmt.Sprintf("%s CSIStorageCapacity", prefix), func(t *testing.T) { + for description, csiDriver := range map[string]*storagev1.CSIDriver{ + "no CSIDriver": nil, + "CSIDriver with capacity tracking": makeCSIDriver(provisioner, true), + "CSIDriver without capacity tracking": makeCSIDriver(provisioner, false), + } { + t.Run(description, func(t *testing.T) { + for name, scenario := range scenarios { + t.Run(name, func(t *testing.T) { run(t, scenario, csiStorageCapacity, csiDriver) }) + } + }) + } + }) } } @@ -2008,3 +2116,125 @@ func TestFindAssumeVolumes(t *testing.T) { testEnv.validatePodCache(t, testNode.Name, pod, podVolumes, expectedBindings, nil) } } + +// TestCapacity covers different scenarios involving CSIStorageCapacity objects. +// Scenarios without those are covered by TestFindPodVolumesWithProvisioning. 
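The expectation adjustment above hinges on the same opt-in condition as the production code. A possible sketch (not part of this patch) of extracting it into a test helper:

// capacityCheckApplies reports whether the capacity check is active for a test case:
// the feature must be enabled and the fake CSI driver must opt in. Sketch only.
func capacityCheckApplies(csiStorageCapacity bool, csiDriver *storagev1.CSIDriver) bool {
	return csiStorageCapacity &&
		csiDriver != nil &&
		csiDriver.Spec.StorageCapacity != nil &&
		*csiDriver.Spec.StorageCapacity
}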
+func TestCapacity(t *testing.T) { + type scenarioType struct { + // Inputs + pvcs []*v1.PersistentVolumeClaim + capacities []*storagev1alpha1.CSIStorageCapacity + + // Expected return values + reasons ConflictReasons + shouldFail bool + } + scenarios := map[string]scenarioType{ + "network-attached": { + pvcs: []*v1.PersistentVolumeClaim{provisionedPVC}, + capacities: []*storagev1alpha1.CSIStorageCapacity{ + makeCapacity("net", waitClassWithProvisioner, nil, "1Gi"), + }, + }, + "local-storage": { + pvcs: []*v1.PersistentVolumeClaim{provisionedPVC}, + capacities: []*storagev1alpha1.CSIStorageCapacity{ + makeCapacity("net", waitClassWithProvisioner, node1, "1Gi"), + }, + }, + "multiple": { + pvcs: []*v1.PersistentVolumeClaim{provisionedPVC}, + capacities: []*storagev1alpha1.CSIStorageCapacity{ + makeCapacity("net", waitClassWithProvisioner, nil, "1Gi"), + makeCapacity("net", waitClassWithProvisioner, node2, "1Gi"), + makeCapacity("net", waitClassWithProvisioner, node1, "1Gi"), + }, + }, + "no-storage": { + pvcs: []*v1.PersistentVolumeClaim{provisionedPVC}, + reasons: ConflictReasons{ErrReasonNotEnoughSpace}, + }, + "wrong-node": { + pvcs: []*v1.PersistentVolumeClaim{provisionedPVC}, + capacities: []*storagev1alpha1.CSIStorageCapacity{ + makeCapacity("net", waitClassWithProvisioner, node2, "1Gi"), + }, + reasons: ConflictReasons{ErrReasonNotEnoughSpace}, + }, + "wrong-storage-class": { + pvcs: []*v1.PersistentVolumeClaim{provisionedPVC}, + capacities: []*storagev1alpha1.CSIStorageCapacity{ + makeCapacity("net", waitClass, node1, "1Gi"), + }, + reasons: ConflictReasons{ErrReasonNotEnoughSpace}, + }, + "insufficient-storage": { + pvcs: []*v1.PersistentVolumeClaim{provisionedPVC}, + capacities: []*storagev1alpha1.CSIStorageCapacity{ + makeCapacity("net", waitClassWithProvisioner, node1, "1Mi"), + }, + reasons: ConflictReasons{ErrReasonNotEnoughSpace}, + }, + "zero-storage": { + pvcs: []*v1.PersistentVolumeClaim{provisionedPVC}, + capacities: []*storagev1alpha1.CSIStorageCapacity{ + makeCapacity("net", waitClassWithProvisioner, node1, "0Mi"), + }, + reasons: ConflictReasons{ErrReasonNotEnoughSpace}, + }, + "nil-storage": { + pvcs: []*v1.PersistentVolumeClaim{provisionedPVC}, + capacities: []*storagev1alpha1.CSIStorageCapacity{ + makeCapacity("net", waitClassWithProvisioner, node1, ""), + }, + reasons: ConflictReasons{ErrReasonNotEnoughSpace}, + }, + } + + testNode := &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node1", + Labels: map[string]string{ + nodeLabelKey: "node1", + }, + }, + } + + run := func(t *testing.T, scenario scenarioType) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Setup + withCSIStorageCapacity := true + testEnv := newTestBinder(t, ctx.Done(), withCSIStorageCapacity) + testEnv.addCSIDriver(makeCSIDriver(provisioner, withCSIStorageCapacity)) + testEnv.addCSIStorageCapacities(scenario.capacities) + + // a. Init pvc cache + testEnv.initClaims(scenario.pvcs, scenario.pvcs) + + // b. 
Generate pod with given claims + pod := makePod(scenario.pvcs) + + // Execute + podVolumes, reasons, err := findPodVolumes(testEnv.binder, pod, testNode) + + // Validate + if !scenario.shouldFail && err != nil { + t.Errorf("returned error: %v", err) + } + if scenario.shouldFail && err == nil { + t.Error("returned success but expected error") + } + checkReasons(t, reasons, scenario.reasons) + provisions := scenario.pvcs + if len(reasons) > 0 { + provisions = nil + } + testEnv.validatePodCache(t, pod.Spec.NodeName, pod, podVolumes, nil, provisions) + } + + for name, scenario := range scenarios { + t.Run(name, func(t *testing.T) { run(t, scenario) }) + } +} diff --git a/pkg/scheduler/framework/plugins/volumebinding/BUILD b/pkg/scheduler/framework/plugins/volumebinding/BUILD index 9b8a012cdd1..dd70db1ea9d 100644 --- a/pkg/scheduler/framework/plugins/volumebinding/BUILD +++ b/pkg/scheduler/framework/plugins/volumebinding/BUILD @@ -7,10 +7,12 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/controller/volume/scheduling:go_default_library", + "//pkg/features:go_default_library", "//pkg/scheduler/apis/config:go_default_library", "//pkg/scheduler/framework/v1alpha1:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//vendor/k8s.io/klog/v2:go_default_library", ], ) diff --git a/pkg/scheduler/framework/plugins/volumebinding/volume_binding.go b/pkg/scheduler/framework/plugins/volumebinding/volume_binding.go index 18614a43c89..0fee9534f40 100644 --- a/pkg/scheduler/framework/plugins/volumebinding/volume_binding.go +++ b/pkg/scheduler/framework/plugins/volumebinding/volume_binding.go @@ -24,8 +24,10 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/controller/volume/scheduling" + "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/scheduler/apis/config" framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" ) @@ -136,6 +138,9 @@ func getStateData(cs *framework.CycleState) (*stateData, error) { // For PVCs that are unbound, it tries to find available PVs that can satisfy the PVC requirements // and that the PV node affinity is satisfied by the given node. // +// If storage capacity tracking is enabled, then enough space has to be available +// for the node and volumes that still need to be created. +// // The predicate returns true if all bound PVCs have compatible PVs with the node, and if all unbound // PVCs can be matched with an available and node-compatible PV. 
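The Filter body itself is unchanged by this patch. For readers of the doc comment above: conflict reasons returned by FindPodVolumes, now including ErrReasonNotEnoughSpace, are surfaced roughly as in the following sketch (simplified, using the scheduling and framework packages already imported in this file):

// conflictReasonsToStatus is a simplified sketch of how Filter turns conflict
// reasons into an unschedulable status; it is not part of this diff.
func conflictReasonsToStatus(reasons scheduling.ConflictReasons) *framework.Status {
	if len(reasons) == 0 {
		return nil // nil status: the node passes the filter
	}
	status := framework.NewStatus(framework.UnschedulableAndUnresolvable)
	for _, reason := range reasons {
		status.AppendReason(string(reason))
	}
	return status
}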
func (pl *VolumeBinding) Filter(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status { @@ -254,7 +259,14 @@ func New(plArgs runtime.Object, fh framework.FrameworkHandle) (framework.Plugin, pvInformer := fh.SharedInformerFactory().Core().V1().PersistentVolumes() storageClassInformer := fh.SharedInformerFactory().Storage().V1().StorageClasses() csiNodeInformer := fh.SharedInformerFactory().Storage().V1().CSINodes() - binder := scheduling.NewVolumeBinder(fh.ClientSet(), podInformer, nodeInformer, csiNodeInformer, pvcInformer, pvInformer, storageClassInformer, time.Duration(args.BindTimeoutSeconds)*time.Second) + var capacityCheck *scheduling.CapacityCheck + if utilfeature.DefaultFeatureGate.Enabled(features.CSIStorageCapacity) { + capacityCheck = &scheduling.CapacityCheck{ + CSIDriverInformer: fh.SharedInformerFactory().Storage().V1().CSIDrivers(), + CSIStorageCapacityInformer: fh.SharedInformerFactory().Storage().V1alpha1().CSIStorageCapacities(), + } + } + binder := scheduling.NewVolumeBinder(fh.ClientSet(), podInformer, nodeInformer, csiNodeInformer, pvcInformer, pvInformer, storageClassInformer, capacityCheck, time.Duration(args.BindTimeoutSeconds)*time.Second) return &VolumeBinding{ Binder: binder, }, nil diff --git a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go index 8dd4ef80a9b..c3624119d2e 100644 --- a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go +++ b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go @@ -541,6 +541,12 @@ func ClusterRoles() []rbacv1.ClusterRole { // Needed for volume limits rbacv1helpers.NewRule(Read...).Groups(storageGroup).Resources("csinodes").RuleOrDie(), } + if utilfeature.DefaultFeatureGate.Enabled(features.CSIStorageCapacity) { + kubeSchedulerRules = append(kubeSchedulerRules, + rbacv1helpers.NewRule(Read...).Groups(storageGroup).Resources("csidrivers").RuleOrDie(), + rbacv1helpers.NewRule(Read...).Groups(storageGroup).Resources("csistoragecapacities").RuleOrDie(), + ) + } roles = append(roles, rbacv1.ClusterRole{ // a role to use for the kube-scheduler ObjectMeta: metav1.ObjectMeta{Name: "system:kube-scheduler"},
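To make the RBAC addition concrete: with Read and storageGroup as defined elsewhere in policy.go (get/list/watch and storage.k8s.io), the two new kube-scheduler rules expand to roughly the following (illustrative sketch, not generated output):

package main

import rbacv1 "k8s.io/api/rbac/v1"

// Approximate expansion of the two rules added above: read-only access to
// CSIDriver and CSIStorageCapacity objects for the kube-scheduler role.
var kubeSchedulerStorageRules = []rbacv1.PolicyRule{
	{Verbs: []string{"get", "list", "watch"}, APIGroups: []string{"storage.k8s.io"}, Resources: []string{"csidrivers"}},
	{Verbs: []string{"get", "list", "watch"}, APIGroups: []string{"storage.k8s.io"}, Resources: []string{"csistoragecapacities"}},
}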