diff --git a/pkg/scheduler/framework/plugins/volumezone/volume_zone.go b/pkg/scheduler/framework/plugins/volumezone/volume_zone.go index ba92b1045df..82210703307 100644 --- a/pkg/scheduler/framework/plugins/volumezone/volume_zone.go +++ b/pkg/scheduler/framework/plugins/volumezone/volume_zone.go @@ -20,6 +20,7 @@ import ( "context" "errors" "fmt" + "reflect" v1 "k8s.io/api/core/v1" storage "k8s.io/api/storage/v1" @@ -158,21 +159,7 @@ func (pl *VolumeZone) getPVbyPod(logger klog.Logger, pod *v1.Pod) ([]pvTopology, if s := getErrorAsStatus(err); !s.IsSuccess() { return nil, s } - - for _, key := range topologyLabels { - if value, ok := pv.ObjectMeta.Labels[key]; ok { - volumeVSet, err := volumehelpers.LabelZonesToSet(value) - if err != nil { - logger.Info("Failed to parse label, ignoring the label", "label", fmt.Sprintf("%s:%s", key, value), "err", err) - continue - } - podPVTopologies = append(podPVTopologies, pvTopology{ - pvName: pv.Name, - key: key, - values: sets.Set[string](volumeVSet), - }) - } - } + podPVTopologies = append(podPVTopologies, pl.getPVTopologies(logger, pv)...) } return podPVTopologies, nil } @@ -292,7 +279,7 @@ func (pl *VolumeZone) EventsToRegister() []framework.ClusterEventWithHint { // Also, if pvc's VolumeName is filled, that also could make a pod schedulable. {Event: framework.ClusterEvent{Resource: framework.PersistentVolumeClaim, ActionType: framework.Add | framework.Update}, QueueingHintFn: pl.isSchedulableAfterPersistentVolumeClaimChange}, // A new pv or updating a pv's volume zone labels may make a pod schedulable. - {Event: framework.ClusterEvent{Resource: framework.PersistentVolume, ActionType: framework.Add | framework.Update}}, + {Event: framework.ClusterEvent{Resource: framework.PersistentVolume, ActionType: framework.Add | framework.Update}, QueueingHintFn: pl.isSchedulableAfterPersistentVolumeChange}, } } @@ -359,6 +346,52 @@ func (pl *VolumeZone) isSchedulableAfterStorageClassAdded(logger klog.Logger, po return framework.Queue, nil } +// isSchedulableAfterPersistentVolumeChange is invoked whenever a PersistentVolume added or updated. +// It checks whether the change of PV has made a previously unschedulable pod schedulable. +// Changing the PV topology labels could cause the pod to become schedulable. +func (pl *VolumeZone) isSchedulableAfterPersistentVolumeChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) { + originalPV, modifiedPV, err := util.As[*v1.PersistentVolume](oldObj, newObj) + if err != nil { + return framework.Queue, fmt.Errorf("unexpected objects in isSchedulableAfterPersistentVolumeChange: %w", err) + } + if originalPV == nil { + logger.V(5).Info("PV is newly created, which might make the pod schedulable") + return framework.Queue, nil + } + originalPVTopologies := pl.getPVTopologies(logger, originalPV) + modifiedPVTopologies := pl.getPVTopologies(logger, modifiedPV) + if !reflect.DeepEqual(originalPVTopologies, modifiedPVTopologies) { + logger.V(5).Info("PV's topology was updated, which might make the pod schedulable.", "pod", klog.KObj(pod), "PV", klog.KObj(modifiedPV)) + return framework.Queue, nil + } + + logger.V(5).Info("PV was updated, but the topology is unchanged, which it doesn't make the pod schedulable", "pod", klog.KObj(pod), "PV", klog.KObj(modifiedPV)) + return framework.QueueSkip, nil +} + +// getPVTopologies retrieves pvTopology from a given PV and returns the array +// This function doesn't check spec.nodeAffinity +// because it's read-only after creation and thus cannot be updated +// and nodeAffinity is being handled in node affinity plugin +func (pl *VolumeZone) getPVTopologies(logger klog.Logger, pv *v1.PersistentVolume) []pvTopology { + podPVTopologies := make([]pvTopology, 0) + for _, key := range topologyLabels { + if value, ok := pv.ObjectMeta.Labels[key]; ok { + labelZonesSet, err := volumehelpers.LabelZonesToSet(value) + if err != nil { + logger.V(5).Info("failed to parse PV's topology label, ignoring the label", "label", fmt.Sprintf("%s:%s", key, value), "err", err) + continue + } + podPVTopologies = append(podPVTopologies, pvTopology{ + pvName: pv.Name, + key: key, + values: sets.Set[string](labelZonesSet), + }) + } + } + return podPVTopologies +} + // New initializes a new plugin and returns it. func New(_ context.Context, _ runtime.Object, handle framework.Handle) (framework.Plugin, error) { informerFactory := handle.SharedInformerFactory() diff --git a/pkg/scheduler/framework/plugins/volumezone/volume_zone_test.go b/pkg/scheduler/framework/plugins/volumezone/volume_zone_test.go index 9bcffd384d6..5c95dce69cc 100644 --- a/pkg/scheduler/framework/plugins/volumezone/volume_zone_test.go +++ b/pkg/scheduler/framework/plugins/volumezone/volume_zone_test.go @@ -675,6 +675,105 @@ func TestIsSchedulableAfterStorageClassAdded(t *testing.T) { } } +func TestIsSchedulableAfterPersistentVolumeChange(t *testing.T) { + testcases := map[string]struct { + pod *v1.Pod + oldObj, newObj interface{} + expectedHint framework.QueueingHint + expectedErr bool + }{ + "error-wrong-new-object": { + pod: createPodWithVolume("pod_1", "PVC_1"), + newObj: "not-a-pv", + expectedHint: framework.Queue, + expectedErr: true, + }, + "error-wrong-old-object": { + pod: createPodWithVolume("pod_1", "PVC_1"), + oldObj: "not-a-pv", + newObj: st.MakePersistentVolume().Name("Vol_1").Obj(), + expectedHint: framework.Queue, + expectedErr: true, + }, + "new-pv-was-added": { + pod: createPodWithVolume("pod_1", "PVC_1"), + newObj: &v1.PersistentVolume{ + ObjectMeta: metav1.ObjectMeta{ + Name: "Vol_1", + Labels: map[string]string{v1.LabelFailureDomainBetaZone: "us-west1-b"}, + }, + }, + expectedHint: framework.Queue, + }, + "pv-was-updated-and-changed-topology": { + pod: createPodWithVolume("pod_1", "PVC_1"), + oldObj: &v1.PersistentVolume{ + ObjectMeta: metav1.ObjectMeta{ + Name: "Vol_1", + Labels: map[string]string{v1.LabelFailureDomainBetaZone: "us-west1-a"}, + }, + }, + newObj: &v1.PersistentVolume{ + ObjectMeta: metav1.ObjectMeta{ + Name: "Vol_1", + Labels: map[string]string{v1.LabelFailureDomainBetaZone: "us-west1-b"}, + }, + }, + expectedHint: framework.Queue, + }, + "pv-was-updated-and-added-topology-label": { + pod: createPodWithVolume("pod_1", "PVC_1"), + oldObj: &v1.PersistentVolume{ + ObjectMeta: metav1.ObjectMeta{ + Name: "Vol_1", + Labels: map[string]string{v1.LabelFailureDomainBetaZone: "us-west1-a"}, + }, + }, + newObj: &v1.PersistentVolume{ + ObjectMeta: metav1.ObjectMeta{ + Name: "Vol_1", + Labels: map[string]string{v1.LabelFailureDomainBetaZone: "us-west1-a", + v1.LabelTopologyZone: "zone"}, + }, + }, + expectedHint: framework.Queue, + }, + "pv-was-updated-but-no-topology-is-changed": { + pod: createPodWithVolume("pod_1", "PVC_1"), + oldObj: &v1.PersistentVolume{ + ObjectMeta: metav1.ObjectMeta{ + Name: "Vol_1", + Labels: map[string]string{v1.LabelFailureDomainBetaZone: "us-west1-a", + v1.LabelTopologyZone: "zone"}, + }, + }, + newObj: &v1.PersistentVolume{ + ObjectMeta: metav1.ObjectMeta{ + Name: "Vol_1", + Labels: map[string]string{v1.LabelFailureDomainBetaZone: "us-west1-a", + v1.LabelTopologyZone: "zone"}, + }, + }, + expectedHint: framework.QueueSkip, + }, + } + + for name, tc := range testcases { + t.Run(name, func(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) + p := &VolumeZone{} + + got, err := p.isSchedulableAfterPersistentVolumeChange(logger, tc.pod, tc.oldObj, tc.newObj) + if err != nil && !tc.expectedErr { + t.Errorf("unexpected error: %v", err) + } + if got != tc.expectedHint { + t.Errorf("isSchedulableAfterPersistentVolumeChange() = %v, want %v", got, tc.expectedHint) + } + }) + } +} + func BenchmarkVolumeZone(b *testing.B) { tests := []struct { Name string