diff --git a/pkg/scheduler/framework/plugins/nodevolumelimits/csi.go b/pkg/scheduler/framework/plugins/nodevolumelimits/csi.go
index 20f08223b5a..c33d363bb76 100644
--- a/pkg/scheduler/framework/plugins/nodevolumelimits/csi.go
+++ b/pkg/scheduler/framework/plugins/nodevolumelimits/csi.go
@@ -23,6 +23,7 @@ import (
 	v1 "k8s.io/api/core/v1"
 	storagev1 "k8s.io/api/storage/v1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/util/rand"
 	corelisters "k8s.io/client-go/listers/core/v1"
@@ -60,6 +61,7 @@ type CSILimits struct {
 	pvLister      corelisters.PersistentVolumeLister
 	pvcLister     corelisters.PersistentVolumeClaimLister
 	scLister      storagelisters.StorageClassLister
+	vaLister      storagelisters.VolumeAttachmentLister
 
 	randomVolumeIDPrefix string
 
@@ -183,6 +185,7 @@ func (pl *CSILimits) Filter(ctx context.Context, _ *framework.CycleState, pod *v
 		logger.V(5).Info("Could not get a CSINode object for the node", "node", klog.KObj(node), "err", err)
 	}
 
+	// Count CSI volumes from the new pod
 	newVolumes := make(map[string]string)
 	if err := pl.filterAttachableVolumes(logger, pod, csiNode, true /* new pod */, newVolumes); err != nil {
 		if apierrors.IsNotFound(err) {
@@ -203,6 +206,7 @@ func (pl *CSILimits) Filter(ctx context.Context, _ *framework.CycleState, pod *v
 		return nil
 	}
 
+	// Count CSI volumes from existing pods
 	attachedVolumes := make(map[string]string)
 	for _, existingPod := range nodeInfo.Pods {
 		if err := pl.filterAttachableVolumes(logger, existingPod.Pod, csiNode, false /* existing pod */, attachedVolumes); err != nil {
@@ -217,6 +221,19 @@ func (pl *CSILimits) Filter(ctx context.Context, _ *framework.CycleState, pod *v
 		attachedVolumeCount[driverName]++
 	}
 
+	// Count CSI volumes from VolumeAttachments
+	volumeAttachments, err := pl.getNodeVolumeAttachmentInfo(logger, node.Name)
+	if err != nil {
+		return framework.AsStatus(err)
+	}
+
+	for volumeUniqueName, driverName := range volumeAttachments {
+		// Avoid double-counting volumes already used by existing pods
+		if _, exists := attachedVolumes[volumeUniqueName]; !exists {
+			attachedVolumeCount[driverName]++
+		}
+	}
+
 	// Count the new volumes count per driver
 	newVolumeCount := map[string]int{}
 	for _, driverName := range newVolumes {
@@ -303,7 +320,7 @@ func (pl *CSILimits) filterAttachableVolumes(
 			continue
 		}
 
-		volumeUniqueName := fmt.Sprintf("%s/%s", driverName, volumeHandle)
+		volumeUniqueName := getVolumeUniqueName(driverName, volumeHandle)
 		result[volumeUniqueName] = driverName
 	}
 	return nil
@@ -344,7 +361,7 @@ func (pl *CSILimits) checkAttachableInlineVolume(logger klog.Logger, vol *v1.Vol
 	if translatedPV.Spec.PersistentVolumeSource.CSI == nil {
 		return nil
 	}
-	volumeUniqueName := fmt.Sprintf("%s/%s", driverName, translatedPV.Spec.PersistentVolumeSource.CSI.VolumeHandle)
+	volumeUniqueName := getVolumeUniqueName(driverName, translatedPV.Spec.PersistentVolumeSource.CSI.VolumeHandle)
 	result[volumeUniqueName] = driverName
 	return nil
 }
@@ -453,6 +470,7 @@ func NewCSI(_ context.Context, _ runtime.Object, handle framework.Handle, fts fe
 	pvcLister := informerFactory.Core().V1().PersistentVolumeClaims().Lister()
 	csiNodesLister := informerFactory.Storage().V1().CSINodes().Lister()
 	scLister := informerFactory.Storage().V1().StorageClasses().Lister()
+	vaLister := informerFactory.Storage().V1().VolumeAttachments().Lister()
 	csiTranslator := csitrans.New()
 
 	return &CSILimits{
@@ -460,6 +478,7 @@ func NewCSI(_ context.Context, _ runtime.Object, handle framework.Handle, fts fe
 		pvLister:             pvLister,
 		pvcLister:            pvcLister,
 		scLister:             scLister,
+		vaLister:             vaLister,
 		randomVolumeIDPrefix: rand.String(32),
 		translator:           csiTranslator,
 	}, nil
@@ -480,3 +499,40 @@ func getVolumeLimits(csiNode *storagev1.CSINode) map[string]int64 {
 	}
 	return nodeVolumeLimits
 }
+
+// getNodeVolumeAttachmentInfo returns a map of volumeID to driver name for the given node.
+func (pl *CSILimits) getNodeVolumeAttachmentInfo(logger klog.Logger, nodeName string) (map[string]string, error) {
+	volumeAttachments := make(map[string]string)
+	vas, err := pl.vaLister.List(labels.Everything())
+	if err != nil {
+		return nil, err
+	}
+	for _, va := range vas {
+		if va.Spec.NodeName == nodeName {
+			if va.Spec.Attacher == "" {
+				logger.V(5).Info("VolumeAttachment has no attacher", "VolumeAttachment", klog.KObj(va))
+				continue
+			}
+			if va.Spec.Source.PersistentVolumeName == nil {
+				logger.V(5).Info("VolumeAttachment has no PV name", "VolumeAttachment", klog.KObj(va))
+				continue
+			}
+			pv, err := pl.pvLister.Get(*va.Spec.Source.PersistentVolumeName)
+			if err != nil {
+				logger.V(5).Info("Unable to get PV for VolumeAttachment", "VolumeAttachment", klog.KObj(va), "err", err)
+				continue
+			}
+			if pv.Spec.CSI == nil {
+				logger.V(5).Info("PV is not a CSI volume", "PV", klog.KObj(pv))
+				continue
+			}
+			volumeID := getVolumeUniqueName(va.Spec.Attacher, pv.Spec.CSI.VolumeHandle)
+			volumeAttachments[volumeID] = va.Spec.Attacher
+		}
+	}
+	return volumeAttachments, nil
+}
+
+func getVolumeUniqueName(driverName, volumeHandle string) string {
+	return fmt.Sprintf("%s/%s", driverName, volumeHandle)
+}
diff --git a/pkg/scheduler/framework/plugins/nodevolumelimits/csi_test.go b/pkg/scheduler/framework/plugins/nodevolumelimits/csi_test.go
index 70d16185719..65f8d140edb 100644
--- a/pkg/scheduler/framework/plugins/nodevolumelimits/csi_test.go
+++ b/pkg/scheduler/framework/plugins/nodevolumelimits/csi_test.go
@@ -265,6 +265,7 @@ func TestCSILimits(t *testing.T) {
 		extraClaims          []v1.PersistentVolumeClaim
 		filterName           string
 		maxVols              int32
+		vaCount              int
 		driverNames          []string
 		test                 string
 		migrationEnabled     bool
@@ -273,6 +274,27 @@ func TestCSILimits(t *testing.T) {
 		wantStatus           *framework.Status
 		wantPreFilterStatus  *framework.Status
 	}{
+		{
+			newPod:       csiEBSOneVolPod,
+			existingPods: []*v1.Pod{},
+			filterName:   "csi",
+			maxVols:      2,
+			driverNames:  []string{ebsCSIDriverName},
+			vaCount:      2,
+			test:         "should count VolumeAttachments towards volume limit when no pods exist",
+			limitSource:  "csinode",
+			wantStatus:   framework.NewStatus(framework.Unschedulable, ErrReasonMaxVolumeCountExceeded),
+		},
+		{
+			newPod:       csiEBSOneVolPod,
+			existingPods: []*v1.Pod{},
+			filterName:   "csi",
+			maxVols:      2,
+			driverNames:  []string{ebsCSIDriverName},
+			vaCount:      1,
+			test:         "should schedule pod when VolumeAttachments count does not exceed limit",
+			limitSource:  "csinode",
+		},
 		{
 			newPod:       csiEBSOneVolPod,
 			existingPods: []*v1.Pod{runningPod, csiEBSTwoVolPod},
@@ -609,6 +631,7 @@ func TestCSILimits(t *testing.T) {
 				pvLister:             getFakeCSIPVLister(test.filterName, test.driverNames...),
 				pvcLister:            append(getFakeCSIPVCLister(test.filterName, scName, test.driverNames...), test.extraClaims...),
 				scLister:             getFakeCSIStorageClassLister(scName, test.driverNames[0]),
+				vaLister:             getFakeVolumeAttachmentLister(test.vaCount, test.driverNames...),
 				randomVolumeIDPrefix: rand.String(32),
 				translator:           csiTranslator,
 			}
@@ -769,6 +792,28 @@ func TestCSILimitsAddedPVCQHint(t *testing.T) {
 	}
 }
 
+func getFakeVolumeAttachmentLister(count int, driverNames ...string) tf.VolumeAttachmentLister {
+	vaLister := tf.VolumeAttachmentLister{}
+	for _, driver := range driverNames {
+		for j := 0; j < count; j++ {
+			pvName := fmt.Sprintf("csi-%s-%d", driver, j)
+			va := storagev1.VolumeAttachment{
+				ObjectMeta: metav1.ObjectMeta{
+					Name: fmt.Sprintf("va-%s-%d", driver, j),
+				},
+				Spec: storagev1.VolumeAttachmentSpec{
+					NodeName: "node-for-max-pd-test-1",
+					Attacher: driver,
+					Source: storagev1.VolumeAttachmentSource{
+						PersistentVolumeName: &pvName,
+					},
+				},
+			}
+			vaLister = append(vaLister, va)
+		}
+	}
+	return vaLister
+}
 func getFakeCSIPVLister(volumeName string, driverNames ...string) tf.PersistentVolumeLister {
 	pvLister := tf.PersistentVolumeLister{}
 	for _, driver := range driverNames {
diff --git a/pkg/scheduler/testing/framework/fake_listers.go b/pkg/scheduler/testing/framework/fake_listers.go
index fc2dd39788f..36cb5e21f5c 100644
--- a/pkg/scheduler/testing/framework/fake_listers.go
+++ b/pkg/scheduler/testing/framework/fake_listers.go
@@ -313,3 +313,27 @@ func (classes StorageClassLister) Get(name string) (*storagev1.StorageClass, err
 func (classes StorageClassLister) List(selector labels.Selector) ([]*storagev1.StorageClass, error) {
 	return nil, fmt.Errorf("not implemented")
 }
+
+// VolumeAttachmentLister declares a []storagev1.VolumeAttachment type for testing.
+type VolumeAttachmentLister []storagev1.VolumeAttachment
+
+var _ storagelisters.VolumeAttachmentLister = VolumeAttachmentLister{}
+
+// List lists all VolumeAttachments in the indexer.
+func (val VolumeAttachmentLister) List(selector labels.Selector) (ret []*storagev1.VolumeAttachment, err error) {
+	var list []*storagev1.VolumeAttachment
+	for i := range val {
+		list = append(list, &val[i])
+	}
+	return list, nil
+}
+
+// Get returns a fake VolumeAttachment object from the fake VolumeAttachments by name.
+func (val VolumeAttachmentLister) Get(name string) (*storagev1.VolumeAttachment, error) {
+	for _, va := range val {
+		if va.Name == name {
+			return &va, nil
+		}
+	}
+	return nil, errors.NewNotFound(storagev1.Resource("volumeattachments"), name)
+}
diff --git a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go
index 4c203379e1f..dbfd92b0b7b 100644
--- a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go
+++ b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go
@@ -572,6 +572,7 @@ func ClusterRoles() []rbacv1.ClusterRole {
 				rbacv1helpers.NewRule("create").Groups(authorizationGroup).Resources("subjectaccessreviews").RuleOrDie(),
 				// Needed for volume limits
 				rbacv1helpers.NewRule(Read...).Groups(storageGroup).Resources("csinodes").RuleOrDie(),
+				rbacv1helpers.NewRule("get", "list", "watch").Groups(storageGroup).Resources("volumeattachments").RuleOrDie(),
 				// Needed for namespaceSelector feature in pod affinity
 				rbacv1helpers.NewRule(Read...).Groups(legacyGroup).Resources("namespaces").RuleOrDie(),
 				rbacv1helpers.NewRule(Read...).Groups(storageGroup).Resources("csidrivers").RuleOrDie(),
diff --git a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/cluster-roles.yaml b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/cluster-roles.yaml
index 5c5963c2479..cfb27005f85 100644
--- a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/cluster-roles.yaml
+++ b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/cluster-roles.yaml
@@ -851,6 +851,14 @@ items:
     - get
     - list
     - watch
+  - apiGroups:
+    - storage.k8s.io
+    resources:
+    - volumeattachments
+    verbs:
+    - get
+    - list
+    - watch
   - apiGroups:
     - ""
     resources:
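
For context on the de-duplication the Filter change above relies on, here is a small standalone Go sketch (not part of the patch) of how a VolumeAttachment is counted only when its driver/volumeHandle key is not already present among the volumes collected from pods on the node. The driver name and volume handles below are illustrative values; only the "<driver>/<handle>" key format and the skip-if-already-attached check come from the diff.

package main

import "fmt"

// Mirrors the getVolumeUniqueName helper added in the patch: volumes are
// de-duplicated by the "<driver name>/<volume handle>" key.
func getVolumeUniqueName(driverName, volumeHandle string) string {
	return fmt.Sprintf("%s/%s", driverName, volumeHandle)
}

func main() {
	// Volumes found in the specs of pods already running on the node (unique name -> driver).
	attachedVolumes := map[string]string{
		getVolumeUniqueName("ebs.csi.aws.com", "vol-1"): "ebs.csi.aws.com",
	}
	// Volumes reported by VolumeAttachment objects for the same node.
	// vol-1 is also used by a pod; vol-2 is only visible through its VolumeAttachment.
	volumeAttachments := map[string]string{
		getVolumeUniqueName("ebs.csi.aws.com", "vol-1"): "ebs.csi.aws.com",
		getVolumeUniqueName("ebs.csi.aws.com", "vol-2"): "ebs.csi.aws.com",
	}

	attachedVolumeCount := map[string]int{}
	for _, driverName := range attachedVolumes {
		attachedVolumeCount[driverName]++
	}
	// Count a VolumeAttachment only if its volume was not already counted from a pod.
	for volumeUniqueName, driverName := range volumeAttachments {
		if _, exists := attachedVolumes[volumeUniqueName]; !exists {
			attachedVolumeCount[driverName]++
		}
	}

	fmt.Println(attachedVolumeCount) // prints map[ebs.csi.aws.com:2], not 3
}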