CSIStorageCapacity: check for sufficient storage in volume binder

This uses the information provided by a CSI driver deployment to check
whether a node has access to enough storage to create the currently
unbound volumes, if the CSI driver opts into that check with
CSIDriver.Spec.StorageCapacity != false.

This resolves a TODO from commit 95b530366a.
Patrick Ohly 2020-06-22 16:06:46 +02:00
parent 9a66e8e1b5
commit 0efbbe8555
6 changed files with 397 additions and 27 deletions
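As a rough illustration (not part of this commit), a driver opts in by setting spec.storageCapacity on its CSIDriver object; the test helper makeCSIDriver below builds the same kind of object. A minimal Go sketch with a made-up driver name:

package main

import (
	"fmt"

	storagev1 "k8s.io/api/storage/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// newCapacityAwareCSIDriver builds a CSIDriver object that opts into
// storage capacity tracking; with this set, the volume binder consults
// CSIStorageCapacity objects before provisioning volumes on a node.
func newCapacityAwareCSIDriver(name string) *storagev1.CSIDriver {
	storageCapacity := true
	return &storagev1.CSIDriver{
		ObjectMeta: metav1.ObjectMeta{Name: name},
		Spec: storagev1.CSIDriverSpec{
			StorageCapacity: &storageCapacity,
		},
	}
}

func main() {
	// "example.csi.k8s.io" is a made-up driver name.
	fmt.Printf("%+v\n", newCapacityAwareCSIDriver("example.csi.k8s.io"))
}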


@ -17,6 +17,7 @@ go_library(
"//pkg/volume/util:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/storage/v1:go_default_library",
"//staging/src/k8s.io/api/storage/v1alpha1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/meta:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
@ -27,9 +28,11 @@ go_library(
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/informers/storage/v1:go_default_library",
"//staging/src/k8s.io/client-go/informers/storage/v1alpha1:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/listers/storage/v1:go_default_library",
"//staging/src/k8s.io/client-go/listers/storage/v1alpha1:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//staging/src/k8s.io/csi-translation-lib:go_default_library",
"//staging/src/k8s.io/csi-translation-lib/plugins:go_default_library",
@ -51,6 +54,7 @@ go_test(
"//pkg/features:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/storage/v1:go_default_library",
"//staging/src/k8s.io/api/storage/v1alpha1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
@ -61,6 +65,7 @@ go_test(
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/informers/storage/v1:go_default_library",
"//staging/src/k8s.io/client-go/informers/storage/v1alpha1:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
"//staging/src/k8s.io/client-go/testing:go_default_library",


@ -25,6 +25,7 @@ import (
v1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
storagev1alpha1 "k8s.io/api/storage/v1alpha1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
@ -34,9 +35,11 @@ import (
utilfeature "k8s.io/apiserver/pkg/util/feature"
coreinformers "k8s.io/client-go/informers/core/v1"
storageinformers "k8s.io/client-go/informers/storage/v1"
storageinformersv1alpha1 "k8s.io/client-go/informers/storage/v1alpha1"
clientset "k8s.io/client-go/kubernetes"
corelisters "k8s.io/client-go/listers/core/v1"
storagelisters "k8s.io/client-go/listers/storage/v1"
storagelistersv1alpha1 "k8s.io/client-go/listers/storage/v1alpha1"
csitrans "k8s.io/csi-translation-lib"
csiplugins "k8s.io/csi-translation-lib/plugins"
"k8s.io/klog/v2"
@ -63,6 +66,8 @@ const (
ErrReasonBindConflict ConflictReason = "node(s) didn't find available persistent volumes to bind"
// ErrReasonNodeConflict is used for VolumeNodeAffinityConflict predicate error.
ErrReasonNodeConflict ConflictReason = "node(s) had volume node affinity conflict"
// ErrReasonNotEnoughSpace is used when a pod cannot start on a node because not enough storage space is available.
ErrReasonNotEnoughSpace = "node(s) did not have enough free storage"
)
// BindingInfo holds a binding between PV and PVC.
@ -131,6 +136,9 @@ type SchedulerVolumeBinder interface {
// It returns an error when something went wrong or a list of reasons why the node is
// (currently) not usable for the pod.
//
// If the CSIStorageCapacity feature is enabled, then it also checks for sufficient storage
// for volumes that still need to be created.
//
// This function is called by the scheduler VolumeBinding plugin and can be called in parallel
FindPodVolumes(pod *v1.Pod, boundClaims, claimsToBind []*v1.PersistentVolumeClaim, node *v1.Node) (podVolumes *PodVolumes, reasons ConflictReasons, err error)
@ -174,9 +182,23 @@ type volumeBinder struct {
bindTimeout time.Duration
translator InTreeToCSITranslator
capacityCheckEnabled bool
csiDriverLister storagelisters.CSIDriverLister
csiStorageCapacityLister storagelistersv1alpha1.CSIStorageCapacityLister
}
// CapacityCheck contains additional parameters for NewVolumeBinder that
// are only needed when checking volume sizes against available storage
// capacity is desired.
type CapacityCheck struct {
CSIDriverInformer storageinformers.CSIDriverInformer
CSIStorageCapacityInformer storageinformersv1alpha1.CSIStorageCapacityInformer
}
// NewVolumeBinder sets up all the caches needed for the scheduler to make volume binding decisions.
//
// capacityCheck determines whether storage capacity is checked (CSIStorageCapacity feature).
func NewVolumeBinder(
kubeClient clientset.Interface,
podInformer coreinformers.PodInformer,
@ -185,8 +207,9 @@ func NewVolumeBinder(
pvcInformer coreinformers.PersistentVolumeClaimInformer,
pvInformer coreinformers.PersistentVolumeInformer,
storageClassInformer storageinformers.StorageClassInformer,
capacityCheck *CapacityCheck,
bindTimeout time.Duration) SchedulerVolumeBinder {
return &volumeBinder{
b := &volumeBinder{
kubeClient: kubeClient,
podLister: podInformer.Lister(),
classLister: storageClassInformer.Lister(),
@ -197,6 +220,14 @@ func NewVolumeBinder(
bindTimeout: bindTimeout,
translator: csitrans.New(),
}
if capacityCheck != nil {
b.capacityCheckEnabled = true
b.csiDriverLister = capacityCheck.CSIDriverInformer.Lister()
b.csiStorageCapacityLister = capacityCheck.CSIStorageCapacityInformer.Lister()
}
return b
}
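For context, a sketch of how a caller outside this package could wire the optional check into NewVolumeBinder from a shared informer factory; the VolumeBinding plugin further down does essentially this behind the CSIStorageCapacity feature gate. The function name, timeout, and enable flag are placeholders:

package example

import (
	"time"

	"k8s.io/client-go/informers"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/kubernetes/pkg/controller/volume/scheduling"
)

// newBinder constructs the volume binder; passing a nil CapacityCheck keeps
// the previous behavior, i.e. no storage capacity checking.
func newBinder(client clientset.Interface, enableCapacityCheck bool) scheduling.SchedulerVolumeBinder {
	factory := informers.NewSharedInformerFactory(client, 0)
	var capacityCheck *scheduling.CapacityCheck
	if enableCapacityCheck { // e.g. gated on the CSIStorageCapacity feature
		capacityCheck = &scheduling.CapacityCheck{
			CSIDriverInformer:          factory.Storage().V1().CSIDrivers(),
			CSIStorageCapacityInformer: factory.Storage().V1alpha1().CSIStorageCapacities(),
		}
	}
	return scheduling.NewVolumeBinder(
		client,
		factory.Core().V1().Pods(),
		factory.Core().V1().Nodes(),
		factory.Storage().V1().CSINodes(),
		factory.Core().V1().PersistentVolumeClaims(),
		factory.Core().V1().PersistentVolumes(),
		factory.Storage().V1().StorageClasses(),
		capacityCheck,
		10*time.Second, // bind timeout, placeholder value
	)
}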
// FindPodVolumes finds the matching PVs for PVCs and nodes to provision PVs
@ -214,6 +245,7 @@ func (b *volumeBinder) FindPodVolumes(pod *v1.Pod, boundClaims, claimsToBind []*
// returns without an error.
unboundVolumesSatisfied := true
boundVolumesSatisfied := true
sufficientStorage := true
defer func() {
if err != nil {
return
@ -224,6 +256,9 @@ func (b *volumeBinder) FindPodVolumes(pod *v1.Pod, boundClaims, claimsToBind []*
if !unboundVolumesSatisfied {
reasons = append(reasons, ErrReasonBindConflict)
}
if !sufficientStorage {
reasons = append(reasons, ErrReasonNotEnoughSpace)
}
}()
start := time.Now()
@ -290,9 +325,10 @@ func (b *volumeBinder) FindPodVolumes(pod *v1.Pod, boundClaims, claimsToBind []*
claimsToProvision = append(claimsToProvision, unboundClaims...)
}
// Check for claims to provision
// Check for claims to provision. This is the first point at which we may
// find out that the node does not have access to sufficient storage.
if len(claimsToProvision) > 0 {
unboundVolumesSatisfied, dynamicProvisions, err = b.checkVolumeProvisions(pod, claimsToProvision, node)
unboundVolumesSatisfied, sufficientStorage, dynamicProvisions, err = b.checkVolumeProvisions(pod, claimsToProvision, node)
if err != nil {
return
}
@ -787,42 +823,51 @@ func (b *volumeBinder) findMatchingVolumes(pod *v1.Pod, claimsToBind []*v1.Persi
// checkVolumeProvisions checks the given unbound claims (claims that have gone through
// findMatchingVolumes and have no matching volumes for binding) and returns true
// if all of the claims are eligible for dynamic provisioning. It also reports whether
// the node has access to sufficient storage for those claims.
func (b *volumeBinder) checkVolumeProvisions(pod *v1.Pod, claimsToProvision []*v1.PersistentVolumeClaim, node *v1.Node) (provisionSatisfied bool, dynamicProvisions []*v1.PersistentVolumeClaim, err error) {
func (b *volumeBinder) checkVolumeProvisions(pod *v1.Pod, claimsToProvision []*v1.PersistentVolumeClaim, node *v1.Node) (provisionSatisfied, sufficientStorage bool, dynamicProvisions []*v1.PersistentVolumeClaim, err error) {
podName := getPodName(pod)
dynamicProvisions = []*v1.PersistentVolumeClaim{}
// We return early with dynamicProvisions == nil if a check
// fails or we encounter an error.
for _, claim := range claimsToProvision {
pvcName := getPVCName(claim)
className := v1helper.GetPersistentVolumeClaimClass(claim)
if className == "" {
return false, nil, fmt.Errorf("no class for claim %q", pvcName)
return false, false, nil, fmt.Errorf("no class for claim %q", pvcName)
}
class, err := b.classLister.Get(className)
if err != nil {
return false, nil, fmt.Errorf("failed to find storage class %q", className)
return false, false, nil, fmt.Errorf("failed to find storage class %q", className)
}
provisioner := class.Provisioner
if provisioner == "" || provisioner == pvutil.NotSupportedProvisioner {
klog.V(4).Infof("storage class %q of claim %q does not support dynamic provisioning", className, pvcName)
return false, nil, nil
return false, true, nil, nil
}
// Check if the node can satisfy the topology requirement in the class
if !v1helper.MatchTopologySelectorTerms(class.AllowedTopologies, labels.Set(node.Labels)) {
klog.V(4).Infof("Node %q cannot satisfy provisioning topology requirements of claim %q", node.Name, pvcName)
return false, nil, nil
return false, true, nil, nil
}
// TODO: Check if capacity of the node domain in the storage class
// can satisfy resource requirement of given claim
// Check storage capacity.
sufficient, err := b.hasEnoughCapacity(provisioner, claim, class, node)
if err != nil {
return false, false, nil, err
}
if !sufficient {
// hasEnoughCapacity logs an explanation.
return true, false, nil, nil
}
dynamicProvisions = append(dynamicProvisions, claim)
}
klog.V(4).Infof("Provisioning for %d claims of pod %q that has no matching volumes on node %q ...", len(claimsToProvision), podName, node.Name)
return true, dynamicProvisions, nil
return true, true, dynamicProvisions, nil
}
func (b *volumeBinder) revertAssumedPVs(bindings []*BindingInfo) {
@ -837,6 +882,76 @@ func (b *volumeBinder) revertAssumedPVCs(claims []*v1.PersistentVolumeClaim) {
}
}
// hasEnoughCapacity checks whether the provisioner has enough capacity left, accessible from the node,
// for a new volume of the given size.
func (b *volumeBinder) hasEnoughCapacity(provisioner string, claim *v1.PersistentVolumeClaim, storageClass *storagev1.StorageClass, node *v1.Node) (bool, error) {
// This is an optional feature. If disabled, we assume that
// there is enough storage.
if !b.capacityCheckEnabled {
return true, nil
}
quantity, ok := claim.Spec.Resources.Requests[v1.ResourceStorage]
if !ok {
// No capacity to check for.
return true, nil
}
// Only enabled for CSI drivers which opt into it.
driver, err := b.csiDriverLister.Get(provisioner)
if err != nil {
if apierrors.IsNotFound(err) {
// Either the provisioner is not a CSI driver or the driver does not
// opt into storage capacity scheduling. Either way, skip
// capacity checking.
return true, nil
}
return false, err
}
if driver.Spec.StorageCapacity == nil || !*driver.Spec.StorageCapacity {
return true, nil
}
// Look for matching CSIStorageCapacity objects.
// TODO (for beta): benchmark this and potentially introduce some kind of lookup structure (https://github.com/kubernetes/enhancements/issues/1698#issuecomment-654356718).
capacities, err := b.csiStorageCapacityLister.List(labels.Everything())
if err != nil {
return false, err
}
sizeInBytes := quantity.Value()
for _, capacity := range capacities {
if capacity.StorageClassName == storageClass.Name &&
capacity.Capacity != nil &&
capacity.Capacity.Value() >= sizeInBytes &&
b.nodeHasAccess(node, capacity) {
// Enough capacity found.
return true, nil
}
}
// TODO (?): this doesn't give any information about which pools were considered and why
// they had to be rejected. Log that above? But that might be a lot of log output...
klog.V(4).Infof("Node %q has no accessible CSIStorageCapacity with enough capacity for PVC %s/%s of size %d and storage class %q",
node.Name, claim.Namespace, claim.Name, sizeInBytes, storageClass.Name)
return false, nil
}
func (b *volumeBinder) nodeHasAccess(node *v1.Node, capacity *storagev1alpha1.CSIStorageCapacity) bool {
if capacity.NodeTopology == nil {
// Unavailable
return false
}
// Only matching by label is supported.
selector, err := metav1.LabelSelectorAsSelector(capacity.NodeTopology)
if err != nil {
// This should never happen because NodeTopology must be valid.
klog.Errorf("unexpected error converting %+v to a label selector: %v", capacity.NodeTopology, err)
return false
}
return selector.Matches(labels.Set(node.Labels))
}
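Taken together, hasEnoughCapacity and nodeHasAccess reduce to three per-object comparisons: the storage class name, the published capacity against the requested size, and the node topology selector against the node's labels. A standalone sketch of that comparison, with a made-up helper name, class name, and labels:

package main

import (
	"fmt"

	storagev1alpha1 "k8s.io/api/storage/v1alpha1"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
)

// capacityCoversClaim mirrors the checks done per CSIStorageCapacity object:
// same storage class, a published capacity that is at least the requested
// size, and a node topology selector that matches the node's labels.
func capacityCoversClaim(capacity *storagev1alpha1.CSIStorageCapacity, storageClassName string, request resource.Quantity, nodeLabels map[string]string) bool {
	if capacity.StorageClassName != storageClassName ||
		capacity.Capacity == nil ||
		capacity.Capacity.Value() < request.Value() {
		return false
	}
	if capacity.NodeTopology == nil {
		// No topology means the capacity is not accessible from any node.
		return false
	}
	selector, err := metav1.LabelSelectorAsSelector(capacity.NodeTopology)
	if err != nil {
		return false
	}
	return selector.Matches(labels.Set(nodeLabels))
}

func main() {
	quantity := resource.MustParse("10Gi")
	capacity := &storagev1alpha1.CSIStorageCapacity{
		ObjectMeta:       metav1.ObjectMeta{Name: "example"},
		StorageClassName: "fast-storage", // assumed class name
		NodeTopology: &metav1.LabelSelector{
			MatchLabels: map[string]string{"kubernetes.io/hostname": "node-1"},
		},
		Capacity: &quantity,
	}
	// Prints true: the class matches, 10Gi >= 1Gi, and the selector matches the node labels.
	fmt.Println(capacityCoversClaim(capacity, "fast-storage", resource.MustParse("1Gi"), map[string]string{"kubernetes.io/hostname": "node-1"}))
}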
type byPVCSize []*v1.PersistentVolumeClaim
func (a byPVCSize) Len() int {


@ -26,6 +26,7 @@ import (
v1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
storagev1alpha1 "k8s.io/api/storage/v1alpha1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
@ -36,6 +37,7 @@ import (
"k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers/core/v1"
storageinformers "k8s.io/client-go/informers/storage/v1"
storageinformersv1alpha1 "k8s.io/client-go/informers/storage/v1alpha1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
k8stesting "k8s.io/client-go/testing"
@ -48,6 +50,8 @@ import (
)
var (
provisioner = "test-provisioner"
// PVCs for manual binding
// TODO: clean up all of these
unboundPVC = makeTestPVC("unbound-pvc", "1G", "", pvcUnbound, "", "1", &waitClass)
@ -128,9 +132,13 @@ type testEnv struct {
internalCSINodeInformer storageinformers.CSINodeInformer
internalPVCache *assumeCache
internalPVCCache *assumeCache
// For CSIStorageCapacity feature testing:
internalCSIDriverInformer storageinformers.CSIDriverInformer
internalCSIStorageCapacityInformer storageinformersv1alpha1.CSIStorageCapacityInformer
}
func newTestBinder(t *testing.T, stopCh <-chan struct{}) *testEnv {
func newTestBinder(t *testing.T, stopCh <-chan struct{}, csiStorageCapacity ...bool) *testEnv {
client := &fake.Clientset{}
reactor := pvtesting.NewVolumeReactor(client, nil, nil, nil)
// TODO refactor all tests to use real watch mechanism, see #72327
@ -150,6 +158,15 @@ func newTestBinder(t *testing.T, stopCh <-chan struct{}) *testEnv {
csiNodeInformer := informerFactory.Storage().V1().CSINodes()
pvcInformer := informerFactory.Core().V1().PersistentVolumeClaims()
classInformer := informerFactory.Storage().V1().StorageClasses()
csiDriverInformer := informerFactory.Storage().V1().CSIDrivers()
csiStorageCapacityInformer := informerFactory.Storage().V1alpha1().CSIStorageCapacities()
var capacityCheck *CapacityCheck
if len(csiStorageCapacity) > 0 && csiStorageCapacity[0] {
capacityCheck = &CapacityCheck{
CSIDriverInformer: csiDriverInformer,
CSIStorageCapacityInformer: csiStorageCapacityInformer,
}
}
binder := NewVolumeBinder(
client,
podInformer,
@ -158,6 +175,7 @@ func newTestBinder(t *testing.T, stopCh <-chan struct{}) *testEnv {
pvcInformer,
informerFactory.Core().V1().PersistentVolumes(),
classInformer,
capacityCheck,
10*time.Second)
// Wait for informers cache sync
@ -177,7 +195,7 @@ func newTestBinder(t *testing.T, stopCh <-chan struct{}) *testEnv {
Name: waitClassWithProvisioner,
},
VolumeBindingMode: &waitMode,
Provisioner: "test-provisioner",
Provisioner: provisioner,
AllowedTopologies: []v1.TopologySelectorTerm{
{
MatchLabelExpressions: []v1.TopologySelectorLabelRequirement{
@ -207,7 +225,7 @@ func newTestBinder(t *testing.T, stopCh <-chan struct{}) *testEnv {
Name: topoMismatchClass,
},
VolumeBindingMode: &waitMode,
Provisioner: "test-provisioner",
Provisioner: provisioner,
AllowedTopologies: []v1.TopologySelectorTerm{
{
MatchLabelExpressions: []v1.TopologySelectorLabelRequirement{
@ -254,6 +272,9 @@ func newTestBinder(t *testing.T, stopCh <-chan struct{}) *testEnv {
internalCSINodeInformer: csiNodeInformer,
internalPVCache: internalPVCache,
internalPVCCache: internalPVCCache,
internalCSIDriverInformer: csiDriverInformer,
internalCSIStorageCapacityInformer: csiStorageCapacityInformer,
}
}
@ -271,6 +292,18 @@ func (env *testEnv) initCSINodes(cachedCSINodes []*storagev1.CSINode) {
}
}
func (env *testEnv) addCSIDriver(csiDriver *storagev1.CSIDriver) {
csiDriverInformer := env.internalCSIDriverInformer.Informer()
csiDriverInformer.GetIndexer().Add(csiDriver)
}
func (env *testEnv) addCSIStorageCapacities(capacities []*storagev1alpha1.CSIStorageCapacity) {
csiStorageCapacityInformer := env.internalCSIStorageCapacityInformer.Informer()
for _, capacity := range capacities {
csiStorageCapacityInformer.GetIndexer().Add(capacity)
}
}
func (env *testEnv) initClaims(cachedPVCs []*v1.PersistentVolumeClaim, apiPVCs []*v1.PersistentVolumeClaim) {
internalPVCCache := env.internalPVCCache
for _, pvc := range cachedPVCs {
@ -678,6 +711,35 @@ func makeCSINode(name, migratedPlugin string) *storagev1.CSINode {
}
}
func makeCSIDriver(name string, storageCapacity bool) *storagev1.CSIDriver {
return &storagev1.CSIDriver{
ObjectMeta: metav1.ObjectMeta{
Name: name,
},
Spec: storagev1.CSIDriverSpec{
StorageCapacity: &storageCapacity,
},
}
}
func makeCapacity(name, storageClassName string, node *v1.Node, capacityStr string) *storagev1alpha1.CSIStorageCapacity {
c := &storagev1alpha1.CSIStorageCapacity{
ObjectMeta: metav1.ObjectMeta{
Name: name,
},
StorageClassName: storageClassName,
NodeTopology: &metav1.LabelSelector{},
}
if node != nil {
c.NodeTopology.MatchLabels = map[string]string{nodeLabelKey: node.Labels[nodeLabelKey]}
}
if capacityStr != "" {
capacityQuantity := resource.MustParse(capacityStr)
c.Capacity = &capacityQuantity
}
return c
}
func makePod(pvcs []*v1.PersistentVolumeClaim) *v1.Pod {
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
@ -745,6 +807,8 @@ func reasonNames(reasons ConflictReasons) string {
varNames = append(varNames, "ErrReasonBindConflict")
case ErrReasonNodeConflict:
varNames = append(varNames, "ErrReasonNodeConflict")
case ErrReasonNotEnoughSpace:
varNames = append(varNames, "ErrReasonNotEnoughSpace")
default:
varNames = append(varNames, string(reason))
}
@ -897,13 +961,16 @@ func TestFindPodVolumesWithoutProvisioning(t *testing.T) {
},
}
run := func(t *testing.T, scenario scenarioType) {
run := func(t *testing.T, scenario scenarioType, csiStorageCapacity bool, csiDriver *storagev1.CSIDriver) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Setup
testEnv := newTestBinder(t, ctx.Done())
testEnv := newTestBinder(t, ctx.Done(), csiStorageCapacity)
testEnv.initVolumes(scenario.pvs, scenario.pvs)
if csiDriver != nil {
testEnv.addCSIDriver(csiDriver)
}
// a. Init pvc cache
if scenario.cachePVCs == nil {
@ -930,8 +997,20 @@ func TestFindPodVolumesWithoutProvisioning(t *testing.T) {
testEnv.validatePodCache(t, testNode.Name, scenario.pod, podVolumes, scenario.expectedBindings, nil)
}
for name, scenario := range scenarios {
t.Run(name, func(t *testing.T) { run(t, scenario) })
for prefix, csiStorageCapacity := range map[string]bool{"with": true, "without": false} {
t.Run(fmt.Sprintf("%s CSIStorageCapacity", prefix), func(t *testing.T) {
for description, csiDriver := range map[string]*storagev1.CSIDriver{
"no CSIDriver": nil,
"CSIDriver with capacity tracking": makeCSIDriver(provisioner, true),
"CSIDriver without capacity tracking": makeCSIDriver(provisioner, false),
} {
t.Run(description, func(t *testing.T) {
for name, scenario := range scenarios {
t.Run(name, func(t *testing.T) { run(t, scenario, csiStorageCapacity, csiDriver) })
}
})
}
})
}
}
@ -950,29 +1029,34 @@ func TestFindPodVolumesWithProvisioning(t *testing.T) {
expectedProvisions []*v1.PersistentVolumeClaim
// Expected return values
reasons ConflictReasons
shouldFail bool
reasons ConflictReasons
shouldFail bool
needsCapacity bool
}
scenarios := map[string]scenarioType{
"one-provisioned": {
podPVCs: []*v1.PersistentVolumeClaim{provisionedPVC},
expectedProvisions: []*v1.PersistentVolumeClaim{provisionedPVC},
needsCapacity: true,
},
"two-unbound-pvcs,one-matched,one-provisioned": {
podPVCs: []*v1.PersistentVolumeClaim{unboundPVC, provisionedPVC},
pvs: []*v1.PersistentVolume{pvNode1a},
expectedBindings: []*BindingInfo{makeBinding(unboundPVC, pvNode1a)},
expectedProvisions: []*v1.PersistentVolumeClaim{provisionedPVC},
needsCapacity: true,
},
"one-bound,one-provisioned": {
podPVCs: []*v1.PersistentVolumeClaim{boundPVC, provisionedPVC},
pvs: []*v1.PersistentVolume{pvBound},
expectedProvisions: []*v1.PersistentVolumeClaim{provisionedPVC},
needsCapacity: true,
},
"one-binding,one-selected-node": {
podPVCs: []*v1.PersistentVolumeClaim{boundPVC, selectedNodePVC},
pvs: []*v1.PersistentVolume{pvBound},
expectedProvisions: []*v1.PersistentVolumeClaim{selectedNodePVC},
needsCapacity: true,
},
"immediate-unbound-pvc": {
podPVCs: []*v1.PersistentVolumeClaim{immediateUnboundPVC},
@ -982,6 +1066,7 @@ func TestFindPodVolumesWithProvisioning(t *testing.T) {
podPVCs: []*v1.PersistentVolumeClaim{immediateBoundPVC, provisionedPVC},
pvs: []*v1.PersistentVolume{pvBoundImmediate},
expectedProvisions: []*v1.PersistentVolumeClaim{provisionedPVC},
needsCapacity: true,
},
"invalid-provisioner": {
podPVCs: []*v1.PersistentVolumeClaim{noProvisionerPVC},
@ -1002,13 +1087,16 @@ func TestFindPodVolumesWithProvisioning(t *testing.T) {
},
}
run := func(t *testing.T, scenario scenarioType) {
run := func(t *testing.T, scenario scenarioType, csiStorageCapacity bool, csiDriver *storagev1.CSIDriver) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Setup
testEnv := newTestBinder(t, ctx.Done())
testEnv := newTestBinder(t, ctx.Done(), csiStorageCapacity)
testEnv.initVolumes(scenario.pvs, scenario.pvs)
if csiDriver != nil {
testEnv.addCSIDriver(csiDriver)
}
// a. Init pvc cache
if scenario.cachePVCs == nil {
@ -1031,12 +1119,32 @@ func TestFindPodVolumesWithProvisioning(t *testing.T) {
if scenario.shouldFail && err == nil {
t.Error("returned success but expected error")
}
checkReasons(t, reasons, scenario.reasons)
testEnv.validatePodCache(t, testNode.Name, scenario.pod, podVolumes, scenario.expectedBindings, scenario.expectedProvisions)
expectedReasons := scenario.reasons
expectedProvisions := scenario.expectedProvisions
if scenario.needsCapacity && csiStorageCapacity &&
csiDriver != nil && csiDriver.Spec.StorageCapacity != nil && *csiDriver.Spec.StorageCapacity {
// Without CSIStorageCapacity objects, provisioning is blocked.
expectedReasons = append(expectedReasons, ErrReasonNotEnoughSpace)
expectedProvisions = nil
}
checkReasons(t, reasons, expectedReasons)
testEnv.validatePodCache(t, testNode.Name, scenario.pod, podVolumes, scenario.expectedBindings, expectedProvisions)
}
for name, scenario := range scenarios {
t.Run(name, func(t *testing.T) { run(t, scenario) })
for prefix, csiStorageCapacity := range map[string]bool{"with": true, "without": false} {
t.Run(fmt.Sprintf("%s CSIStorageCapacity", prefix), func(t *testing.T) {
for description, csiDriver := range map[string]*storagev1.CSIDriver{
"no CSIDriver": nil,
"CSIDriver with capacity tracking": makeCSIDriver(provisioner, true),
"CSIDriver without capacity tracking": makeCSIDriver(provisioner, false),
} {
t.Run(description, func(t *testing.T) {
for name, scenario := range scenarios {
t.Run(name, func(t *testing.T) { run(t, scenario, csiStorageCapacity, csiDriver) })
}
})
}
})
}
}
@ -2008,3 +2116,125 @@ func TestFindAssumeVolumes(t *testing.T) {
testEnv.validatePodCache(t, testNode.Name, pod, podVolumes, expectedBindings, nil)
}
}
// TestCapacity covers different scenarios involving CSIStorageCapacity objects.
// Scenarios without those are covered by TestFindPodVolumesWithProvisioning.
func TestCapacity(t *testing.T) {
type scenarioType struct {
// Inputs
pvcs []*v1.PersistentVolumeClaim
capacities []*storagev1alpha1.CSIStorageCapacity
// Expected return values
reasons ConflictReasons
shouldFail bool
}
scenarios := map[string]scenarioType{
"network-attached": {
pvcs: []*v1.PersistentVolumeClaim{provisionedPVC},
capacities: []*storagev1alpha1.CSIStorageCapacity{
makeCapacity("net", waitClassWithProvisioner, nil, "1Gi"),
},
},
"local-storage": {
pvcs: []*v1.PersistentVolumeClaim{provisionedPVC},
capacities: []*storagev1alpha1.CSIStorageCapacity{
makeCapacity("net", waitClassWithProvisioner, node1, "1Gi"),
},
},
"multiple": {
pvcs: []*v1.PersistentVolumeClaim{provisionedPVC},
capacities: []*storagev1alpha1.CSIStorageCapacity{
makeCapacity("net", waitClassWithProvisioner, nil, "1Gi"),
makeCapacity("net", waitClassWithProvisioner, node2, "1Gi"),
makeCapacity("net", waitClassWithProvisioner, node1, "1Gi"),
},
},
"no-storage": {
pvcs: []*v1.PersistentVolumeClaim{provisionedPVC},
reasons: ConflictReasons{ErrReasonNotEnoughSpace},
},
"wrong-node": {
pvcs: []*v1.PersistentVolumeClaim{provisionedPVC},
capacities: []*storagev1alpha1.CSIStorageCapacity{
makeCapacity("net", waitClassWithProvisioner, node2, "1Gi"),
},
reasons: ConflictReasons{ErrReasonNotEnoughSpace},
},
"wrong-storage-class": {
pvcs: []*v1.PersistentVolumeClaim{provisionedPVC},
capacities: []*storagev1alpha1.CSIStorageCapacity{
makeCapacity("net", waitClass, node1, "1Gi"),
},
reasons: ConflictReasons{ErrReasonNotEnoughSpace},
},
"insufficient-storage": {
pvcs: []*v1.PersistentVolumeClaim{provisionedPVC},
capacities: []*storagev1alpha1.CSIStorageCapacity{
makeCapacity("net", waitClassWithProvisioner, node1, "1Mi"),
},
reasons: ConflictReasons{ErrReasonNotEnoughSpace},
},
"zero-storage": {
pvcs: []*v1.PersistentVolumeClaim{provisionedPVC},
capacities: []*storagev1alpha1.CSIStorageCapacity{
makeCapacity("net", waitClassWithProvisioner, node1, "0Mi"),
},
reasons: ConflictReasons{ErrReasonNotEnoughSpace},
},
"nil-storage": {
pvcs: []*v1.PersistentVolumeClaim{provisionedPVC},
capacities: []*storagev1alpha1.CSIStorageCapacity{
makeCapacity("net", waitClassWithProvisioner, node1, ""),
},
reasons: ConflictReasons{ErrReasonNotEnoughSpace},
},
}
testNode := &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node1",
Labels: map[string]string{
nodeLabelKey: "node1",
},
},
}
run := func(t *testing.T, scenario scenarioType) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Setup
withCSIStorageCapacity := true
testEnv := newTestBinder(t, ctx.Done(), withCSIStorageCapacity)
testEnv.addCSIDriver(makeCSIDriver(provisioner, withCSIStorageCapacity))
testEnv.addCSIStorageCapacities(scenario.capacities)
// a. Init pvc cache
testEnv.initClaims(scenario.pvcs, scenario.pvcs)
// b. Generate pod with given claims
pod := makePod(scenario.pvcs)
// Execute
podVolumes, reasons, err := findPodVolumes(testEnv.binder, pod, testNode)
// Validate
if !scenario.shouldFail && err != nil {
t.Errorf("returned error: %v", err)
}
if scenario.shouldFail && err == nil {
t.Error("returned success but expected error")
}
checkReasons(t, reasons, scenario.reasons)
provisions := scenario.pvcs
if len(reasons) > 0 {
provisions = nil
}
testEnv.validatePodCache(t, pod.Spec.NodeName, pod, podVolumes, nil, provisions)
}
for name, scenario := range scenarios {
t.Run(name, func(t *testing.T) { run(t, scenario) })
}
}


@ -7,10 +7,12 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/controller/volume/scheduling:go_default_library",
"//pkg/features:go_default_library",
"//pkg/scheduler/apis/config:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//vendor/k8s.io/klog/v2:go_default_library",
],
)


@ -24,8 +24,10 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/controller/volume/scheduling"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
)
@ -136,6 +138,9 @@ func getStateData(cs *framework.CycleState) (*stateData, error) {
// For PVCs that are unbound, it tries to find available PVs that can satisfy the PVC requirements
// and that the PV node affinity is satisfied by the given node.
//
// If storage capacity tracking is enabled, then the node also has to have access to
// enough storage for the volumes that still need to be created.
//
// The predicate returns true if all bound PVCs have compatible PVs with the node, and if all unbound
// PVCs can be matched with an available and node-compatible PV.
func (pl *VolumeBinding) Filter(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
@ -254,7 +259,14 @@ func New(plArgs runtime.Object, fh framework.FrameworkHandle) (framework.Plugin,
pvInformer := fh.SharedInformerFactory().Core().V1().PersistentVolumes()
storageClassInformer := fh.SharedInformerFactory().Storage().V1().StorageClasses()
csiNodeInformer := fh.SharedInformerFactory().Storage().V1().CSINodes()
binder := scheduling.NewVolumeBinder(fh.ClientSet(), podInformer, nodeInformer, csiNodeInformer, pvcInformer, pvInformer, storageClassInformer, time.Duration(args.BindTimeoutSeconds)*time.Second)
var capacityCheck *scheduling.CapacityCheck
if utilfeature.DefaultFeatureGate.Enabled(features.CSIStorageCapacity) {
capacityCheck = &scheduling.CapacityCheck{
CSIDriverInformer: fh.SharedInformerFactory().Storage().V1().CSIDrivers(),
CSIStorageCapacityInformer: fh.SharedInformerFactory().Storage().V1alpha1().CSIStorageCapacities(),
}
}
binder := scheduling.NewVolumeBinder(fh.ClientSet(), podInformer, nodeInformer, csiNodeInformer, pvcInformer, pvInformer, storageClassInformer, capacityCheck, time.Duration(args.BindTimeoutSeconds)*time.Second)
return &VolumeBinding{
Binder: binder,
}, nil
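FindPodVolumes reports a capacity shortfall as a ConflictReason rather than an error, so a Filter implementation mainly needs to map non-empty reasons to an unschedulable status. A hedged sketch of such a mapping (hypothetical helper, not the actual Filter body from this diff):

package example

import (
	"k8s.io/kubernetes/pkg/controller/volume/scheduling"
	framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
)

// statusFromReasons shows how the binder's ConflictReasons, which now may
// include ErrReasonNotEnoughSpace, can be turned into a framework status.
func statusFromReasons(reasons scheduling.ConflictReasons, err error) *framework.Status {
	if err != nil {
		return framework.NewStatus(framework.Error, err.Error())
	}
	if len(reasons) == 0 {
		// No conflicts: the node remains feasible for the pod.
		return nil
	}
	status := framework.NewStatus(framework.UnschedulableAndUnresolvable)
	for _, reason := range reasons {
		status.AppendReason(string(reason))
	}
	return status
}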


@ -541,6 +541,12 @@ func ClusterRoles() []rbacv1.ClusterRole {
// Needed for volume limits
rbacv1helpers.NewRule(Read...).Groups(storageGroup).Resources("csinodes").RuleOrDie(),
}
if utilfeature.DefaultFeatureGate.Enabled(features.CSIStorageCapacity) {
kubeSchedulerRules = append(kubeSchedulerRules,
rbacv1helpers.NewRule(Read...).Groups(storageGroup).Resources("csidrivers").RuleOrDie(),
rbacv1helpers.NewRule(Read...).Groups(storageGroup).Resources("csistoragecapacities").RuleOrDie(),
)
}
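For reference, assuming Read covers get/list/watch and storageGroup is "storage.k8s.io" as defined elsewhere in this file, each appended rule expands to a read-only PolicyRule along these lines (the csidrivers rule is analogous):

package example

import rbacv1 "k8s.io/api/rbac/v1"

// csiStorageCapacityReadRule sketches what the helper chain above produces
// for the csistoragecapacities resource.
var csiStorageCapacityReadRule = rbacv1.PolicyRule{
	Verbs:     []string{"get", "list", "watch"},
	APIGroups: []string{"storage.k8s.io"},
	Resources: []string{"csistoragecapacities"},
}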
roles = append(roles, rbacv1.ClusterRole{
// a role to use for the kube-scheduler
ObjectMeta: metav1.ObjectMeta{Name: "system:kube-scheduler"},