From cd13be8654609e180478573256e71f39d11d34c5 Mon Sep 17 00:00:00 2001 From: HirazawaUi <695097494plus@gmail.com> Date: Sun, 2 Jun 2024 22:05:00 +0800 Subject: [PATCH] Add QueueingHintFn for pvc events in VolumeRestriction plugin --- .../volumerestrictions/volume_restrictions.go | 29 +++++- .../volume_restrictions_test.go | 94 +++++++++++++++++++ 2 files changed, 122 insertions(+), 1 deletion(-) diff --git a/pkg/scheduler/framework/plugins/volumerestrictions/volume_restrictions.go b/pkg/scheduler/framework/plugins/volumerestrictions/volume_restrictions.go index b11f8c30cb8..b3b05333240 100644 --- a/pkg/scheduler/framework/plugins/volumerestrictions/volume_restrictions.go +++ b/pkg/scheduler/framework/plugins/volumerestrictions/volume_restrictions.go @@ -329,10 +329,37 @@ func (pl *VolumeRestrictions) EventsToRegister() []framework.ClusterEventWithHin {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add}}, // Pods may fail to schedule because the PVC it uses has not yet been created. // This PVC is required to exist to check its access modes. - {Event: framework.ClusterEvent{Resource: framework.PersistentVolumeClaim, ActionType: framework.Add | framework.Update}}, + {Event: framework.ClusterEvent{Resource: framework.PersistentVolumeClaim, ActionType: framework.Add}, + QueueingHintFn: pl.isSchedulableAfterPersistentVolumeClaimAdded}, } } +// isSchedulableAfterPersistentVolumeClaimAdded is invoked whenever a PersistentVolumeClaim added or changed, It checks whether +// that change made a previously unschedulable pod schedulable. +func (pl *VolumeRestrictions) isSchedulableAfterPersistentVolumeClaimAdded(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) { + _, newPersistentVolumeClaim, err := util.As[*v1.PersistentVolumeClaim](oldObj, newObj) + if err != nil { + return framework.Queue, fmt.Errorf("unexpected objects in isSchedulableAfterPersistentVolumeClaimChange: %w", err) + } + + if newPersistentVolumeClaim.Namespace != pod.Namespace { + return framework.QueueSkip, nil + } + + for _, volume := range pod.Spec.Volumes { + if volume.PersistentVolumeClaim == nil { + continue + } + + if volume.PersistentVolumeClaim.ClaimName == newPersistentVolumeClaim.Name { + logger.V(5).Info("PVC that is referred from the pod was created, which might make this pod schedulable", "pod", klog.KObj(pod), "PVC", klog.KObj(newPersistentVolumeClaim)) + return framework.Queue, nil + } + } + logger.V(5).Info("PVC irrelevant to the Pod was created, which doesn't make this pod schedulable", "pod", klog.KObj(pod), "PVC", klog.KObj(newPersistentVolumeClaim)) + return framework.QueueSkip, nil +} + // isSchedulableAfterPodDeleted is invoked whenever a pod deleted, // It checks whether the deleted pod will conflict with volumes of other pods on the same node // TODO If we observe good throughput, we will add a check for conflicts between the deleted Pod and the readWriteOncePodPVC of the current Pod. diff --git a/pkg/scheduler/framework/plugins/volumerestrictions/volume_restrictions_test.go b/pkg/scheduler/framework/plugins/volumerestrictions/volume_restrictions_test.go index 8fe54ed00db..1c2877aed75 100644 --- a/pkg/scheduler/framework/plugins/volumerestrictions/volume_restrictions_test.go +++ b/pkg/scheduler/framework/plugins/volumerestrictions/volume_restrictions_test.go @@ -667,6 +667,100 @@ func Test_isSchedulableAfterPodDeleted(t *testing.T) { } } +func Test_isSchedulableAfterPersistentVolumeClaimChange(t *testing.T) { + podWithOnePVC := st.MakePod().Name("pod-with-one-pvc").Namespace(metav1.NamespaceDefault).PVC("claim-with-rwx-1").Node("node-1").Obj() + podWithTwoPVCs := st.MakePod().Name("pod-with-two-pvcs").Namespace(metav1.NamespaceDefault).PVC("claim-with-rwx-1").PVC("claim-with-rwx-2").Node("node-1").Obj() + podWithNotEqualNamespace := st.MakePod().Name("pod-with-one-pvc").Namespace(metav1.NamespaceSystem).PVC("claim-with-rwx-1").Node("claim-with-rwx-2").Obj() + + PVC1 := &v1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "claim-with-rwx-1", + }, + Spec: v1.PersistentVolumeClaimSpec{ + AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteMany}, + }, + } + + PVC2 := &v1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "claim-with-rwx-2", + }, + Spec: v1.PersistentVolumeClaimSpec{ + AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteMany}, + }, + } + + testcases := map[string]struct { + existingPods []*v1.Pod + pod *v1.Pod + oldObj, newObj interface{} + expectedHint framework.QueueingHint + expectedErr bool + }{ + "queue-new-object-pvc-belong-pod": { + existingPods: []*v1.Pod{}, + pod: podWithTwoPVCs, + newObj: PVC1, + expectedHint: framework.Queue, + expectedErr: false, + }, + "skip-new-object-unused": { + existingPods: []*v1.Pod{}, + pod: podWithOnePVC, + newObj: PVC2, + expectedHint: framework.QueueSkip, + expectedErr: false, + }, + "skip-nil-old-object": { + existingPods: []*v1.Pod{}, + pod: podWithOnePVC, + newObj: PVC2, + expectedHint: framework.QueueSkip, + expectedErr: false, + }, + "skip-new-object-not-belong-pod": { + existingPods: []*v1.Pod{}, + pod: podWithOnePVC, + newObj: PVC2, + expectedHint: framework.QueueSkip, + expectedErr: false, + }, + "skip-new-object-namespace-not-equal-pod": { + existingPods: []*v1.Pod{}, + pod: podWithNotEqualNamespace, + newObj: PVC1, + expectedHint: framework.QueueSkip, + expectedErr: false, + }, + } + + for name, tc := range testcases { + t.Run(name, func(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + p := newPluginWithListers(ctx, t, tc.existingPods, nil, []*v1.PersistentVolumeClaim{tc.newObj.(*v1.PersistentVolumeClaim)}) + + actualHint, err := p.(*VolumeRestrictions).isSchedulableAfterPersistentVolumeClaimAdded(logger, tc.pod, tc.oldObj, tc.newObj) + if tc.expectedErr { + if err == nil { + t.Error("Expect error, but got nil") + } + return + } + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + + if diff := cmp.Diff(tc.expectedHint, actualHint); diff != "" { + t.Errorf("Unexpected QueueingHint (-want, +got): %s", diff) + } + }) + } +} + func newPlugin(ctx context.Context, t *testing.T) framework.Plugin { return newPluginWithListers(ctx, t, nil, nil, nil) }