Merge pull request #129557 from googs1025/feature/add_QueueingHint_for_VolumeAttachment_deletion_events

feature(scheduler): add queueinghint for volumeattachment deletion
This commit is contained in:
Kubernetes Prow Robot 2025-02-22 00:10:26 -08:00 committed by GitHub
commit 4032177faf
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 251 additions and 1 deletions

View File

@ -89,7 +89,7 @@ func (pl *CSILimits) EventsToRegister(_ context.Context) ([]framework.ClusterEve
{Event: framework.ClusterEvent{Resource: framework.CSINode, ActionType: framework.Add}},
{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Delete}, QueueingHintFn: pl.isSchedulableAfterPodDeleted},
{Event: framework.ClusterEvent{Resource: framework.PersistentVolumeClaim, ActionType: framework.Add}, QueueingHintFn: pl.isSchedulableAfterPVCAdded},
{Event: framework.ClusterEvent{Resource: framework.VolumeAttachment, ActionType: framework.Delete}},
{Event: framework.ClusterEvent{Resource: framework.VolumeAttachment, ActionType: framework.Delete}, QueueingHintFn: pl.isSchedulableAfterVolumeAttachmentDeleted},
}, nil
}
@ -149,6 +149,44 @@ func (pl *CSILimits) isSchedulableAfterPVCAdded(logger klog.Logger, pod *v1.Pod,
return framework.QueueSkip, nil
}
func (pl *CSILimits) isSchedulableAfterVolumeAttachmentDeleted(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
deletedVolumeAttachment, _, err := util.As[*storagev1.VolumeAttachment](oldObj, newObj)
if err != nil {
return framework.Queue, fmt.Errorf("unexpected objects in isSchedulableAfterVolumeAttachmentDeleted: %w", err)
}
for _, vol := range pod.Spec.Volumes {
// Check if the pod volume uses a PVC
// If it does, return Queue
if vol.PersistentVolumeClaim != nil {
logger.V(5).Info("Pod volume uses PersistentVolumeClaim, which might make this pod schedulable due to VolumeAttachment deletion", "pod", klog.KObj(pod), "volumeAttachment", klog.KObj(deletedVolumeAttachment), "volume", vol.Name)
return framework.Queue, nil
}
if !pl.translator.IsInlineMigratable(&vol) {
continue
}
translatedPV, err := pl.translator.TranslateInTreeInlineVolumeToCSI(logger, &vol, pod.Namespace)
if err != nil || translatedPV == nil {
return framework.Queue, fmt.Errorf("converting volume(%s) from inline to csi: %w", vol.Name, err)
}
if translatedPV.Spec.CSI != nil && deletedVolumeAttachment.Spec.Attacher == translatedPV.Spec.CSI.Driver {
// deleted VolumeAttachment Attacher matches the translated PV CSI driver
logger.V(5).Info("Pod volume is an Inline Migratable volume that matches the CSI driver, which might make this pod schedulable due to VolumeAttachment deletion",
"pod", klog.KObj(pod), "volumeAttachment", klog.KObj(deletedVolumeAttachment),
"volume", vol.Name, "csiDriver", translatedPV.Spec.CSI.Driver,
)
return framework.Queue, nil
}
}
logger.V(5).Info("the VolumeAttachment deletion wouldn't make this pod schedulable because the pod has no volume related to a deleted VolumeAttachment",
"pod", klog.KObj(pod), "volumeAttachment", klog.KObj(deletedVolumeAttachment))
return framework.QueueSkip, nil
}
// PreFilter invoked at the prefilter extension point
//
// If the pod haven't those types of volumes, we'll skip the Filter phase

View File

@ -792,6 +792,179 @@ func TestCSILimitsAddedPVCQHint(t *testing.T) {
}
}
func TestCSILimitsDeletedVolumeAttachmentQHint(t *testing.T) {
tests := []struct {
test string
newPod *v1.Pod
existingPVC *v1.PersistentVolumeClaim
deletedVA *storagev1.VolumeAttachment
wantQHint framework.QueueingHint
}{
{
test: "a pod has PVC when VolumeAttachment is deleting",
newPod: st.MakePod().Namespace("ns1").Volume(
v1.Volume{
VolumeSource: v1.VolumeSource{
PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
ClaimName: "pvc1",
},
},
},
).Obj(),
existingPVC: st.MakePersistentVolumeClaim().Name("pvc1").Namespace("ns1").
VolumeName("pv1").Obj(),
deletedVA: st.MakeVolumeAttachment().Name("volumeattachment1").
NodeName("fake-node").
Attacher("test.storage.gke.io").
Source(storagev1.VolumeAttachmentSource{PersistentVolumeName: ptr.To("pv1")}).Obj(),
wantQHint: framework.Queue,
},
{
test: "a pod has an Inline Migratable volume (AWSEBSDriver) when VolumeAttachment (AWSEBSDriver) is deleting (match)",
newPod: st.MakePod().Namespace("ns1").Volume(
v1.Volume{
VolumeSource: v1.VolumeSource{
AWSElasticBlockStore: &v1.AWSElasticBlockStoreVolumeSource{
VolumeID: "test",
},
},
},
).Obj(),
deletedVA: st.MakeVolumeAttachment().Name("volumeattachment1").
NodeName("fake-node").
Attacher("ebs.csi.aws.com").
Source(storagev1.VolumeAttachmentSource{
InlineVolumeSpec: &v1.PersistentVolumeSpec{
PersistentVolumeSource: v1.PersistentVolumeSource{
CSI: &v1.CSIPersistentVolumeSource{
Driver: "ebs.csi.aws.com",
},
},
},
}).Obj(),
wantQHint: framework.Queue,
},
{
test: "a pod has an Inline Migratable volume (GCEPDDriver) when VolumeAttachment (AWSEBSDriver) is deleting (no match)",
newPod: st.MakePod().Namespace("ns1").Volume(
v1.Volume{
VolumeSource: v1.VolumeSource{
GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{
PDName: "test",
},
},
},
).Obj(),
deletedVA: st.MakeVolumeAttachment().Name("volumeattachment1").
NodeName("fake-node").
Attacher("ebs.csi.aws.com").
Source(storagev1.VolumeAttachmentSource{
InlineVolumeSpec: &v1.PersistentVolumeSpec{
PersistentVolumeSource: v1.PersistentVolumeSource{
CSI: &v1.CSIPersistentVolumeSource{
Driver: "ebs.csi.aws.com",
},
},
},
}).Obj(),
wantQHint: framework.QueueSkip,
},
{
test: "a pod has an Inline Migratable volume (AWSEBSDriver) and PVC when VolumeAttachment (AWSEBSDriver) is deleting",
newPod: st.MakePod().Namespace("ns1").Volume(
v1.Volume{
VolumeSource: v1.VolumeSource{
AWSElasticBlockStore: &v1.AWSElasticBlockStoreVolumeSource{
VolumeID: "test",
},
},
},
).Volume(
v1.Volume{
VolumeSource: v1.VolumeSource{
PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
ClaimName: "pvc1",
},
},
},
).Obj(),
existingPVC: st.MakePersistentVolumeClaim().Name("pvc1").Namespace("ns1").
VolumeName("pv1").Obj(),
deletedVA: st.MakeVolumeAttachment().Name("volumeattachment1").
NodeName("fake-node").
Attacher("ebs.csi.aws.com").
Source(storagev1.VolumeAttachmentSource{
InlineVolumeSpec: &v1.PersistentVolumeSpec{
PersistentVolumeSource: v1.PersistentVolumeSource{
CSI: &v1.CSIPersistentVolumeSource{
Driver: "ebs.csi.aws.com",
},
},
},
}).Obj(),
wantQHint: framework.Queue,
},
{
test: "a pod has an Inline Migratable volume (AWSEBSDriver) and PVC when VolumeAttachment (AWSEBSDriver) is deleting",
newPod: st.MakePod().Namespace("ns1").Volume(
v1.Volume{
VolumeSource: v1.VolumeSource{
AWSElasticBlockStore: &v1.AWSElasticBlockStoreVolumeSource{
VolumeID: "test",
},
},
},
).Volume(
v1.Volume{
VolumeSource: v1.VolumeSource{
PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
ClaimName: "pvc1",
},
},
},
).Obj(),
existingPVC: st.MakePersistentVolumeClaim().Name("pvc1").Namespace("ns1").
VolumeName("pv1").Obj(),
deletedVA: st.MakeVolumeAttachment().Name("volumeattachment1").
NodeName("fake-node").
Attacher("test.storage.gke.io").
Source(storagev1.VolumeAttachmentSource{PersistentVolumeName: ptr.To("pv1")}).Obj(),
wantQHint: framework.Queue,
},
{
test: "a pod has no PVC when VolumeAttachment is deleting",
newPod: st.MakePod().Namespace("ns1").Obj(),
deletedVA: st.MakeVolumeAttachment().Name("volumeattachment1").
NodeName("fake-node").
Attacher("test.storage.gke.io").
Source(storagev1.VolumeAttachmentSource{PersistentVolumeName: ptr.To("pv1")}).Obj(),
wantQHint: framework.QueueSkip,
},
}
for _, test := range tests {
t.Run(test.test, func(t *testing.T) {
var pvcList tf.PersistentVolumeClaimLister
if test.existingPVC != nil {
pvcList = append(pvcList, *test.existingPVC)
}
p := &CSILimits{
pvcLister: pvcList,
translator: csitrans.New(),
}
logger, _ := ktesting.NewTestContext(t)
qhint, err := p.isSchedulableAfterVolumeAttachmentDeleted(logger, test.newPod, test.deletedVA, nil)
if err != nil {
t.Errorf("isSchedulableAfterVolumeAttachmentDeleted failed: %v", err)
}
if qhint != test.wantQHint {
t.Errorf("QHint does not match: %v, want: %v", qhint, test.wantQHint)
}
})
}
}
func getFakeVolumeAttachmentLister(count int, driverNames ...string) tf.VolumeAttachmentLister {
vaLister := tf.VolumeAttachmentLister{}
for _, driver := range driverNames {

View File

@ -1298,3 +1298,42 @@ func (c *CSIStorageCapacityWrapper) Capacity(capacity *resource.Quantity) *CSISt
c.CSIStorageCapacity.Capacity = capacity
return c
}
// VolumeAttachmentWrapper wraps a VolumeAttachment inside.
type VolumeAttachmentWrapper struct{ storagev1.VolumeAttachment }
// MakeVolumeAttachment creates a VolumeAttachment wrapper.
func MakeVolumeAttachment() *VolumeAttachmentWrapper {
return &VolumeAttachmentWrapper{}
}
// Obj returns the inner VolumeAttachment.
func (c *VolumeAttachmentWrapper) Obj() *storagev1.VolumeAttachment {
return &c.VolumeAttachment
}
// Name sets `n` as the name of the inner VolumeAttachment.
func (c *VolumeAttachmentWrapper) Name(n string) *VolumeAttachmentWrapper {
c.SetName(n)
return c
}
func (c *VolumeAttachmentWrapper) Attacher(attacher string) *VolumeAttachmentWrapper {
c.Spec.Attacher = attacher
return c
}
func (c *VolumeAttachmentWrapper) NodeName(nodeName string) *VolumeAttachmentWrapper {
c.Spec.NodeName = nodeName
return c
}
func (c *VolumeAttachmentWrapper) Source(source storagev1.VolumeAttachmentSource) *VolumeAttachmentWrapper {
c.Spec.Source = source
return c
}
func (c *VolumeAttachmentWrapper) Attached(attached bool) *VolumeAttachmentWrapper {
c.Status.Attached = attached
return c
}