Merge pull request #99326 from sunpa93/fs_resize_fix

fix: use pv annotation to trigger filesystem resize when necessary
This commit is contained in:
Kubernetes Prow Robot 2021-03-09 11:05:18 -08:00 committed by GitHub
commit 5155865ae2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 164 additions and 32 deletions

View File

@ -151,13 +151,18 @@ func NewExpandController(
return
}
oldSize := oldPVC.Spec.Resources.Requests[v1.ResourceStorage]
oldReq := oldPVC.Spec.Resources.Requests[v1.ResourceStorage]
oldCap := oldPVC.Status.Capacity[v1.ResourceStorage]
newPVC, ok := new.(*v1.PersistentVolumeClaim)
if !ok {
return
}
newSize := newPVC.Spec.Resources.Requests[v1.ResourceStorage]
if newSize.Cmp(oldSize) > 0 {
newReq := newPVC.Spec.Resources.Requests[v1.ResourceStorage]
newCap := newPVC.Status.Capacity[v1.ResourceStorage]
// PVC will be enqueued under 2 circumstances
// 1. User has increased PVC's request capacity --> volume needs to be expanded
// 2. PVC status capacity has been expanded --> claim's bound PV has likely recently gone through filesystem resize, so remove AnnPreResizeCapacity annotation from PV
if newReq.Cmp(oldReq) > 0 || newCap.Cmp(oldCap) > 0 {
expc.enqueuePVC(new)
}
},
@ -173,10 +178,7 @@ func (expc *expandController) enqueuePVC(obj interface{}) {
return
}
size := pvc.Spec.Resources.Requests[v1.ResourceStorage]
statusSize := pvc.Status.Capacity[v1.ResourceStorage]
if pvc.Status.Phase == v1.ClaimBound && size.Cmp(statusSize) > 0 {
if pvc.Status.Phase == v1.ClaimBound {
key, err := kcache.DeletionHandlingMetaNamespaceKeyFunc(pvc)
if err != nil {
runtime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", pvc, err))
@ -233,6 +235,16 @@ func (expc *expandController) syncHandler(key string) error {
return err
}
pvcRequestSize := pvc.Spec.Resources.Requests[v1.ResourceStorage]
pvcStatusSize := pvc.Status.Capacity[v1.ResourceStorage]
// call expand operation only under two condition
// 1. pvc's request size has been expanded and is larger than pvc's current status size
// 2. pv has an pre-resize capacity annotation
if pvcRequestSize.Cmp(pvcStatusSize) <= 0 && !metav1.HasAnnotation(pv.ObjectMeta, util.AnnPreResizeCapacity) {
return nil
}
volumeSpec := volume.NewSpecFromPersistentVolume(pv, false)
migratable, err := expc.csiMigratedPluginManager.IsMigratable(volumeSpec)
if err != nil {
@ -285,6 +297,11 @@ func (expc *expandController) syncHandler(key string) error {
}
func (expc *expandController) expand(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume, resizerName string) error {
// if node expand is complete and pv's annotation can be removed, remove the annotation from pv and return
if expc.isNodeExpandComplete(pvc, pv) && metav1.HasAnnotation(pv.ObjectMeta, util.AnnPreResizeCapacity) {
return util.DeleteAnnPreResizeCapacity(pv, expc.GetKubeClient())
}
pvc, err := util.MarkResizeInProgressWithResizer(pvc, resizerName, expc.kubeClient)
if err != nil {
klog.V(5).Infof("Error setting PVC %s in progress with error : %v", util.GetPersistentVolumeClaimQualifiedName(pvc), err)
@ -337,6 +354,13 @@ func (expc *expandController) getPersistentVolume(pvc *v1.PersistentVolumeClaim)
return pv.DeepCopy(), nil
}
// isNodeExpandComplete returns true if pvc.Status.Capacity >= pv.Spec.Capacity
func (expc *expandController) isNodeExpandComplete(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) bool {
klog.V(4).Infof("pv %q capacity = %v, pvc %s capacity = %v", pv.Name, pv.Spec.Capacity[v1.ResourceStorage], pvc.ObjectMeta.Name, pvc.Status.Capacity[v1.ResourceStorage])
pvcCap, pvCap := pvc.Status.Capacity[v1.ResourceStorage], pv.Spec.Capacity[v1.ResourceStorage]
return pvcCap.Cmp(pvCap) >= 0
}
// Implementing VolumeHost interface
func (expc *expandController) GetPluginDir(pluginName string) string {
return ""

View File

@ -24,6 +24,7 @@ import (
"testing"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
@ -40,6 +41,7 @@ import (
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/awsebs"
"k8s.io/kubernetes/pkg/volume/csimigration"
"k8s.io/kubernetes/pkg/volume/util"
"k8s.io/kubernetes/pkg/volume/util/operationexecutor"
volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
)
@ -57,30 +59,42 @@ func TestSyncHandler(t *testing.T) {
}{
{
name: "when pvc has no PV binding",
pvc: getFakePersistentVolumeClaim("no-pv-pvc", "", ""),
pvc: getFakePersistentVolumeClaim("no-pv-pvc", "", "1Gi", "1Gi", ""),
pvcKey: "default/no-pv-pvc",
hasError: true,
},
{
name: "when pvc and pv has everything for in-tree plugin",
pv: getFakePersistentVolume("vol-3", csitranslationplugins.AWSEBSInTreePluginName, "good-pvc-vol-3"),
pvc: getFakePersistentVolumeClaim("good-pvc", "vol-3", "good-pvc-vol-3"),
pv: getFakePersistentVolume("vol-3", csitranslationplugins.AWSEBSInTreePluginName, "1Gi", "good-pvc-vol-3"),
pvc: getFakePersistentVolumeClaim("good-pvc", "vol-3", "1Gi", "2Gi", "good-pvc-vol-3"),
pvcKey: "default/good-pvc",
expansionCalled: true,
expectedAnnotation: map[string]string{volumetypes.VolumeResizerKey: csitranslationplugins.AWSEBSInTreePluginName},
},
{
name: "if pv has pre-resize capacity annotation, generate expand operation should not be called",
pv: func() *v1.PersistentVolume {
pv := getFakePersistentVolume("vol-4", csitranslationplugins.AWSEBSInTreePluginName, "2Gi", "good-pvc-vol-4")
pv.ObjectMeta.Annotations = make(map[string]string)
pv.ObjectMeta.Annotations[util.AnnPreResizeCapacity] = "1Gi"
return pv
}(),
pvc: getFakePersistentVolumeClaim("good-pvc", "vol-4", "2Gi", "2Gi", "good-pvc-vol-4"),
pvcKey: "default/good-pvc",
expansionCalled: false,
},
{
name: "when csi migration is enabled for a in-tree plugin",
csiMigrationEnabled: true,
pv: getFakePersistentVolume("vol-4", csitranslationplugins.AWSEBSInTreePluginName, "csi-pvc-vol-4"),
pvc: getFakePersistentVolumeClaim("csi-pvc", "vol-4", "csi-pvc-vol-4"),
pv: getFakePersistentVolume("vol-4", csitranslationplugins.AWSEBSInTreePluginName, "1Gi", "csi-pvc-vol-5"),
pvc: getFakePersistentVolumeClaim("csi-pvc", "vol-4", "1Gi", "2Gi", "csi-pvc-vol-5"),
pvcKey: "default/csi-pvc",
expectedAnnotation: map[string]string{volumetypes.VolumeResizerKey: csitranslationplugins.AWSEBSDriverName},
},
{
name: "for csi plugin without migration path",
pv: getFakePersistentVolume("vol-5", "com.csi.ceph", "ceph-csi-pvc-vol-5"),
pvc: getFakePersistentVolumeClaim("ceph-csi-pvc", "vol-5", "ceph-csi-pvc-vol-5"),
pv: getFakePersistentVolume("vol-5", "com.csi.ceph", "1Gi", "ceph-csi-pvc-vol-6"),
pvc: getFakePersistentVolumeClaim("ceph-csi-pvc", "vol-5", "1Gi", "2Gi", "ceph-csi-pvc-vol-6"),
pvcKey: "default/ceph-csi-pvc",
expansionCalled: false,
hasError: false,
@ -177,7 +191,7 @@ func applyPVCPatch(originalPVC *v1.PersistentVolumeClaim, patch []byte) (*v1.Per
return updatedPVC, nil
}
func getFakePersistentVolume(volumeName, pluginName string, pvcUID types.UID) *v1.PersistentVolume {
func getFakePersistentVolume(volumeName, pluginName string, size string, pvcUID types.UID) *v1.PersistentVolume {
pv := &v1.PersistentVolume{
ObjectMeta: metav1.ObjectMeta{Name: volumeName},
Spec: v1.PersistentVolumeSpec{
@ -185,6 +199,9 @@ func getFakePersistentVolume(volumeName, pluginName string, pvcUID types.UID) *v
ClaimRef: &v1.ObjectReference{
Namespace: "default",
},
Capacity: map[v1.ResourceName]resource.Quantity{
v1.ResourceStorage: resource.MustParse(size),
},
},
}
if pvcUID != "" {
@ -205,10 +222,21 @@ func getFakePersistentVolume(volumeName, pluginName string, pvcUID types.UID) *v
return pv
}
func getFakePersistentVolumeClaim(pvcName, volumeName string, uid types.UID) *v1.PersistentVolumeClaim {
func getFakePersistentVolumeClaim(pvcName, volumeName, statusSize, requestSize string, uid types.UID) *v1.PersistentVolumeClaim {
pvc := &v1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{Name: pvcName, Namespace: "default", UID: uid},
Spec: v1.PersistentVolumeClaimSpec{},
Spec: v1.PersistentVolumeClaimSpec{
Resources: v1.ResourceRequirements{
Requests: map[v1.ResourceName]resource.Quantity{
v1.ResourceStorage: resource.MustParse(requestSize),
},
},
},
Status: v1.PersistentVolumeClaimStatus{
Capacity: map[v1.ResourceName]resource.Quantity{
v1.ResourceStorage: resource.MustParse(statusSize),
},
},
}
if volumeName != "" {
pvc.Spec.VolumeName = volumeName

View File

@ -26,6 +26,7 @@ import (
v1 "k8s.io/api/core/v1"
storage "k8s.io/api/storage/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
utilfeature "k8s.io/apiserver/pkg/util/feature"
@ -783,7 +784,20 @@ func (ctrl *PersistentVolumeController) updateClaimStatus(claim *v1.PersistentVo
return nil, fmt.Errorf("PersistentVolume %q is without a storage capacity", volume.Name)
}
claimCap, ok := claim.Status.Capacity[v1.ResourceStorage]
if !ok || volumeCap.Cmp(claimCap) != 0 {
// If PV has a resize annotation, set the claim's request capacity
if metav1.HasAnnotation(volume.ObjectMeta, util.AnnPreResizeCapacity) {
klog.V(2).Infof("volume %q requires filesystem resize: setting pvc %s status capacity to %s", volume.Name, claimToClaimKey(claim), volume.ObjectMeta.Annotations[util.AnnPreResizeCapacity])
preQty, err := resource.ParseQuantity(volume.ObjectMeta.Annotations[util.AnnPreResizeCapacity])
if err != nil {
klog.Warningf("Parsing pre-resize-capacity from PV(%q) failed", volume.Name, err)
preQty = volume.Spec.Capacity[v1.ResourceStorage]
}
if claimClone.Status.Capacity == nil {
claimClone.Status.Capacity = make(map[v1.ResourceName]resource.Quantity)
}
claimClone.Status.Capacity[v1.ResourceStorage] = preQty
dirty = true
} else if !ok || volumeCap.Cmp(claimCap) != 0 {
claimClone.Status.Capacity = volume.Spec.Capacity
dirty = true
}

View File

@ -42,6 +42,7 @@ import (
pvutil "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/util"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/volume/csimigration"
"k8s.io/kubernetes/pkg/volume/util"
)
var (
@ -89,6 +90,17 @@ func TestControllerSync(t *testing.T) {
return nil
},
},
{
"5-2-3 - complete bind when PV and PVC both exist and PV has AnnPreResizeCapacity annotation",
volumesWithAnnotation(util.AnnPreResizeCapacity, "1Gi", newVolumeArray("volume5-2", "2Gi", "", "", v1.VolumeAvailable, v1.PersistentVolumeReclaimRetain, classEmpty, pvutil.AnnBoundByController)),
volumesWithAnnotation(util.AnnPreResizeCapacity, "1Gi", newVolumeArray("volume5-2", "2Gi", "uid5-2", "claim5-2", v1.VolumeBound, v1.PersistentVolumeReclaimRetain, classEmpty, pvutil.AnnBoundByController)),
withExpectedCapacity("2Gi", newClaimArray("claim5-2", "uid5-2", "2Gi", "", v1.ClaimPending, nil)),
withExpectedCapacity("1Gi", newClaimArray("claim5-2", "uid5-2", "2Gi", "volume5-2", v1.ClaimBound, nil, pvutil.AnnBoundByController, pvutil.AnnBindCompleted)),
noevents, noerrors,
func(ctrl *PersistentVolumeController, reactor *pvtesting.VolumeReactor, test controllerTest) error {
return nil
},
},
{
// deleteClaim with a bound claim makes bound volume released.
"5-3 - delete claim",

View File

@ -1551,6 +1551,14 @@ func (og *operationGenerator) GenerateExpandVolumeFunc(
klog.Warning(detailedErr)
return volumetypes.NewOperationContext(nil, nil, migrated)
}
oldCapacity := pvc.Status.Capacity[v1.ResourceStorage]
err = util.AddAnnPreResizeCapacity(pv, oldCapacity, og.kubeClient)
if err != nil {
detailedErr := fmt.Errorf("error updating pv %s annotation (%s) with pre-resize capacity %s: %v", pv.ObjectMeta.Name, util.AnnPreResizeCapacity, oldCapacity.String(), err)
klog.Warning(detailedErr)
return volumetypes.NewOperationContext(nil, nil, migrated)
}
}
return volumetypes.NewOperationContext(nil, nil, migrated)
}

View File

@ -21,7 +21,7 @@ import (
"encoding/json"
"fmt"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -39,6 +39,12 @@ var (
v1.PersistentVolumeClaimFileSystemResizePending: true,
v1.PersistentVolumeClaimResizing: true,
}
// AnnPreResizeCapacity annotation is added to a PV when expanding volume.
// Its value is status capacity of the PVC prior to the volume expansion
// Its value will be set by the external-resizer when it deems that filesystem resize is required after resizing volume.
// Its value will be used by pv_controller to determine pvc's status capacity when binding pvc and pv.
AnnPreResizeCapacity = "volume.alpha.kubernetes.io/pre-resize-capacity"
)
type resizeProcessStatus struct {
@ -57,27 +63,67 @@ func UpdatePVSize(
newSize resource.Quantity,
kubeClient clientset.Interface) error {
pvClone := pv.DeepCopy()
oldData, err := json.Marshal(pvClone)
if err != nil {
return fmt.Errorf("unexpected error marshaling old PV %q with error : %v", pvClone.Name, err)
}
pvClone.Spec.Capacity[v1.ResourceStorage] = newSize
newData, err := json.Marshal(pvClone)
if err != nil {
return fmt.Errorf("unexpected error marshaling new PV %q with error : %v", pvClone.Name, err)
return PatchPV(pv, pvClone, kubeClient)
}
// AddAnnPreResizeCapacity adds volume.alpha.kubernetes.io/pre-resize-capacity from the pv
func AddAnnPreResizeCapacity(
pv *v1.PersistentVolume,
oldCapacity resource.Quantity,
kubeClient clientset.Interface) error {
// if the pv already has a resize annotation skip the process
if metav1.HasAnnotation(pv.ObjectMeta, AnnPreResizeCapacity) {
return nil
}
patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, pvClone)
pvClone := pv.DeepCopy()
if pvClone.ObjectMeta.Annotations == nil {
pvClone.ObjectMeta.Annotations = make(map[string]string)
}
pvClone.ObjectMeta.Annotations[AnnPreResizeCapacity] = oldCapacity.String()
return PatchPV(pv, pvClone, kubeClient)
}
// DeleteAnnPreResizeCapacity deletes volume.alpha.kubernetes.io/pre-resize-capacity from the pv
func DeleteAnnPreResizeCapacity(
pv *v1.PersistentVolume,
kubeClient clientset.Interface) error {
// if the pv does not have a resize annotation skip the entire process
if !metav1.HasAnnotation(pv.ObjectMeta, AnnPreResizeCapacity) {
return nil
}
pvClone := pv.DeepCopy()
delete(pvClone.ObjectMeta.Annotations, AnnPreResizeCapacity)
return PatchPV(pv, pvClone, kubeClient)
}
// PatchPV creates and executes a patch for pv
func PatchPV(
oldPV *v1.PersistentVolume,
newPV *v1.PersistentVolume,
kubeClient clientset.Interface) error {
oldData, err := json.Marshal(oldPV)
if err != nil {
return fmt.Errorf("error Creating two way merge patch for PV %q with error : %v", pvClone.Name, err)
return fmt.Errorf("unexpected error marshaling old PV %q with error : %v", oldPV.Name, err)
}
_, err = kubeClient.CoreV1().PersistentVolumes().Patch(context.TODO(), pvClone.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
newData, err := json.Marshal(newPV)
if err != nil {
return fmt.Errorf("error Patching PV %q with error : %v", pvClone.Name, err)
return fmt.Errorf("unexpected error marshaling new PV %q with error : %v", newPV.Name, err)
}
patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, oldPV)
if err != nil {
return fmt.Errorf("error Creating two way merge patch for PV %q with error : %v", oldPV.Name, err)
}
_, err = kubeClient.CoreV1().PersistentVolumes().Patch(context.TODO(), oldPV.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
if err != nil {
return fmt.Errorf("error Patching PV %q with error : %v", oldPV.Name, err)
}
return nil
}