From 5f69cf74d836ef08c3a340b7d0b48c4a463a3258 Mon Sep 17 00:00:00 2001 From: Sung Jun Park Date: Wed, 27 Jan 2021 23:54:03 +0000 Subject: [PATCH] fix: when newly binding pvc to a pv, adjust pvc.status.capacity to pv's annotation that denotes the pre-resize capacity of the original pvc that pv was bound to if it has one test: confirm that pvc's status capacity is adjusted if pv has a pre-resize capacity annotation --- .../volume/expand/expand_controller.go | 38 ++++++++-- .../volume/expand/expand_controller_test.go | 48 +++++++++--- .../volume/persistentvolume/pv_controller.go | 16 +++- .../persistentvolume/pv_controller_test.go | 12 +++ .../operationexecutor/operation_generator.go | 8 ++ pkg/volume/util/resize_util.go | 74 +++++++++++++++---- 6 files changed, 164 insertions(+), 32 deletions(-) diff --git a/pkg/controller/volume/expand/expand_controller.go b/pkg/controller/volume/expand/expand_controller.go index 5d17b256536..2bd420674d0 100644 --- a/pkg/controller/volume/expand/expand_controller.go +++ b/pkg/controller/volume/expand/expand_controller.go @@ -151,13 +151,18 @@ func NewExpandController( return } - oldSize := oldPVC.Spec.Resources.Requests[v1.ResourceStorage] + oldReq := oldPVC.Spec.Resources.Requests[v1.ResourceStorage] + oldCap := oldPVC.Status.Capacity[v1.ResourceStorage] newPVC, ok := new.(*v1.PersistentVolumeClaim) if !ok { return } - newSize := newPVC.Spec.Resources.Requests[v1.ResourceStorage] - if newSize.Cmp(oldSize) > 0 { + newReq := newPVC.Spec.Resources.Requests[v1.ResourceStorage] + newCap := newPVC.Status.Capacity[v1.ResourceStorage] + // PVC will be enqueued under 2 circumstances + // 1. User has increased PVC's request capacity --> volume needs to be expanded + // 2. PVC status capacity has been expanded --> claim's bound PV has likely recently gone through filesystem resize, so remove AnnPreResizeCapacity annotation from PV + if newReq.Cmp(oldReq) > 0 || newCap.Cmp(oldCap) > 0 { expc.enqueuePVC(new) } }, @@ -173,10 +178,7 @@ func (expc *expandController) enqueuePVC(obj interface{}) { return } - size := pvc.Spec.Resources.Requests[v1.ResourceStorage] - statusSize := pvc.Status.Capacity[v1.ResourceStorage] - - if pvc.Status.Phase == v1.ClaimBound && size.Cmp(statusSize) > 0 { + if pvc.Status.Phase == v1.ClaimBound { key, err := kcache.DeletionHandlingMetaNamespaceKeyFunc(pvc) if err != nil { runtime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", pvc, err)) @@ -233,6 +235,16 @@ func (expc *expandController) syncHandler(key string) error { return err } + pvcRequestSize := pvc.Spec.Resources.Requests[v1.ResourceStorage] + pvcStatusSize := pvc.Status.Capacity[v1.ResourceStorage] + + // call expand operation only under two condition + // 1. pvc's request size has been expanded and is larger than pvc's current status size + // 2. pv has an pre-resize capacity annotation + if pvcRequestSize.Cmp(pvcStatusSize) <= 0 && !metav1.HasAnnotation(pv.ObjectMeta, util.AnnPreResizeCapacity) { + return nil + } + volumeSpec := volume.NewSpecFromPersistentVolume(pv, false) migratable, err := expc.csiMigratedPluginManager.IsMigratable(volumeSpec) if err != nil { @@ -285,6 +297,11 @@ func (expc *expandController) syncHandler(key string) error { } func (expc *expandController) expand(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume, resizerName string) error { + // if node expand is complete and pv's annotation can be removed, remove the annotation from pv and return + if expc.isNodeExpandComplete(pvc, pv) && metav1.HasAnnotation(pv.ObjectMeta, util.AnnPreResizeCapacity) { + return util.DeleteAnnPreResizeCapacity(pv, expc.GetKubeClient()) + } + pvc, err := util.MarkResizeInProgressWithResizer(pvc, resizerName, expc.kubeClient) if err != nil { klog.V(5).Infof("Error setting PVC %s in progress with error : %v", util.GetPersistentVolumeClaimQualifiedName(pvc), err) @@ -337,6 +354,13 @@ func (expc *expandController) getPersistentVolume(pvc *v1.PersistentVolumeClaim) return pv.DeepCopy(), nil } +// isNodeExpandComplete returns true if pvc.Status.Capacity >= pv.Spec.Capacity +func (expc *expandController) isNodeExpandComplete(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) bool { + klog.V(4).Infof("pv %q capacity = %v, pvc %s capacity = %v", pv.Name, pv.Spec.Capacity[v1.ResourceStorage], pvc.ObjectMeta.Name, pvc.Status.Capacity[v1.ResourceStorage]) + pvcCap, pvCap := pvc.Status.Capacity[v1.ResourceStorage], pv.Spec.Capacity[v1.ResourceStorage] + return pvcCap.Cmp(pvCap) >= 0 +} + // Implementing VolumeHost interface func (expc *expandController) GetPluginDir(pluginName string) string { return "" diff --git a/pkg/controller/volume/expand/expand_controller_test.go b/pkg/controller/volume/expand/expand_controller_test.go index cf5d37b180a..d495d8c2fde 100644 --- a/pkg/controller/volume/expand/expand_controller_test.go +++ b/pkg/controller/volume/expand/expand_controller_test.go @@ -24,6 +24,7 @@ import ( "testing" v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" @@ -40,6 +41,7 @@ import ( "k8s.io/kubernetes/pkg/volume" "k8s.io/kubernetes/pkg/volume/awsebs" "k8s.io/kubernetes/pkg/volume/csimigration" + "k8s.io/kubernetes/pkg/volume/util" "k8s.io/kubernetes/pkg/volume/util/operationexecutor" volumetypes "k8s.io/kubernetes/pkg/volume/util/types" ) @@ -57,30 +59,42 @@ func TestSyncHandler(t *testing.T) { }{ { name: "when pvc has no PV binding", - pvc: getFakePersistentVolumeClaim("no-pv-pvc", "", ""), + pvc: getFakePersistentVolumeClaim("no-pv-pvc", "", "1Gi", "1Gi", ""), pvcKey: "default/no-pv-pvc", hasError: true, }, { name: "when pvc and pv has everything for in-tree plugin", - pv: getFakePersistentVolume("vol-3", csitranslationplugins.AWSEBSInTreePluginName, "good-pvc-vol-3"), - pvc: getFakePersistentVolumeClaim("good-pvc", "vol-3", "good-pvc-vol-3"), + pv: getFakePersistentVolume("vol-3", csitranslationplugins.AWSEBSInTreePluginName, "1Gi", "good-pvc-vol-3"), + pvc: getFakePersistentVolumeClaim("good-pvc", "vol-3", "1Gi", "2Gi", "good-pvc-vol-3"), pvcKey: "default/good-pvc", expansionCalled: true, expectedAnnotation: map[string]string{volumetypes.VolumeResizerKey: csitranslationplugins.AWSEBSInTreePluginName}, }, + { + name: "if pv has pre-resize capacity annotation, generate expand operation should not be called", + pv: func() *v1.PersistentVolume { + pv := getFakePersistentVolume("vol-4", csitranslationplugins.AWSEBSInTreePluginName, "2Gi", "good-pvc-vol-4") + pv.ObjectMeta.Annotations = make(map[string]string) + pv.ObjectMeta.Annotations[util.AnnPreResizeCapacity] = "1Gi" + return pv + }(), + pvc: getFakePersistentVolumeClaim("good-pvc", "vol-4", "2Gi", "2Gi", "good-pvc-vol-4"), + pvcKey: "default/good-pvc", + expansionCalled: false, + }, { name: "when csi migration is enabled for a in-tree plugin", csiMigrationEnabled: true, - pv: getFakePersistentVolume("vol-4", csitranslationplugins.AWSEBSInTreePluginName, "csi-pvc-vol-4"), - pvc: getFakePersistentVolumeClaim("csi-pvc", "vol-4", "csi-pvc-vol-4"), + pv: getFakePersistentVolume("vol-4", csitranslationplugins.AWSEBSInTreePluginName, "1Gi", "csi-pvc-vol-5"), + pvc: getFakePersistentVolumeClaim("csi-pvc", "vol-4", "1Gi", "2Gi", "csi-pvc-vol-5"), pvcKey: "default/csi-pvc", expectedAnnotation: map[string]string{volumetypes.VolumeResizerKey: csitranslationplugins.AWSEBSDriverName}, }, { name: "for csi plugin without migration path", - pv: getFakePersistentVolume("vol-5", "com.csi.ceph", "ceph-csi-pvc-vol-5"), - pvc: getFakePersistentVolumeClaim("ceph-csi-pvc", "vol-5", "ceph-csi-pvc-vol-5"), + pv: getFakePersistentVolume("vol-5", "com.csi.ceph", "1Gi", "ceph-csi-pvc-vol-6"), + pvc: getFakePersistentVolumeClaim("ceph-csi-pvc", "vol-5", "1Gi", "2Gi", "ceph-csi-pvc-vol-6"), pvcKey: "default/ceph-csi-pvc", expansionCalled: false, hasError: false, @@ -177,7 +191,7 @@ func applyPVCPatch(originalPVC *v1.PersistentVolumeClaim, patch []byte) (*v1.Per return updatedPVC, nil } -func getFakePersistentVolume(volumeName, pluginName string, pvcUID types.UID) *v1.PersistentVolume { +func getFakePersistentVolume(volumeName, pluginName string, size string, pvcUID types.UID) *v1.PersistentVolume { pv := &v1.PersistentVolume{ ObjectMeta: metav1.ObjectMeta{Name: volumeName}, Spec: v1.PersistentVolumeSpec{ @@ -185,6 +199,9 @@ func getFakePersistentVolume(volumeName, pluginName string, pvcUID types.UID) *v ClaimRef: &v1.ObjectReference{ Namespace: "default", }, + Capacity: map[v1.ResourceName]resource.Quantity{ + v1.ResourceStorage: resource.MustParse(size), + }, }, } if pvcUID != "" { @@ -205,10 +222,21 @@ func getFakePersistentVolume(volumeName, pluginName string, pvcUID types.UID) *v return pv } -func getFakePersistentVolumeClaim(pvcName, volumeName string, uid types.UID) *v1.PersistentVolumeClaim { +func getFakePersistentVolumeClaim(pvcName, volumeName, statusSize, requestSize string, uid types.UID) *v1.PersistentVolumeClaim { pvc := &v1.PersistentVolumeClaim{ ObjectMeta: metav1.ObjectMeta{Name: pvcName, Namespace: "default", UID: uid}, - Spec: v1.PersistentVolumeClaimSpec{}, + Spec: v1.PersistentVolumeClaimSpec{ + Resources: v1.ResourceRequirements{ + Requests: map[v1.ResourceName]resource.Quantity{ + v1.ResourceStorage: resource.MustParse(requestSize), + }, + }, + }, + Status: v1.PersistentVolumeClaimStatus{ + Capacity: map[v1.ResourceName]resource.Quantity{ + v1.ResourceStorage: resource.MustParse(statusSize), + }, + }, } if volumeName != "" { pvc.Spec.VolumeName = volumeName diff --git a/pkg/controller/volume/persistentvolume/pv_controller.go b/pkg/controller/volume/persistentvolume/pv_controller.go index 70d82bfa3d9..c0865c34d23 100644 --- a/pkg/controller/volume/persistentvolume/pv_controller.go +++ b/pkg/controller/volume/persistentvolume/pv_controller.go @@ -26,6 +26,7 @@ import ( v1 "k8s.io/api/core/v1" storage "k8s.io/api/storage/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" utilfeature "k8s.io/apiserver/pkg/util/feature" @@ -783,7 +784,20 @@ func (ctrl *PersistentVolumeController) updateClaimStatus(claim *v1.PersistentVo return nil, fmt.Errorf("PersistentVolume %q is without a storage capacity", volume.Name) } claimCap, ok := claim.Status.Capacity[v1.ResourceStorage] - if !ok || volumeCap.Cmp(claimCap) != 0 { + // If PV has a resize annotation, set the claim's request capacity + if metav1.HasAnnotation(volume.ObjectMeta, util.AnnPreResizeCapacity) { + klog.V(2).Infof("volume %q requires filesystem resize: setting pvc %s status capacity to %s", volume.Name, claimToClaimKey(claim), volume.ObjectMeta.Annotations[util.AnnPreResizeCapacity]) + preQty, err := resource.ParseQuantity(volume.ObjectMeta.Annotations[util.AnnPreResizeCapacity]) + if err != nil { + klog.Warningf("Parsing pre-resize-capacity from PV(%q) failed", volume.Name, err) + preQty = volume.Spec.Capacity[v1.ResourceStorage] + } + if claimClone.Status.Capacity == nil { + claimClone.Status.Capacity = make(map[v1.ResourceName]resource.Quantity) + } + claimClone.Status.Capacity[v1.ResourceStorage] = preQty + dirty = true + } else if !ok || volumeCap.Cmp(claimCap) != 0 { claimClone.Status.Capacity = volume.Spec.Capacity dirty = true } diff --git a/pkg/controller/volume/persistentvolume/pv_controller_test.go b/pkg/controller/volume/persistentvolume/pv_controller_test.go index 5eb60e5e1eb..4523d196f2b 100644 --- a/pkg/controller/volume/persistentvolume/pv_controller_test.go +++ b/pkg/controller/volume/persistentvolume/pv_controller_test.go @@ -42,6 +42,7 @@ import ( pvutil "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/util" "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/volume/csimigration" + "k8s.io/kubernetes/pkg/volume/util" ) var ( @@ -89,6 +90,17 @@ func TestControllerSync(t *testing.T) { return nil }, }, + { + "5-2-3 - complete bind when PV and PVC both exist and PV has AnnPreResizeCapacity annotation", + volumesWithAnnotation(util.AnnPreResizeCapacity, "1Gi", newVolumeArray("volume5-2", "2Gi", "", "", v1.VolumeAvailable, v1.PersistentVolumeReclaimRetain, classEmpty, pvutil.AnnBoundByController)), + volumesWithAnnotation(util.AnnPreResizeCapacity, "1Gi", newVolumeArray("volume5-2", "2Gi", "uid5-2", "claim5-2", v1.VolumeBound, v1.PersistentVolumeReclaimRetain, classEmpty, pvutil.AnnBoundByController)), + withExpectedCapacity("2Gi", newClaimArray("claim5-2", "uid5-2", "2Gi", "", v1.ClaimPending, nil)), + withExpectedCapacity("1Gi", newClaimArray("claim5-2", "uid5-2", "2Gi", "volume5-2", v1.ClaimBound, nil, pvutil.AnnBoundByController, pvutil.AnnBindCompleted)), + noevents, noerrors, + func(ctrl *PersistentVolumeController, reactor *pvtesting.VolumeReactor, test controllerTest) error { + return nil + }, + }, { // deleteClaim with a bound claim makes bound volume released. "5-3 - delete claim", diff --git a/pkg/volume/util/operationexecutor/operation_generator.go b/pkg/volume/util/operationexecutor/operation_generator.go index bdd81aaca17..1fc15038f44 100644 --- a/pkg/volume/util/operationexecutor/operation_generator.go +++ b/pkg/volume/util/operationexecutor/operation_generator.go @@ -1551,6 +1551,14 @@ func (og *operationGenerator) GenerateExpandVolumeFunc( klog.Warning(detailedErr) return volumetypes.NewOperationContext(nil, nil, migrated) } + oldCapacity := pvc.Status.Capacity[v1.ResourceStorage] + err = util.AddAnnPreResizeCapacity(pv, oldCapacity, og.kubeClient) + if err != nil { + detailedErr := fmt.Errorf("error updating pv %s annotation (%s) with pre-resize capacity %s: %v", pv.ObjectMeta.Name, util.AnnPreResizeCapacity, oldCapacity.String(), err) + klog.Warning(detailedErr) + return volumetypes.NewOperationContext(nil, nil, migrated) + } + } return volumetypes.NewOperationContext(nil, nil, migrated) } diff --git a/pkg/volume/util/resize_util.go b/pkg/volume/util/resize_util.go index 9249380dc94..195819d1408 100644 --- a/pkg/volume/util/resize_util.go +++ b/pkg/volume/util/resize_util.go @@ -21,7 +21,7 @@ import ( "encoding/json" "fmt" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -39,6 +39,12 @@ var ( v1.PersistentVolumeClaimFileSystemResizePending: true, v1.PersistentVolumeClaimResizing: true, } + + // AnnPreResizeCapacity annotation is added to a PV when expanding volume. + // Its value is status capacity of the PVC prior to the volume expansion + // Its value will be set by the external-resizer when it deems that filesystem resize is required after resizing volume. + // Its value will be used by pv_controller to determine pvc's status capacity when binding pvc and pv. + AnnPreResizeCapacity = "volume.alpha.kubernetes.io/pre-resize-capacity" ) type resizeProcessStatus struct { @@ -57,27 +63,67 @@ func UpdatePVSize( newSize resource.Quantity, kubeClient clientset.Interface) error { pvClone := pv.DeepCopy() - - oldData, err := json.Marshal(pvClone) - if err != nil { - return fmt.Errorf("unexpected error marshaling old PV %q with error : %v", pvClone.Name, err) - } - pvClone.Spec.Capacity[v1.ResourceStorage] = newSize - newData, err := json.Marshal(pvClone) - if err != nil { - return fmt.Errorf("unexpected error marshaling new PV %q with error : %v", pvClone.Name, err) + return PatchPV(pv, pvClone, kubeClient) +} + +// AddAnnPreResizeCapacity adds volume.alpha.kubernetes.io/pre-resize-capacity from the pv +func AddAnnPreResizeCapacity( + pv *v1.PersistentVolume, + oldCapacity resource.Quantity, + kubeClient clientset.Interface) error { + // if the pv already has a resize annotation skip the process + if metav1.HasAnnotation(pv.ObjectMeta, AnnPreResizeCapacity) { + return nil } - patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, pvClone) + pvClone := pv.DeepCopy() + if pvClone.ObjectMeta.Annotations == nil { + pvClone.ObjectMeta.Annotations = make(map[string]string) + } + pvClone.ObjectMeta.Annotations[AnnPreResizeCapacity] = oldCapacity.String() + + return PatchPV(pv, pvClone, kubeClient) +} + +// DeleteAnnPreResizeCapacity deletes volume.alpha.kubernetes.io/pre-resize-capacity from the pv +func DeleteAnnPreResizeCapacity( + pv *v1.PersistentVolume, + kubeClient clientset.Interface) error { + // if the pv does not have a resize annotation skip the entire process + if !metav1.HasAnnotation(pv.ObjectMeta, AnnPreResizeCapacity) { + return nil + } + pvClone := pv.DeepCopy() + delete(pvClone.ObjectMeta.Annotations, AnnPreResizeCapacity) + + return PatchPV(pv, pvClone, kubeClient) +} + +// PatchPV creates and executes a patch for pv +func PatchPV( + oldPV *v1.PersistentVolume, + newPV *v1.PersistentVolume, + kubeClient clientset.Interface) error { + oldData, err := json.Marshal(oldPV) if err != nil { - return fmt.Errorf("error Creating two way merge patch for PV %q with error : %v", pvClone.Name, err) + return fmt.Errorf("unexpected error marshaling old PV %q with error : %v", oldPV.Name, err) } - _, err = kubeClient.CoreV1().PersistentVolumes().Patch(context.TODO(), pvClone.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}) + newData, err := json.Marshal(newPV) if err != nil { - return fmt.Errorf("error Patching PV %q with error : %v", pvClone.Name, err) + return fmt.Errorf("unexpected error marshaling new PV %q with error : %v", newPV.Name, err) + } + + patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, oldPV) + if err != nil { + return fmt.Errorf("error Creating two way merge patch for PV %q with error : %v", oldPV.Name, err) + } + + _, err = kubeClient.CoreV1().PersistentVolumes().Patch(context.TODO(), oldPV.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}) + if err != nil { + return fmt.Errorf("error Patching PV %q with error : %v", oldPV.Name, err) } return nil }