Merge pull request #127757 from torredil/scheduler-bugfix-5123

scheduler: Improve CSILimits plugin accuracy by using VolumeAttachments
This commit is contained in:
Kubernetes Prow Robot 2024-10-23 18:12:52 +01:00 committed by GitHub
commit 352056f09d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 136 additions and 2 deletions

View File

@ -23,6 +23,7 @@ import (
v1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/rand"
corelisters "k8s.io/client-go/listers/core/v1"
@ -60,6 +61,7 @@ type CSILimits struct {
pvLister corelisters.PersistentVolumeLister
pvcLister corelisters.PersistentVolumeClaimLister
scLister storagelisters.StorageClassLister
vaLister storagelisters.VolumeAttachmentLister
randomVolumeIDPrefix string
@ -183,6 +185,7 @@ func (pl *CSILimits) Filter(ctx context.Context, _ *framework.CycleState, pod *v
logger.V(5).Info("Could not get a CSINode object for the node", "node", klog.KObj(node), "err", err)
}
// Count CSI volumes from the new pod
newVolumes := make(map[string]string)
if err := pl.filterAttachableVolumes(logger, pod, csiNode, true /* new pod */, newVolumes); err != nil {
if apierrors.IsNotFound(err) {
@ -203,6 +206,7 @@ func (pl *CSILimits) Filter(ctx context.Context, _ *framework.CycleState, pod *v
return nil
}
// Count CSI volumes from existing pods
attachedVolumes := make(map[string]string)
for _, existingPod := range nodeInfo.Pods {
if err := pl.filterAttachableVolumes(logger, existingPod.Pod, csiNode, false /* existing pod */, attachedVolumes); err != nil {
@ -217,6 +221,19 @@ func (pl *CSILimits) Filter(ctx context.Context, _ *framework.CycleState, pod *v
attachedVolumeCount[driverName]++
}
// Count CSI volumes from VolumeAttachments
volumeAttachments, err := pl.getNodeVolumeAttachmentInfo(logger, node.Name)
if err != nil {
return framework.AsStatus(err)
}
for volumeUniqueName, driverName := range volumeAttachments {
// Avoid double-counting volumes already used by existing pods
if _, exists := attachedVolumes[volumeUniqueName]; !exists {
attachedVolumeCount[driverName]++
}
}
// Count the new volumes count per driver
newVolumeCount := map[string]int{}
for _, driverName := range newVolumes {
@ -303,7 +320,7 @@ func (pl *CSILimits) filterAttachableVolumes(
continue
}
volumeUniqueName := fmt.Sprintf("%s/%s", driverName, volumeHandle)
volumeUniqueName := getVolumeUniqueName(driverName, volumeHandle)
result[volumeUniqueName] = driverName
}
return nil
@ -344,7 +361,7 @@ func (pl *CSILimits) checkAttachableInlineVolume(logger klog.Logger, vol *v1.Vol
if translatedPV.Spec.PersistentVolumeSource.CSI == nil {
return nil
}
volumeUniqueName := fmt.Sprintf("%s/%s", driverName, translatedPV.Spec.PersistentVolumeSource.CSI.VolumeHandle)
volumeUniqueName := getVolumeUniqueName(driverName, translatedPV.Spec.PersistentVolumeSource.CSI.VolumeHandle)
result[volumeUniqueName] = driverName
return nil
}
@ -453,6 +470,7 @@ func NewCSI(_ context.Context, _ runtime.Object, handle framework.Handle, fts fe
pvcLister := informerFactory.Core().V1().PersistentVolumeClaims().Lister()
csiNodesLister := informerFactory.Storage().V1().CSINodes().Lister()
scLister := informerFactory.Storage().V1().StorageClasses().Lister()
vaLister := informerFactory.Storage().V1().VolumeAttachments().Lister()
csiTranslator := csitrans.New()
return &CSILimits{
@ -460,6 +478,7 @@ func NewCSI(_ context.Context, _ runtime.Object, handle framework.Handle, fts fe
pvLister: pvLister,
pvcLister: pvcLister,
scLister: scLister,
vaLister: vaLister,
randomVolumeIDPrefix: rand.String(32),
translator: csiTranslator,
}, nil
@ -480,3 +499,40 @@ func getVolumeLimits(csiNode *storagev1.CSINode) map[string]int64 {
}
return nodeVolumeLimits
}
// getNodeVolumeAttachmentInfo returns a map of volumeID to driver name for the given node.
// The volumeID key is the "<driver>/<handle>" form produced by getVolumeUniqueName, and the
// value is the VolumeAttachment's attacher (the CSI driver name). VolumeAttachments that are
// malformed or not resolvable to a CSI PV are skipped with a verbose log rather than failing.
func (pl *CSILimits) getNodeVolumeAttachmentInfo(logger klog.Logger, nodeName string) (map[string]string, error) {
	result := make(map[string]string)
	vas, err := pl.vaLister.List(labels.Everything())
	if err != nil {
		return nil, err
	}
	for _, va := range vas {
		// Only attachments targeting this node are relevant.
		if va.Spec.NodeName != nodeName {
			continue
		}
		if va.Spec.Attacher == "" {
			logger.V(5).Info("VolumeAttachment has no attacher", "VolumeAttachment", klog.KObj(va))
			continue
		}
		pvName := va.Spec.Source.PersistentVolumeName
		if pvName == nil {
			logger.V(5).Info("VolumeAttachment has no PV name", "VolumeAttachment", klog.KObj(va))
			continue
		}
		pv, err := pl.pvLister.Get(*pvName)
		if err != nil {
			logger.V(5).Info("Unable to get PV for VolumeAttachment", "VolumeAttachment", klog.KObj(va), "err", err)
			continue
		}
		// Non-CSI PVs do not count against CSI volume limits.
		if pv.Spec.CSI == nil {
			logger.V(5).Info("PV is not a CSI volume", "PV", klog.KObj(pv))
			continue
		}
		result[getVolumeUniqueName(va.Spec.Attacher, pv.Spec.CSI.VolumeHandle)] = va.Spec.Attacher
	}
	return result, nil
}
// getVolumeUniqueName builds the canonical "<driver>/<handle>" key used to
// identify a CSI volume across the plugin's volume-counting maps.
func getVolumeUniqueName(driverName, volumeHandle string) string {
	return driverName + "/" + volumeHandle
}

View File

@ -265,6 +265,7 @@ func TestCSILimits(t *testing.T) {
extraClaims []v1.PersistentVolumeClaim
filterName string
maxVols int32
vaCount int
driverNames []string
test string
migrationEnabled bool
@ -273,6 +274,27 @@ func TestCSILimits(t *testing.T) {
wantStatus *framework.Status
wantPreFilterStatus *framework.Status
}{
{
newPod: csiEBSOneVolPod,
existingPods: []*v1.Pod{},
filterName: "csi",
maxVols: 2,
driverNames: []string{ebsCSIDriverName},
vaCount: 2,
test: "should count VolumeAttachments towards volume limit when no pods exist",
limitSource: "csinode",
wantStatus: framework.NewStatus(framework.Unschedulable, ErrReasonMaxVolumeCountExceeded),
},
{
newPod: csiEBSOneVolPod,
existingPods: []*v1.Pod{},
filterName: "csi",
maxVols: 2,
driverNames: []string{ebsCSIDriverName},
vaCount: 1,
test: "should schedule pod when VolumeAttachments count does not exceed limit",
limitSource: "csinode",
},
{
newPod: csiEBSOneVolPod,
existingPods: []*v1.Pod{runningPod, csiEBSTwoVolPod},
@ -609,6 +631,7 @@ func TestCSILimits(t *testing.T) {
pvLister: getFakeCSIPVLister(test.filterName, test.driverNames...),
pvcLister: append(getFakeCSIPVCLister(test.filterName, scName, test.driverNames...), test.extraClaims...),
scLister: getFakeCSIStorageClassLister(scName, test.driverNames[0]),
vaLister: getFakeVolumeAttachmentLister(test.vaCount, test.driverNames...),
randomVolumeIDPrefix: rand.String(32),
translator: csiTranslator,
}
@ -769,6 +792,28 @@ func TestCSILimitsAddedPVCQHint(t *testing.T) {
}
}
// getFakeVolumeAttachmentLister builds a fake lister containing count
// VolumeAttachments per driver, all attached to node "node-for-max-pd-test-1".
// PV names follow the "csi-<driver>-<i>" pattern used by the fake PV lister.
func getFakeVolumeAttachmentLister(count int, driverNames ...string) tf.VolumeAttachmentLister {
	vaLister := tf.VolumeAttachmentLister{}
	for _, driver := range driverNames {
		for i := 0; i < count; i++ {
			pvName := fmt.Sprintf("csi-%s-%d", driver, i)
			vaLister = append(vaLister, storagev1.VolumeAttachment{
				ObjectMeta: metav1.ObjectMeta{
					Name: fmt.Sprintf("va-%s-%d", driver, i),
				},
				Spec: storagev1.VolumeAttachmentSpec{
					NodeName: "node-for-max-pd-test-1",
					Attacher: driver,
					Source: storagev1.VolumeAttachmentSource{
						PersistentVolumeName: &pvName,
					},
				},
			})
		}
	}
	return vaLister
}
func getFakeCSIPVLister(volumeName string, driverNames ...string) tf.PersistentVolumeLister {
pvLister := tf.PersistentVolumeLister{}
for _, driver := range driverNames {

View File

@ -313,3 +313,27 @@ func (classes StorageClassLister) Get(name string) (*storagev1.StorageClass, err
func (classes StorageClassLister) List(selector labels.Selector) ([]*storagev1.StorageClass, error) {
return nil, fmt.Errorf("not implemented")
}
// VolumeAttachmentLister declares a []storagev1.VolumeAttachment type for testing.
type VolumeAttachmentLister []storagev1.VolumeAttachment

// Compile-time check that the fake satisfies the real lister interface.
var _ storagelisters.VolumeAttachmentLister = VolumeAttachmentLister{}

// List lists all VolumeAttachments in the indexer.
// NOTE(review): the selector is ignored, matching the other fake listers in this file.
func (val VolumeAttachmentLister) List(selector labels.Selector) (ret []*storagev1.VolumeAttachment, err error) {
	var list []*storagev1.VolumeAttachment
	for i := range val {
		list = append(list, &val[i])
	}
	return list, nil
}

// Get returns a fake VolumeAttachment object from the fake VolumeAttachments by name.
func (val VolumeAttachmentLister) Get(name string) (*storagev1.VolumeAttachment, error) {
	for i := range val {
		if val[i].Name == name {
			// Return a pointer to a copy so callers cannot mutate the backing slice.
			match := val[i]
			return &match, nil
		}
	}
	return nil, errors.NewNotFound(storagev1.Resource("volumeattachments"), name)
}

View File

@ -580,6 +580,7 @@ func ClusterRoles() []rbacv1.ClusterRole {
rbacv1helpers.NewRule("create").Groups(authorizationGroup).Resources("subjectaccessreviews").RuleOrDie(),
// Needed for volume limits
rbacv1helpers.NewRule(Read...).Groups(storageGroup).Resources("csinodes").RuleOrDie(),
rbacv1helpers.NewRule("get", "list", "watch").Groups(storageGroup).Resources("volumeattachments").RuleOrDie(),
// Needed for namespaceSelector feature in pod affinity
rbacv1helpers.NewRule(Read...).Groups(legacyGroup).Resources("namespaces").RuleOrDie(),
rbacv1helpers.NewRule(Read...).Groups(storageGroup).Resources("csidrivers").RuleOrDie(),

View File

@ -851,6 +851,14 @@ items:
- get
- list
- watch
- apiGroups:
- storage.k8s.io
resources:
- volumeattachments
verbs:
- get
- list
- watch
- apiGroups:
- ""
resources: